HIVE-20225: SerDe to support Teradata Binary Format (Lu Li via cws)
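For reference, a table can be wired up to the new classes roughly as sketched below. The class names and the teradata.* property keys are the ones introduced by this patch; the column list and the property values are only illustrative (the bundled test_teradatabinaryfile.q contains the actual test DDL):

    -- Illustrative only: columns and property values are made up;
    -- class names and property keys come from the patch below.
    CREATE TABLE teradata_binary_table (
      test_tinyint   TINYINT,
      test_int       INT,
      test_bigint    BIGINT,
      test_double    DOUBLE,
      test_decimal   DECIMAL(15,2),
      test_date      DATE,
      test_timestamp TIMESTAMP,
      test_char      CHAR(1),
      test_varchar   VARCHAR(40),
      test_binary    BINARY
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde'
    STORED AS
      INPUTFORMAT  'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat'
    TBLPROPERTIES (
      'teradata.timestamp.precision' = '6',     -- bytes read per TIMESTAMP = 19 + 1 + precision
      'teradata.char.charset'        = 'LATIN', -- LATIN = 2 bytes/char, UNICODE = 3 bytes/char
      'teradata.row.length'          = '64kb'   -- '64kb' => 2-byte record length, '1mb' => 4-byte
    );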
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8b16ad0f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8b16ad0f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8b16ad0f Branch: refs/heads/branch-2 Commit: 8b16ad0f54d3bcf56362118f31e25a792ad7291e Parents: bd32deb Author: Daniel Dai <dai...@gmail.com> Authored: Fri Aug 31 12:36:23 2018 -0700 Committer: Daniel Dai <dai...@gmail.com> Committed: Fri Aug 31 12:36:23 2018 -0700 ---------------------------------------------------------------------- .../td_data_with_1mb_rowsize.teradata.gz | Bin 0 -> 616 bytes .../teradata_binary_table.deflate | Bin 0 -> 1329 bytes .../ql/io/TeradataBinaryFileInputFormat.java | 66 ++ .../ql/io/TeradataBinaryFileOutputFormat.java | 112 ++++ .../hive/ql/io/TeradataBinaryRecordReader.java | 280 +++++++++ .../teradata/TeradataBinaryDataInputStream.java | 200 +++++++ .../TeradataBinaryDataOutputStream.java | 270 +++++++++ .../serde2/teradata/TeradataBinarySerde.java | 597 +++++++++++++++++++ .../TestTeradataBinarySerdeForDate.java | 72 +++ .../TestTeradataBinarySerdeForDecimal.java | 106 ++++ .../TestTeradataBinarySerdeForTimeStamp.java | 96 +++ .../TestTeradataBinarySerdeGeneral.java | 126 ++++ .../clientpositive/test_teradatabinaryfile.q | 123 ++++ .../test_teradatabinaryfile.q.out | 537 +++++++++++++++++ 14 files changed, 2585 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz ---------------------------------------------------------------------- diff --git a/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz new file mode 100644 index 0000000..7319e3c Binary files /dev/null and b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz differ http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/data/files/teradata_binary_file/teradata_binary_table.deflate ---------------------------------------------------------------------- diff --git a/data/files/teradata_binary_file/teradata_binary_table.deflate b/data/files/teradata_binary_file/teradata_binary_table.deflate new file mode 100644 index 0000000..fd53dde Binary files /dev/null and b/data/files/teradata_binary_file/teradata_binary_table.deflate differ http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java new file mode 100644 index 0000000..bed87c5 --- /dev/null +++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde. + * FileInputFormat for Teradata binary files. + * + * In the Teradata Binary File, each record constructs as below: + * The first 2 bytes represents the length of the bytes next for this record. + * Then the null bitmap whose length is depended on the number of fields is followed. + * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field. + * At last, there is one byte (0x0a) in the end of the record. + * + * This InputFormat currently doesn't support the split of the file. + * Teradata binary files are using little endian. + */ +public class TeradataBinaryFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { + + @Override public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + reporter.setStatus(split.toString()); + return new TeradataBinaryRecordReader(job, (FileSplit) split); + } + + /** + * the <code>TeradataBinaryFileInputFormat</code> is not splittable right now. + * Override the <code>isSplitable</code> function. + * + * @param fs the file system that the file is on + * @param filename the file name to check + * @return is this file splitable? + */ + @Override protected boolean isSplitable(FileSystem fs, Path filename) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java new file mode 100644 index 0000000..bd7718e --- /dev/null +++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Properties; + +import org.apache.commons.io.EndianUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Progressable; + +import static java.lang.String.format; + +/** + * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde. + * FileOutputFormat for Teradata binary files. + * + * In the Teradata Binary File, each record constructs as below: + * The first 2 bytes represents the length of the bytes next for this record (null bitmap and fields). + * Then the null bitmap whose length is depended on the number of fields is followe. + * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field. + * At last, there is one byte (0x0a) in the end of the record. + * + * Teradata binary files are using little endian. + */ +public class TeradataBinaryFileOutputFormat<K extends WritableComparable, V extends Writable> + extends HiveIgnoreKeyTextOutputFormat<K, V> { + private static final Log LOG = LogFactory.getLog(TeradataBinaryFileOutputFormat.class); + + static final byte RECORD_END_BYTE = (byte) 0x0a; + + /** + * create the final out file, and output row by row. After one row is + * appended, a configured row separator is appended + * + * @param jc + * the job configuration file + * @param outPath + * the final output file to be created + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableProperties of this file's corresponding table + * @param progress + * progress used for status report + * @return the RecordWriter + */ + @Override public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class<? 
extends Writable> valueClass, + boolean isCompressed, final Properties tableProperties, Progressable progress) throws IOException { + FileSystem fs = outPath.getFileSystem(jc); + final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath, progress), isCompressed); + return new RecordWriter() { + @Override public void write(Writable r) throws IOException { + BytesWritable bw = (BytesWritable) r; + int recordLength = bw.getLength(); + + //Based on the row length to decide if the length is int or short + String rowLength = tableProperties + .getProperty(TeradataBinaryRecordReader.TD_ROW_LENGTH, TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH) + .toLowerCase(); + LOG.debug(format("The table property %s is: %s", TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength)); + + if (TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.containsKey(rowLength)) { + if (rowLength.equals(TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH)) { + EndianUtils.writeSwappedShort(outStream, (short) recordLength); // write the length using little endian + } else if (rowLength.equals(TeradataBinaryRecordReader.TD_ROW_LENGTH_1MB)) { + EndianUtils.writeSwappedInteger(outStream, recordLength); // write the length using little endian + } + } else { + throw new IllegalArgumentException(format("%s doesn't support the value %s, the supported values are %s", + TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength, + TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.keySet())); + } + + outStream.write(bw.getBytes(), 0, bw.getLength()); // write the content (the content is in little endian) + outStream.write(RECORD_END_BYTE); //write the record ending + } + + @Override public void close(boolean abort) throws IOException { + outStream.close(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java new file mode 100644 index 0000000..337b5d2 --- /dev/null +++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.EndianUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import static java.lang.String.format; + +/** + * The TeradataBinaryRecordReader reads the record from Teradata binary files. + * + * In the Teradata Binary File, each record constructs as below: + * The first 2 bytes represents the length of the bytes next for this record. + * Then the null bitmap whose length is depended on the number of fields is followed. + * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field. + * At last, there is one byte (0x0a) in the end of the record. + * + * This InputFormat currently doesn't support the split of the file. + * Teradata binary files are using little endian. + */ +public class TeradataBinaryRecordReader implements RecordReader<NullWritable, BytesWritable> { + + private static final Log LOG = LogFactory.getLog(TeradataBinaryRecordReader.class); + + private CompressionCodecFactory compressionCodecs = null; + private InputStream in; + private long start; + private long pos; + private long end; + private final Seekable filePosition; + private CompressionCodec codec; + + static final String TD_ROW_LENGTH = "teradata.row.length"; + static final Map<String, Integer> TD_ROW_LENGTH_TO_BYTE_NUM = ImmutableMap.of("64kb", 2, "1mb", 4); + static final String DEFAULT_TD_ROW_LENGTH = "64kb"; + static final String TD_ROW_LENGTH_1MB = "1mb"; + + private byte[] recordLengthBytes; + private byte[] valueByteArray = new byte[65536]; // max byte array + private byte[] endOfRecord = new byte[1]; + + private int recordLength = 0; + + public TeradataBinaryRecordReader(JobConf job, FileSplit fileSplit) throws IOException { + LOG.debug("initialize the TeradataBinaryRecordReader"); + + String rowLength = job.get(TD_ROW_LENGTH); + if (rowLength == null) { + LOG.debug("No table property in JobConf. 
Try to recover the table directly"); + Map<String, PartitionDesc> partitionDescMap = Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo(); + for (String alias : Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo().keySet()) { + LOG.debug(format("the current alias: %s", alias)); + rowLength = partitionDescMap.get(alias).getTableDesc().getProperties().getProperty(TD_ROW_LENGTH); + if (rowLength != null) { + break; + } + } + } + + if (rowLength == null) { + rowLength = DEFAULT_TD_ROW_LENGTH; + } else { + rowLength = rowLength.toLowerCase(); + } + + if (TD_ROW_LENGTH_TO_BYTE_NUM.containsKey(rowLength)) { + recordLengthBytes = new byte[TD_ROW_LENGTH_TO_BYTE_NUM.get(rowLength)]; + } else { + throw new IllegalArgumentException( + format("%s doesn't support the value %s, the supported values are %s", TD_ROW_LENGTH, rowLength, + TD_ROW_LENGTH_TO_BYTE_NUM.keySet())); + } + + start = fileSplit.getStart(); + end = start + fileSplit.getLength(); + + LOG.debug(format("The start of the file split is: %s", start)); + LOG.debug(format("The end of the file split is: %s", end)); + + final Path file = fileSplit.getPath(); + compressionCodecs = new CompressionCodecFactory(job); + codec = compressionCodecs.getCodec(file); + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(fileSplit.getPath()); + + /* currently the TeradataBinaryRecord file doesn't support file split at all */ + filePosition = fileIn; + if (isCompressedInput()) { + LOG.info(format("Input file is compressed. Using compression code %s", codec.getClass().getName())); + in = codec.createInputStream(fileIn); + } else { + LOG.info("The input file is not compressed"); + in = fileIn; + } + pos = start; + } + + /** + * Reads the next key/value pair from the input for processing. + * + * @param key the key to read data into + * @param value the value to read data into + * @return true iff a key/value was read, false if at EOF + */ + @Override public synchronized boolean next(NullWritable key, BytesWritable value) throws IOException { + + /* read the record length */ + int lengthExpected = recordLengthBytes.length; + int hasConsumed = readExpectedBytes(recordLengthBytes, lengthExpected); + if (hasConsumed == 0) { + LOG.info("Reach the End of File. 
No more record"); + return false; + } else if (hasConsumed < lengthExpected) { + LOG.error( + format("We expect %s bytes for the record length but read %d byte and reach the End of File.", lengthExpected, + hasConsumed)); + LOG.error(format("The current position in the file : %s", getFilePosition())); + LOG.error(format("The current consumed bytes: %s", pos)); + LOG.error(format("The bytes for the current record is: %s", Hex.encodeHexString(recordLengthBytes))); + throw new EOFException("When reading the record length, reach the unexpected end of file."); + } + /* get the record contend length to prepare to read the content */ + recordLength = EndianUtils.readSwappedUnsignedShort(recordLengthBytes, 0); + pos += lengthExpected; + + /* read the record content */ + lengthExpected = recordLength; + hasConsumed = readExpectedBytes(valueByteArray, lengthExpected); + if (hasConsumed < lengthExpected) { + LOG.error(format("We expect %s bytes for the record content but read %d byte and reach the End of File.", + lengthExpected, hasConsumed)); + LOG.error(format("The current position in the file : %s", getFilePosition())); + LOG.error(format("The current consumed bytes: %s", pos)); + LOG.error(format("The bytes for the current record is: %s", + Hex.encodeHexString(recordLengthBytes) + Hex.encodeHexString(valueByteArray))); + throw new EOFException("When reading the contend of the record, reach the unexpected end of file."); + } + value.set(valueByteArray, 0, recordLength); + pos += lengthExpected; + + /* read the record end */ + lengthExpected = endOfRecord.length; + hasConsumed = readExpectedBytes(endOfRecord, lengthExpected); + if (hasConsumed < lengthExpected) { + LOG.error(format("We expect %s bytes for the record end symbol but read %d byte and reach the End of File.", + lengthExpected, hasConsumed)); + LOG.error(format("The current position in the file : %s", getFilePosition())); + LOG.error(format("The current consumed bytes: %s", pos)); + LOG.error(format("The bytes for the current record is: %s", + Hex.encodeHexString(recordLengthBytes) + Hex.encodeHexString(valueByteArray) + Hex + .encodeHexString(endOfRecord))); + throw new EOFException("When reading the end of record, reach the unexpected end of file."); + } + + if (endOfRecord[0] != TeradataBinaryFileOutputFormat.RECORD_END_BYTE) { + throw new IOException(format("We expect 0x0a as the record end but get %s.", Hex.encodeHexString(endOfRecord))); + } + pos += lengthExpected; + + return true; + } + + /** + * Create an object of the appropriate type to be used as a key. + * + * @return a new key object. + */ + @Override public NullWritable createKey() { + return NullWritable.get(); + } + + /** + * Create an object of the appropriate type to be used as a value. + * + * @return a new value object. + */ + @Override public BytesWritable createValue() { + return new BytesWritable(); + } + + /** + * Returns the current position in the input. + * + * @return the current position in the input. + * @throws IOException + */ + @Override public long getPos() throws IOException { + return pos; + } + + /** + * + * @throws IOException + */ + @Override public void close() throws IOException { + if (in != null) { + in.close(); + } + } + + /** + * How much of the input has the {@link RecordReader} consumed i.e. + * has been processed by? + * + * @return progress from <code>0.0</code> to <code>1.0</code>. 
+ * @throws IOException + */ + @Override public float getProgress() throws IOException { + if (start == end) { + return 0.0F; + } else { + return Math.min(1.0F, (float) (getFilePosition() - start) / (float) (end - start)); + } + } + + private boolean isCompressedInput() { + return codec != null; + } + + private synchronized long getFilePosition() throws IOException { + long retVal; + if (isCompressedInput() && filePosition != null) { + retVal = filePosition.getPos(); + } else { + retVal = getPos(); + } + return retVal; + } + + private synchronized int readExpectedBytes(byte[] toWrite, int lengthExpected) throws IOException { + int curPos = 0; + do { + int numOfByteRead = in.read(toWrite, curPos, lengthExpected - curPos); + if (numOfByteRead < 0) { + return curPos; + } else { + curPos += numOfByteRead; + } + } while (curPos < lengthExpected); + return curPos; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java new file mode 100644 index 0000000..230b3d5 --- /dev/null +++ b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import org.apache.commons.io.input.SwappedDataInputStream; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.type.HiveDecimal; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +import static java.lang.String.format; + +/** + * The TeradataBinaryDataInputStream is used to handle the Teradata binary format input for record. + * Since the TD binary format uses little-endian to handle the SHORT, INT, LONG, DOUBLE and etc. + * while the Hadoop uses big-endian, + * We extend SwappedDataInputStream to handle these types and extend to handle the Teradata + * specific types like VARCHAR, CHAR, TIMESTAMP, DATE... + */ +public class TeradataBinaryDataInputStream extends SwappedDataInputStream { + + private static final int DATE_STRING_LENGTH = 8; + private final SimpleDateFormat dateFormat; + + /** + * Instantiates a new Teradata binary data input stream. 
+ * + * @param input the input + */ + public TeradataBinaryDataInputStream(InputStream input) { + super(input); + dateFormat = new SimpleDateFormat("yyyyMMdd"); + } + + /** + * Read VARCHAR(N). + * The representation of Varchar in Teradata binary format is: + * the first two bytes represent the length N of this varchar field, + * the next N bytes represent the content of this varchar field. + * To pad the null varchar, the length will be 0 and the content will be none. + * + * @return the string + * @throws IOException the io exception + */ + public String readVarchar() throws IOException { + int varcharLength = readUnsignedShort(); + byte[] varcharContent = new byte[varcharLength]; + int numOfBytesRead = in.read(varcharContent); + if (varcharContent.length != 0 && numOfBytesRead != varcharLength) { + throw new EOFException( + format("Fail to read the varchar. Expect %d bytes, get %d bytes", varcharLength, numOfBytesRead)); + } + //force it to be UTF8 string + return new String(varcharContent, "UTF8"); + } + + /** + * Read TIMESTAMP(P). + * The representation of timestamp in Teradata binary format is: + * the byte number to read is based on the precision of timestamp, + * each byte represents one char and the timestamp is using string representation, + * eg: for TIMESTAMP(6), we need to read 26 bytes + * 31 39 31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33 32 30 30 + * will represent 1911-11-11 19:20:21.433200. + * the null timestamp will use space to pad. + * + * @param byteNum the byte number that will be read from inputstream + * @return the timestamp + * @throws IOException the io exception + */ + public Timestamp readTimestamp(Integer byteNum) throws IOException { + // yyyy-mm-dd hh:mm:ss + byte[] timestampContent = new byte[byteNum]; + int numOfBytesRead = in.read(timestampContent); + if (timestampContent.length != 0 && numOfBytesRead != byteNum) { + throw new EOFException( + format("Fail to read the timestamp. Expect %d bytes, get %d bytes", byteNum, numOfBytesRead)); + } + String timestampStr = new String(timestampContent, "UTF8"); + if (timestampStr.trim().length() == 0) { + return null; + } else { + return Timestamp.valueOf(timestampStr); + } + } + + /** + * Read DATE. + * The representation of date in Teradata binary format is: + * The Date D is a int with 4 bytes using little endian, + * The representation is (D+19000000).ToString -> YYYYMMDD, + * eg: Date 07 b2 01 00 -> 111111 in little endian -> 19111111 - > 1911.11.11. + * the null date will use 0 to pad. + * + * @return the date + * @throws IOException the io exception + * @throws ParseException the parse exception + */ + public Date readDate() throws IOException, ParseException { + int di = readInt(); + if (di == 0) { + return null; + } else { + String dateString = String.valueOf(di + 19000000); + if (dateString.length() < DATE_STRING_LENGTH) { + dateString = StringUtils.leftPad(dateString, DATE_STRING_LENGTH, '0'); + } + return new Date(dateFormat.parse(dateString).getTime()); + } + } + + /** + * Read CHAR(N). + * The representation of char in Teradata binary format is + * the byte number to read is based on the [charLength] * [bytePerChar] <- totalLength, + * bytePerChar is decided by the charset: LATAIN charset is 2 bytes per char and UNICODE charset is 3 bytes per char. + * the null char will use space to pad. 
+ * + * @param totalLength the total length + * @return the string + * @throws IOException the io exception + */ + public String readChar(int totalLength) throws IOException { + byte[] charContent = new byte[totalLength]; + int numOfBytesRead = in.read(charContent); + if (charContent.length != 0 && numOfBytesRead != totalLength) { + throw new EOFException( + format("Fail to read the varchar. Expect %d bytes, get %d bytes", totalLength, numOfBytesRead)); + } + return new String(charContent, "UTF8"); + } + + /** + * Read DECIMAL(P, S). + * The representation of decimal in Teradata binary format is + * the byte number to read is decided solely by the precision(P), + * HiveDecimal is constructed through the byte array and scale. + * the null DECIMAL will use 0x00 to pad. + * + * @param scale the scale + * @param byteNum the byte num + * @return the hive decimal + * @throws IOException the io exception + */ + public HiveDecimal readDecimal(int scale, int byteNum) throws IOException { + byte[] decimalContent = new byte[byteNum]; + int numOfBytesRead = in.read(decimalContent); + if (decimalContent.length != 0 && numOfBytesRead != byteNum) { + throw new EOFException( + format("Fail to read the decimal. Expect %d bytes, get %d bytes", byteNum, numOfBytesRead)); + } + ArrayUtils.reverse(decimalContent); + return HiveDecimal.create(new BigInteger(decimalContent), scale); + } + + /** + * Read VARBYTE(N). + * The representation of VARBYTE in Teradata binary format is: + * the first two bytes represent the length N of this varchar field + * the next N bytes represent the content of this varchar field. + * To pad the null varbyte, the length will be 0 and the content will be none. + * + * @return the byte [ ] + * @throws IOException the io exception + */ + public byte[] readVarbyte() throws IOException { + int varbyteLength = readUnsignedShort(); + byte[] varbyteContent = new byte[varbyteLength]; + int numOfBytesRead = in.read(varbyteContent); + if (varbyteContent.length != 0 && numOfBytesRead != varbyteLength) { + throw new EOFException( + format("Fail to read the varbyte. Expect %d bytes, get %d bytes", varbyteLength, numOfBytesRead)); + } + return varbyteContent; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java new file mode 100644 index 0000000..9bca182 --- /dev/null +++ b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import org.apache.commons.io.EndianUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.math.BigInteger; +import java.text.SimpleDateFormat; +import java.util.Arrays; + +import static java.lang.String.format; + +/** + * The TeradataBinaryDataOutputStream is used to produce the output in compliance with the Teradata binary format, + * so the output can be directly used to load into Teradata DB using TPT fastload. + * Since the TD binary format uses little-endian to handle the SHORT, INT, LONG, DOUBLE and etc. + * while the Hadoop uses big-endian, + * We extend SwappedDataInputStream to return qualified bytes for these types and extend to handle the Teradata + * specific types like VARCHAR, CHAR, TIMESTAMP, DATE... + */ +public class TeradataBinaryDataOutputStream extends ByteArrayOutputStream { + + private static final Log LOG = LogFactory.getLog(TeradataBinaryDataOutputStream.class); + + private static final int TIMESTAMP_NO_NANOS_BYTE_NUM = 19; + private final SimpleDateFormat dateFormat; + + public TeradataBinaryDataOutputStream() { + dateFormat = new SimpleDateFormat("yyyyMMdd"); + } + + /** + * Write VARCHAR(N). + * The representation of Varchar in Teradata binary format is: + * the first two bytes represent the length N of this varchar field, + * the next N bytes represent the content of this varchar field. + * To pad the null varchar, the length will be 0 and the content will be none. + * + * @param writable the writable + * @throws IOException the io exception + */ + public void writeVarChar(HiveVarcharWritable writable) throws IOException { + if (writable == null) { + EndianUtils.writeSwappedShort(this, (short) 0); + return; + } + Text t = writable.getTextValue(); + int varcharLength = t.getLength(); + EndianUtils.writeSwappedShort(this, (short) varcharLength); // write the varchar length + write(t.getBytes(), 0, varcharLength); // write the varchar content + } + + /** + * Write INT. + * using little-endian to write integer. + * + * @param i the + * @throws IOException the io exception + */ + public void writeInt(int i) throws IOException { + EndianUtils.writeSwappedInteger(this, i); + } + + /** + * Write TIMESTAMP(N). + * The representation of timestamp in Teradata binary format is: + * the byte number to read is based on the precision of timestamp, + * each byte represents one char and the timestamp is using string representation, + * eg: for 1911-11-11 19:20:21.433200 in TIMESTAMP(3), we will cut it to be 1911-11-11 19:20:21.433 and write + * 31 39 31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33. + * the null timestamp will use space to pad. 
+ * + * @param timestamp the timestamp + * @param byteNum the byte number the timestamp will write + * @throws IOException the io exception + */ + public void writeTimestamp(TimestampWritable timestamp, int byteNum) throws IOException { + if (timestamp == null) { + String pad = StringUtils.repeat(' ', byteNum); + write(pad.getBytes("UTF8")); + } else { + String sTimeStamp = timestamp.getTimestamp().toString(); + if (sTimeStamp.length() >= byteNum) { + write(sTimeStamp.substring(0, byteNum).getBytes("UTF8")); + } else { + write(sTimeStamp.getBytes("UTF8")); + String pad; + if (sTimeStamp.length() == TIMESTAMP_NO_NANOS_BYTE_NUM) { + pad = "." + StringUtils.repeat('0', byteNum - sTimeStamp.length() - 1); + } else { + pad = StringUtils.repeat('0', byteNum - sTimeStamp.length()); + } + write(pad.getBytes("UTF8")); + } + } + } + + /** + * Write DOUBLE. + * using little-endian to write double. + * + * @param d the d + * @throws IOException the io exception + */ + public void writeDouble(double d) throws IOException { + EndianUtils.writeSwappedDouble(this, d); + } + + /** + * Write DATE. + * The representation of date in Teradata binary format is: + * The Date D is a int with 4 bytes using little endian. + * The representation is (YYYYMMDD - 19000000).toInt -> D + * eg. 1911.11.11 -> 19111111 -> 111111 -> 07 b2 01 00 in little endian. + * the null date will use 0 to pad. + * + * @param date the date + * @throws IOException the io exception + */ + public void writeDate(DateWritable date) throws IOException { + if (date == null) { + EndianUtils.writeSwappedInteger(this, 0); + } else { + EndianUtils.writeSwappedInteger(this, Integer.parseInt(dateFormat.format(date.get())) - 19000000); + } + } + + /** + * Write LONG. + * using little-endian to write double. + * + * @param l the l + * @throws IOException the io exception + */ + public void writeLong(long l) throws IOException { + EndianUtils.writeSwappedLong(this, l); + } + + /** + * Write CHAR(N). + * The representation of char in Teradata binary format is: + * the byte number to read is based on the [charLength] * [bytePerChar] <- totalLength, + * bytePerChar is decided by the charset: LATAIN charset is 2 bytes per char and UNICODE charset is 3 bytes per char. + * the null char will use space to pad. + * + * @param writable the writable + * @param length the byte n + * @throws IOException the io exception + */ + public void writeChar(HiveCharWritable writable, int length) throws IOException { + if (writable == null) { + String pad = StringUtils.repeat(' ', length); + write(pad.getBytes("UTF8")); + return; + } + Text t = writable.getStrippedValue(); + int contentLength = t.getLength(); + write(t.getBytes(), 0, contentLength); + if (length - contentLength < 0) { + throw new IOException(format("The byte num %s of HiveCharWritable is more than the byte num %s we can hold. " + + "The content of HiveCharWritable is %s", contentLength, length, writable.getPaddedValue())); + } + if (length > contentLength) { + String pad = StringUtils.repeat(' ', length - contentLength); + write(pad.getBytes("UTF8")); + } + } + + /** + * Write DECIMAL(P, S). + * The representation of decimal in Teradata binary format is: + * the byte number to read is decided solely by the precision(P), + * HiveDecimal is constructed through the byte array and scale. + * the rest of byte will use 0x00 to pad (positive) and use 0xFF to pad (negative). + * the null DECIMAL will use 0x00 to pad. 
+ * + * @param writable the writable + * @param byteNum the byte num + * @throws IOException the io exception + */ + public void writeDecimal(HiveDecimalWritable writable, int byteNum, int scale) throws IOException { + if (writable == null) { + byte[] pad = new byte[byteNum]; + write(pad); + return; + } + // since the HiveDecimal will auto adjust the scale to save resource + // we need to adjust it back otherwise the output bytes will be wrong + int hiveScale = writable.getHiveDecimal().scale(); + BigInteger bigInteger = writable.getHiveDecimal().unscaledValue(); + if (hiveScale < scale) { + BigInteger multiplicand = new BigInteger("1" + StringUtils.repeat('0', scale - hiveScale)); + bigInteger = bigInteger.multiply(multiplicand); + } + byte[] content = bigInteger.toByteArray(); + int signBit = content[0] >> 7 & 1; + ArrayUtils.reverse(content); + write(content); + if (byteNum > content.length) { + byte[] pad; + if (signBit == 0) { + pad = new byte[byteNum - content.length]; + } else { + pad = new byte[byteNum - content.length]; + Arrays.fill(pad, (byte) 255); + } + write(pad); + } + } + + /** + * Write SHORT. + * using little-endian to write short. + * + * @param s the s + * @throws IOException the io exception + */ + public void writeShort(short s) throws IOException { + EndianUtils.writeSwappedShort(this, s); + } + + /** + * Write VARBYTE(N). + * The representation of VARBYTE in Teradata binary format is: + * the first two bytes represent the length N of this varchar field, + * the next N bytes represent the content of this varchar field. + * To pad the null varbyte, the length will be 0 and the content will be none. + * + * @param writable the writable + * @throws IOException the io exception + */ + public void writeVarByte(BytesWritable writable) throws IOException { + if (writable == null) { + EndianUtils.writeSwappedShort(this, (short) 0); + return; + } + int varbyteLength = writable.getLength(); + EndianUtils.writeSwappedShort(this, (short) varbyteLength); // write the varbyte length + write(writable.getBytes(), 0, varbyteLength); // write the varchar content + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java new file mode 100644 index 0000000..2254e88 --- /dev/null +++ b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.teradata; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeSpec; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.lang.String.format; + +/** + * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde. + * TeradataBinarySerde handles the serialization and deserialization of Teradata Binary Record + * passed from TeradataBinaryRecordReader. 
+ * + * The Teradata Binary Record uses little-endian to handle the SHORT, INT, LONG, DOUBLE... + * We extend SwappedDataInputStream to handle these types and extend to handle the Teradata + * specific types like VARCHAR, CHAR, TIMESTAMP, DATE... + * + * Currently we support 11 Teradata data types: VARCHAR ,INTEGER, TIMESTAMP, FLOAT, DATE, + * BYTEINT, BIGINT, CHARACTER, DECIMAL, SMALLINT, VARBYTE. + * The mapping between Teradata data type and Hive data type is + * Teradata Data Type: Hive Data Type + * VARCHAR: VARCHAR, + * INTEGER: INT, + * TIMESTAMP: TIMESTAMP, + * FLOAT: DOUBLE, + * DATE: DATE, + * BYTEINT: TINYINTÂ , + * BIGINT: BIGINT, + * CHAR: CHAR, + * DECIMAL: DECIMAL, + * SMALLINT: SMALLINT, + * VARBYTE: BINARY. + * + * TeradataBinarySerde currently doesn't support complex types like MAP, ARRAY and STRUCT. + */ +@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, + serdeConstants.LIST_COLUMN_TYPES }) public class TeradataBinarySerde extends AbstractSerDe { + private static final Log LOG = LogFactory.getLog(TeradataBinarySerde.class); + + public static final String TD_SCHEMA_LITERAL = "teradata.schema.literal"; + + private StructObjectInspector rowOI; + private ArrayList<Object> row; + private byte[] inForNull; + + private int numCols; + private List<String> columnNames; + private List<TypeInfo> columnTypes; + + private TeradataBinaryDataOutputStream out; + private BytesWritable serializeBytesWritable; + private byte[] outForNull; + + public static final String TD_TIMESTAMP_PRECISION = "teradata.timestamp.precision"; + private int timestampPrecision; + private static final int DEFAULT_TIMESTAMP_BYTE_NUM = 19; + private static final String DEFAULT_TIMESTAMP_PRECISION = "6"; + + public static final String TD_CHAR_SET = "teradata.char.charset"; + private String charCharset; + private static final String DEFAULT_CHAR_CHARSET = "UNICODE"; + private static final Map<String, Integer> CHARSET_TO_BYTE_NUM = ImmutableMap.of("LATIN", 2, "UNICODE", 3); + + /** + * Initialize the HiveSerializer. + * + * @param conf + * System properties. Can be null in compile time + * @param tbl + * table properties + * @throws SerDeException + */ + @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException { + columnNames = Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS).split(",")); + + String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); + LOG.debug(serdeConstants.LIST_COLUMN_TYPES + ": " + columnTypeProperty); + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + assert columnNames.size() == columnTypes.size(); + numCols = columnNames.size(); + + // get the configured teradata timestamp precision + // you can configure to generate timestamp of different precision in the binary file generated by TPT/BTEQ + timestampPrecision = Integer.parseInt(tbl.getProperty(TD_TIMESTAMP_PRECISION, DEFAULT_TIMESTAMP_PRECISION)); + + // get the configured teradata char charset + // in TD, latin charset will have 2 bytes per char and unicode will have 3 bytes per char + charCharset = tbl.getProperty(TD_CHAR_SET, DEFAULT_CHAR_CHARSET); + if (!CHARSET_TO_BYTE_NUM.containsKey(charCharset)) { + throw new SerDeException( + format("%s isn't supported in Teradata Char Charset %s", charCharset, CHARSET_TO_BYTE_NUM.keySet())); + } + + // All columns have to be primitive. 
+ // Constructing the row ObjectInspector: + List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols); + for (int i = 0; i < numCols; i++) { + if (columnTypes.get(i).getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new SerDeException( + getClass().getName() + " only accepts primitive columns, but column[" + i + "] named " + columnNames.get(i) + + " has category " + columnTypes.get(i).getCategory()); + } + columnOIs.add(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(i))); + } + + rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); + + // Constructing the row object and will be reused for all rows + row = new ArrayList<Object>(numCols); + for (int i = 0; i < numCols; i++) { + row.add(null); + } + + // Initialize vars related to Null Array which represents the null bitmap + int byteNumForNullArray = (numCols / 8) + ((numCols % 8 == 0) ? 0 : 1); + LOG.debug(format("The Null Bytes for each record will have %s bytes", byteNumForNullArray)); + inForNull = new byte[byteNumForNullArray]; + + out = new TeradataBinaryDataOutputStream(); + serializeBytesWritable = new BytesWritable(); + outForNull = new byte[byteNumForNullArray]; + } + + /** + * Returns the Writable class that would be returned by the serialize method. + * This is used to initialize SequenceFile header. + */ + @Override public Class<? extends Writable> getSerializedClass() { + return ByteWritable.class; + } + + /** + * Serialize an object by navigating inside the Object with the + * ObjectInspector. In most cases, the return value of this function will be + * constant since the function will reuse the Writable object. If the client + * wants to keep a copy of the Writable, the client needs to clone the + * returned value. + + * @param obj + * @param objInspector + */ + @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + try { + out.reset(); + final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; + final List<? 
extends StructField> fieldRefs = outputRowOI.getAllStructFieldRefs(); + + if (fieldRefs.size() != numCols) { + throw new SerDeException( + "Cannot serialize the object because there are " + fieldRefs.size() + " fieldRefs but the table defined " + + numCols + " columns."); + } + + // Fully refresh the Null Array to write into the out + for (int i = 0; i < numCols; i++) { + Object objectForField = outputRowOI.getStructFieldData(obj, fieldRefs.get(i)); + if (objectForField == null) { + outForNull[i / 8] = (byte) (outForNull[i / 8] | (0x01 << (7 - (i % 8)))); + } else { + outForNull[i / 8] = (byte) (outForNull[i / 8] & ~(0x01 << (7 - (i % 8)))); + } + } + out.write(outForNull); + + // serialize each field using FieldObjectInspector + for (int i = 0; i < numCols; i++) { + Object objectForField = outputRowOI.getStructFieldData(obj, fieldRefs.get(i)); + serializeField(objectForField, fieldRefs.get(i).getFieldObjectInspector(), columnTypes.get(i)); + } + + serializeBytesWritable.set(out.toByteArray(), 0, out.size()); + return serializeBytesWritable; + } catch (IOException e) { + throw new SerDeException(e); + } + } + + private void serializeField(Object objectForField, ObjectInspector oi, TypeInfo ti) + throws IOException, SerDeException { + switch (oi.getCategory()) { + case PRIMITIVE: + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + switch (poi.getPrimitiveCategory()) { + // Teradata Type: BYTEINT + case BYTE: + ByteObjectInspector boi = (ByteObjectInspector) poi; + byte b = 0; + if (objectForField != null) { + b = boi.get(objectForField); + } + out.write(b); + return; + // Teradata Type: SMALLINT + case SHORT: + ShortObjectInspector spoi = (ShortObjectInspector) poi; + short s = 0; + if (objectForField != null) { + s = spoi.get(objectForField); + } + out.writeShort(s); + return; + // Teradata Type: INT + case INT: + IntObjectInspector ioi = (IntObjectInspector) poi; + int i = 0; + if (objectForField != null) { + i = ioi.get(objectForField); + } + out.writeInt(i); + return; + // Teradata Type: BIGINT + case LONG: + LongObjectInspector loi = (LongObjectInspector) poi; + long l = 0; + if (objectForField != null) { + l = loi.get(objectForField); + } + out.writeLong(l); + return; + // Teradata Type: FLOAT + case DOUBLE: + DoubleObjectInspector doi = (DoubleObjectInspector) poi; + double d = 0; + if (objectForField != null) { + d = doi.get(objectForField); + } + out.writeDouble(d); + return; + // Teradata Type: VARCHAR + case VARCHAR: + HiveVarcharObjectInspector hvoi = (HiveVarcharObjectInspector) poi; + HiveVarcharWritable hv = hvoi.getPrimitiveWritableObject(objectForField); + // assert the length of varchar record fits into the table definition + if (hv != null) { + assert ((VarcharTypeInfo) ti).getLength() >= hv.getHiveVarchar().getCharacterLength(); + } + out.writeVarChar(hv); + return; + // Teradata Type: TIMESTAMP + case TIMESTAMP: + TimestampObjectInspector tsoi = (TimestampObjectInspector) poi; + TimestampWritable ts = tsoi.getPrimitiveWritableObject(objectForField); + out.writeTimestamp(ts, getTimeStampByteNum(timestampPrecision)); + return; + // Teradata Type: DATE + case DATE: + DateObjectInspector dtoi = (DateObjectInspector) poi; + DateWritable dw = dtoi.getPrimitiveWritableObject(objectForField); + out.writeDate(dw); + return; + // Teradata Type: CHAR + case CHAR: + HiveCharObjectInspector coi = (HiveCharObjectInspector) poi; + HiveCharWritable hc = coi.getPrimitiveWritableObject(objectForField); + // assert the length of char record fits into the table definition + 
if (hc != null) { + assert ((CharTypeInfo) ti).getLength() >= hc.getHiveChar().getCharacterLength(); + } + out.writeChar(hc, getCharByteNum(charCharset) * ((CharTypeInfo) ti).getLength()); + return; + // Teradata Type: DECIMAL + case DECIMAL: + DecimalTypeInfo dtype = (DecimalTypeInfo) ti; + int precision = dtype.precision(); + int scale = dtype.scale(); + HiveDecimalObjectInspector hdoi = (HiveDecimalObjectInspector) poi; + HiveDecimalWritable hd = hdoi.getPrimitiveWritableObject(objectForField); + // assert the precision of decimal record fits into the table definition + if (hd != null) { + assert (dtype.getPrecision() >= hd.precision()); + } + out.writeDecimal(hd, getDecimalByteNum(precision), scale); + return; + // Teradata Type: VARBYTE + case BINARY: + BinaryObjectInspector bnoi = (BinaryObjectInspector) poi; + BytesWritable byw = bnoi.getPrimitiveWritableObject(objectForField); + out.writeVarByte(byw); + return; + default: + throw new SerDeException("Unrecognized type: " + poi.getPrimitiveCategory()); + } + // Currently, serialization of complex types is not supported + case LIST: + case MAP: + case STRUCT: + default: + throw new SerDeException("Unrecognized type: " + oi.getCategory()); + } + } + + @Override public SerDeStats getSerDeStats() { + // no support for statistics + return null; + } + + /** + * Deserialize an object out of a Writable blob. In most cases, the return + * value of this function will be constant since the function will reuse the + * returned object. If the client wants to keep a copy of the object, the + * client needs to clone the returned value by calling + * ObjectInspectorUtils.getStandardObject(). + * + * @param blob + * The Writable object containing a serialized object + * @return A Java object representing the contents in the blob. + */ + @Override public Object deserialize(Writable blob) throws SerDeException { + try { + BytesWritable data = (BytesWritable) blob; + + // initialize the data to be the input stream + TeradataBinaryDataInputStream in = + new TeradataBinaryDataInputStream(new ByteArrayInputStream(data.getBytes(), 0, data.getLength())); + + int numOfByteRead = in.read(inForNull); + + if (inForNull.length != 0 && numOfByteRead != inForNull.length) { + throw new EOFException("not enough bytes for one object"); + } + + boolean isNull; + for (int i = 0; i < numCols; i++) { + // get if the ith field is null or not + isNull = ((inForNull[i / 8] & (128 >> (i % 8))) != 0); + row.set(i, deserializeField(in, columnTypes.get(i), row.get(i), isNull)); + } + + //After deserializing all the fields, the input should be over in which case in.read will return -1 + if (in.read() != -1) { + throw new EOFException("The inputstream has more after we deserialize all the fields - this is unexpected"); + } + } catch (EOFException e) { + LOG.warn("Catch thrown exception", e); + LOG.warn("This record has been polluted. We have reset all the row fields to be null"); + for (int i = 0; i < numCols; i++) { + row.set(i, null); + } + } catch (IOException e) { + throw new SerDeException(e); + } catch (ParseException e) { + throw new SerDeException(e); + } + return row; + } + + private Object deserializeField(TeradataBinaryDataInputStream in, TypeInfo type, Object reuse, boolean isNull) + throws IOException, ParseException, SerDeException { + // isNull: + // In the Teradata Binary file, even the field is null (isNull=true), + // thd data still has some default values to pad the record. + // In this case, you cannot avoid reading the bytes even it is not used. 
+ switch (type.getCategory()) { + case PRIMITIVE: + PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) type; + switch (ptype.getPrimitiveCategory()) { + case VARCHAR: // Teradata Type: VARCHAR + String st = in.readVarchar(); + if (isNull) { + return null; + } else { + HiveVarcharWritable r = reuse == null ? new HiveVarcharWritable() : (HiveVarcharWritable) reuse; + r.set(st, ((VarcharTypeInfo) type).getLength()); + return r; + } + case INT: // Teradata Type: INT + int i = in.readInt(); + if (isNull) { + return null; + } else { + IntWritable r = reuse == null ? new IntWritable() : (IntWritable) reuse; + r.set(i); + return r; + } + case TIMESTAMP: // Teradata Type: TIMESTAMP + Timestamp ts = in.readTimestamp(getTimeStampByteNum(timestampPrecision)); + if (isNull) { + return null; + } else { + TimestampWritable r = reuse == null ? new TimestampWritable() : (TimestampWritable) reuse; + r.set(ts); + return r; + } + case DOUBLE: // Teradata Type: FLOAT + double d = in.readDouble(); + if (isNull) { + return null; + } else { + DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable) reuse; + r.set(d); + return r; + } + case DATE: // Teradata Type: DATE + Date dt = in.readDate(); + if (isNull) { + return null; + } else { + DateWritable r = reuse == null ? new DateWritable() : (DateWritable) reuse; + r.set(dt); + return r; + } + case BYTE: // Teradata Type: BYTEINT + byte bt = in.readByte(); + if (isNull) { + return null; + } else { + ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable) reuse; + r.set(bt); + return r; + } + case LONG: // Teradata Type: BIGINT + long l = in.readLong(); + if (isNull) { + return null; + } else { + LongWritable r = reuse == null ? new LongWritable() : (LongWritable) reuse; + r.set(l); + return r; + } + case CHAR: // Teradata Type: CHAR + CharTypeInfo ctype = (CharTypeInfo) type; + int length = ctype.getLength(); + String c = in.readChar(length * getCharByteNum(charCharset)); + if (isNull) { + return null; + } else { + HiveCharWritable r = reuse == null ? new HiveCharWritable() : (HiveCharWritable) reuse; + r.set(c, length); + return r; + } + case DECIMAL: // Teradata Type: DECIMAL + DecimalTypeInfo dtype = (DecimalTypeInfo) type; + int precision = dtype.precision(); + int scale = dtype.scale(); + HiveDecimal hd = in.readDecimal(scale, getDecimalByteNum(precision)); + if (isNull) { + return null; + } else { + HiveDecimalWritable r = (reuse == null ? new HiveDecimalWritable() : (HiveDecimalWritable) reuse); + r.set(hd); + return r; + } + case SHORT: // Teradata Type: SMALLINT + short s = in.readShort(); + if (isNull) { + return null; + } else { + ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable) reuse; + r.set(s); + return r; + } + case BINARY: // Teradata Type: VARBYTE + byte[] content = in.readVarbyte(); + if (isNull) { + return null; + } else { + BytesWritable r = new BytesWritable(); + r.set(content, 0, content.length); + return r; + } + default: + throw new SerDeException("Unrecognized type: " + ptype.getPrimitiveCategory()); + } + // Currently, deserialization of complex types is not supported + case LIST: + case MAP: + case STRUCT: + default: + throw new SerDeException("Unsupported category: " + type.getCategory()); + } + } + + /** + * Get the object inspector that can be used to navigate through the internal + * structure of the Object returned from deserialize(...). 
+ */ + @Override public ObjectInspector getObjectInspector() throws SerDeException { + return rowOI; + } + + private int getTimeStampByteNum(int precision) { + if (precision == 0) { + return DEFAULT_TIMESTAMP_BYTE_NUM; + } else { + return precision + 1 + DEFAULT_TIMESTAMP_BYTE_NUM; + } + } + + private int getCharByteNum(String charset) throws SerDeException { + if (!CHARSET_TO_BYTE_NUM.containsKey(charCharset)) { + throw new SerDeException( + format("%s isn't supported in Teradata Char Charset %s", charCharset, CHARSET_TO_BYTE_NUM.keySet())); + } else { + return CHARSET_TO_BYTE_NUM.get(charset); + } + } + + private int getDecimalByteNum(int precision) throws SerDeException { + if (precision <= 0) { + throw new SerDeException(format("the precision of Decimal should be bigger than 0. %d is illegal", precision)); + } + if (precision <= 2) { + return 1; + } + if (precision <= 4) { + return 2; + } + if (precision <= 9) { + return 4; + } + if (precision <= 18) { + return 8; + } + if (precision <= 38) { + return 16; + } + throw new IllegalArgumentException( + format("the precision of Decimal should be smaller than 39. %d is illegal", precision)); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java new file mode 100644 index 0000000..01ac43b --- /dev/null +++ b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import com.google.common.io.BaseEncoding; +import junit.framework.TestCase; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.io.BytesWritable; +import org.junit.Assert; + +import java.sql.Date; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * Test the data type DATE for Teradata binary format. 
+ */ +public class TestTeradataBinarySerdeForDate extends TestCase { + + private final TeradataBinarySerde serde = new TeradataBinarySerde(); + private final Properties props = new Properties(); + + protected void setUp() throws Exception { + props.setProperty(serdeConstants.LIST_COLUMNS, "TD_DATE"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "date"); + serde.initialize(null, props); + } + + public void testTimestampBefore1900() throws Exception { + + //0060-01-01 + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("00653de7fe")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Date ts = ((DateWritable) row.get(0)).get(); + Assert.assertTrue(ts.toString().equals( "0060-01-01")); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testTimestampAfter1900() throws Exception { + + //9999-01-01 + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("0095cfd304")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Date ts = ((DateWritable) row.get(0)).get(); + Assert.assertTrue(ts.toString().equals( "9999-01-01")); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java new file mode 100644 index 0000000..6abdd3f --- /dev/null +++ b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import com.google.common.io.BaseEncoding; +import junit.framework.TestCase; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BytesWritable; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * Test the data type DECIMAL for Teradata binary format. 
+ */ +public class TestTeradataBinarySerdeForDecimal extends TestCase { + + private final TeradataBinarySerde serde = new TeradataBinarySerde(); + private final Properties props = new Properties(); + + protected void setUp() throws Exception { + props.setProperty(serdeConstants.LIST_COLUMNS, "TD_DECIMAL"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "decimal(9,5)"); + + serde.initialize(null, props); + } + + public void testPositiveFraction() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("0064000000")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("0.001".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testNegativeFraction() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("009cffffff")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("-0.001".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testPositiveNumber1() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("00a0860100")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("1".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testNegativeNumber1() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("006079feff")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("-1".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testPositiveNumber2() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("0080969800")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("100".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testNegativeNumber2() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode("000065c4e0")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertTrue("-5240".equals(((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString())); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java ---------------------------------------------------------------------- diff --git 
a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java new file mode 100644 index 0000000..61cc2a5 --- /dev/null +++ b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import com.google.common.io.BaseEncoding; +import junit.framework.TestCase; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.io.BytesWritable; +import org.junit.Assert; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * Test the data type TIMESTAMP for Teradata binary format. + */ +public class TestTeradataBinarySerdeForTimeStamp extends TestCase { + + private final TeradataBinarySerde serde = new TeradataBinarySerde(); + private final Properties props = new Properties(); + + protected void setUp() throws Exception { + props.setProperty(serdeConstants.LIST_COLUMNS, "TD_TIMESTAMP"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "timestamp"); + } + + public void testTimestampPrecision6() throws Exception { + props.setProperty(TeradataBinarySerde.TD_TIMESTAMP_PRECISION, "6"); + serde.initialize(null, props); + + //2012-10-01 12:00:00.110000 + BytesWritable in = new BytesWritable( + BaseEncoding.base16().lowerCase().decode("00323031322d31302d30312031323a30303a30302e313130303030")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Timestamp ts = ((TimestampWritable) row.get(0)).getTimestamp(); + Assert.assertEquals("2012-10-01 12:00:00.11", ts.toString()); + + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testTimestampPrecision0() throws Exception { + props.setProperty(TeradataBinarySerde.TD_TIMESTAMP_PRECISION, "0"); + serde.initialize(null, props); + + //2012-10-01 12:00:00 + BytesWritable in = + new BytesWritable(BaseEncoding.base16().lowerCase().decode("00323031322d31302d30312031323a30303a3030")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Timestamp ts = ((TimestampWritable) row.get(0)).getTimestamp(); + Assert.assertEquals("2012-10-01 12:00:00.0", ts.toString()); + + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testTimestampPrecision3() throws Exception { + props.setProperty(TeradataBinarySerde.TD_TIMESTAMP_PRECISION, "3"); + 
serde.initialize(null, props); + + //2012-10-01 12:00:00.345 + BytesWritable in = + new BytesWritable(BaseEncoding.base16().lowerCase().decode("00323031322d31302d30312031323a30303a30302e333435")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Timestamp ts = ((TimestampWritable) row.get(0)).getTimestamp(); + Assert.assertEquals("2012-10-01 12:00:00.345", ts.toString()); + + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java new file mode 100644 index 0000000..aad77e9 --- /dev/null +++ b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import com.google.common.io.BaseEncoding; +import junit.framework.TestCase; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * Test all the data types supported for Teradata Binary Format. 
+ */ +public class TestTeradataBinarySerdeGeneral extends TestCase { + + private final TeradataBinarySerde serde = new TeradataBinarySerde(); + private final Properties props = new Properties(); + + protected void setUp() throws Exception { + props.setProperty(serdeConstants.LIST_COLUMNS, + "TD_CHAR, TD_VARCHAR, TD_BIGINT, TD_INT, TD_SMALLINT, TD_BYTEINT, " + + "TD_FLOAT,TD_DECIMAL,TD_DATE, TD_TIMESTAMP, TD_VARBYTE"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "char(3),varchar(100),bigint,int,smallint,tinyint,double,decimal(31,30),date,timestamp,binary"); + + serde.initialize(null, props); + } + + public void testDeserializeAndSerialize() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode( + "00004e6f762020202020201b006120646179203d2031312f31312f31312020202020202020203435ec10000000000000c5feffff" + + "7707010000000000002a40ef2b3dab0d14e6531c8908a72700000007b20100313931312d31312d31312031393a32303a32312e34" + + "33333230301b00746573743a20202020202020343333322020202020202020333135")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertEquals("Nov", ((HiveCharWritable) row.get(0)).toString()); + Assert.assertEquals("a day = 11/11/11 45", ((HiveVarcharWritable) row.get(1)).toString()); + Assert.assertEquals(4332L, ((LongWritable) row.get(2)).get()); + Assert.assertEquals(-315, ((IntWritable) row.get(3)).get()); + Assert.assertEquals((short) 1911, ((ShortWritable) row.get(4)).get()); + Assert.assertEquals((byte) 1, ((ByteWritable) row.get(5)).get()); + Assert.assertEquals((double) 13, ((DoubleWritable) row.get(6)).get(), 0); + Assert.assertEquals(30, ((HiveDecimalWritable) row.get(7)).getScale()); + Assert.assertEquals((double) 3.141592653589793238462643383279, + ((HiveDecimalWritable) row.get(7)).getHiveDecimal().doubleValue(), 0); + Assert.assertEquals("1911-11-11", ((DateWritable) row.get(8)).toString()); + Assert.assertEquals("1911-11-11 19:20:21.4332", ((TimestampWritable) row.get(9)).toString()); + Assert.assertEquals(27, ((BytesWritable) row.get(10)).getLength()); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testDeserializeAndSerializeWithNull() throws Exception { + //null bitmap: 0160 -> 00000001 01100000, 7th, 9th, 10th is null + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode( + "01604d61722020202020201b006120646179203d2031332f30332f303820202020202020202020397ca10000000000004300000" + + "0dd0700000000000048834000000000000000000000000000000000443f110020202020202020202020202020202020202020202" + + "020202020200000")); + List<Object> row = (List<Object>) serde.deserialize(in); + + Assert.assertEquals("Mar", ((HiveCharWritable) row.get(0)).toString()); + Assert.assertEquals(null, row.get(7)); + Assert.assertEquals(null, row.get(9)); + Assert.assertEquals(null, row.get(10)); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testDeserializeAndSerializeAllNull() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode( + "ffe0202020202020202020000000000000000000000000000000000000000000000000000000000000000000000000000000000" + + "00000000020202020202020202020202020202020202020202020202020200000")); + List<Object> row = (List<Object>) serde.deserialize(in); + + 
Assert.assertEquals(null, row.get(0)); + Assert.assertEquals(null, row.get(1)); + Assert.assertEquals(null, row.get(3)); + Assert.assertEquals(null, row.get(4)); + Assert.assertEquals(null, row.get(5)); + Assert.assertEquals(null, row.get(6)); + Assert.assertEquals(null, row.get(7)); + Assert.assertEquals(null, row.get(8)); + Assert.assertEquals(null, row.get(9)); + Assert.assertEquals(null, row.get(10)); + + BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector()); + Assert.assertTrue(Arrays.equals(in.copyBytes(), res.copyBytes())); + } + + public void testDeserializeCorruptedRecord() throws Exception { + BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode( + "00004e6f762020202020201b006120646179203d2031312f31312f31312020202020202020203435ec10000000000000c5feff" + + "ff7707010000000000002a40ef2b3dab0d14e6531c8908a72700000007b20100313931312d31312d31312031393a32303a32312" + + "e3433333230301b00746573743a20202020202020343333322020202020202020333135ff")); + + List<Object> row = (List<Object>) serde.deserialize(in); + Assert.assertEquals(null, row.get(0)); + Assert.assertEquals(null, row.get(3)); + Assert.assertEquals(null, row.get(10)); + } +}
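----------------------------------------------------------------------
Editor's note: the two sketches below are not part of the patch; they only restate, in runnable form, conventions that the new SerDe and its tests rely on. Class and method names (NullBitmapSketch, TeradataScalarSketch and their helpers) are illustrative, not code from the Hive tree.

The first sketch mirrors the null-bitmap arithmetic used by serialize() and deserialize() above: one bit per column, most significant bit first, packed into ceil(numCols / 8) bytes at the front of every record. For the 11-column layout of TestTeradataBinarySerdeGeneral with columns 7, 9 and 10 null, it reproduces the bytes 0x01 0x60 quoted in the comment of testDeserializeAndSerializeWithNull.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: mirrors the TeradataBinarySerde null-bitmap bit math; not shipped with the patch. */
public class NullBitmapSketch {

  /** Pack one bit per column, MSB first, the way serialize() fills outForNull. */
  static byte[] encodeNullBitmap(int numCols, Set<Integer> nullCols) {
    byte[] bitmap = new byte[(numCols + 7) / 8];
    for (int i = 0; i < numCols; i++) {
      if (nullCols.contains(i)) {
        bitmap[i / 8] |= (byte) (0x01 << (7 - (i % 8)));
      }
    }
    return bitmap;
  }

  /** Test the same bit, the way deserialize() reads inForNull. */
  static boolean isNull(byte[] bitmap, int col) {
    return (bitmap[col / 8] & (128 >> (col % 8))) != 0;
  }

  public static void main(String[] args) {
    // 11 columns, columns 7, 9 and 10 null -> expect the bytes 0x01 0x60
    byte[] bitmap = encodeNullBitmap(11, new HashSet<>(Arrays.asList(7, 9, 10)));
    System.out.printf("%02x%02x%n", bitmap[0], bitmap[1]);  // prints 0160
    System.out.println(isNull(bitmap, 9));                  // true
    System.out.println(isNull(bitmap, 8));                  // false
  }
}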
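The second sketch hand-decodes two of the fixed-width scalar payloads asserted in the tests, under the layout the code above implies: a DECIMAL of precision 9 occupies 4 bytes (getDecimalByteNum maps precision <= 2/4/9/18/38 to 1/2/4/8/16 bytes) holding the unscaled value as a little-endian signed integer, and a DATE is a little-endian signed int. The DATE formula (year - 1900) * 10000 + month * 100 + day is inferred from the two vectors in TestTeradataBinarySerdeForDate rather than stated in the code shown here, so treat it as an assumption.

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative only: hand-decodes DECIMAL and DATE test payloads; not part of the patch. */
public class TeradataScalarSketch {

  /** Hex string -> bytes (the tests use Guava's BaseEncoding for the same job). */
  static byte[] hex(String s) {
    byte[] out = new byte[s.length() / 2];
    for (int i = 0; i < out.length; i++) {
      out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
    }
    return out;
  }

  /** DECIMAL with precision <= 9: 4-byte little-endian unscaled integer after the 1-byte null bitmap. */
  static BigDecimal decodeDecimal4(byte[] record, int scale) {
    int unscaled = ByteBuffer.wrap(record, 1, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    return BigDecimal.valueOf(unscaled, scale);
  }

  /** DATE: little-endian int assumed to hold (year - 1900) * 10000 + month * 100 + day. */
  static String decodeDate(byte[] record) {
    int v = ByteBuffer.wrap(record, 1, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    int year = 1900 + Math.floorDiv(v, 10000);   // v is negative for dates before 1900
    int rem = Math.floorMod(v, 10000);
    return String.format("%04d-%02d-%02d", year, rem / 100, rem % 100);
  }

  public static void main(String[] args) {
    // testPositiveFraction: unscaled 100 at scale 5 -> 0.00100 (HiveDecimal renders it as 0.001)
    System.out.println(decodeDecimal4(hex("0064000000"), 5));
    // vectors from TestTeradataBinarySerdeForDate
    System.out.println(decodeDate(hex("00653de7fe")));   // 0060-01-01
    System.out.println(decodeDate(hex("0095cfd304")));   // 9999-01-01
  }
}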