HIVE-20225: SerDe to support Teradata Binary Format (Lu Li, reviewed by Carl Steinbach)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb842b75 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb842b75 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb842b75 Branch: refs/heads/branch-3 Commit: eb842b75b4b0235da9eef6951e091f48640202ad Parents: b1c8b2a Author: Daniel Dai <dai...@gmail.com> Authored: Tue Sep 11 12:58:05 2018 -0700 Committer: Daniel Dai <dai...@gmail.com> Committed: Tue Sep 11 13:00:05 2018 -0700 ---------------------------------------------------------------------- .../td_data_with_1mb_rowsize.teradata.gz | Bin 0 -> 616 bytes .../teradata_binary_table.deflate | Bin 0 -> 1329 bytes .../ql/io/TeradataBinaryFileInputFormat.java | 66 ++ .../ql/io/TeradataBinaryFileOutputFormat.java | 112 ++++ .../hive/ql/io/TeradataBinaryRecordReader.java | 280 +++++++++ .../clientpositive/test_teradatabinaryfile.q | 123 ++++ .../test_teradatabinaryfile.q.out | 537 +++++++++++++++++ .../teradata/TeradataBinaryDataInputStream.java | 199 +++++++ .../TeradataBinaryDataOutputStream.java | 270 +++++++++ .../serde2/teradata/TeradataBinarySerde.java | 597 +++++++++++++++++++ .../TestTeradataBinarySerdeForDate.java | 76 +++ .../TestTeradataBinarySerdeForDecimal.java | 106 ++++ .../TestTeradataBinarySerdeForTimeStamp.java | 111 ++++ .../TestTeradataBinarySerdeGeneral.java | 133 +++++ 14 files changed, 2610 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz ---------------------------------------------------------------------- diff --git a/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz new file mode 100644 index 0000000..7319e3c Binary files /dev/null and b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz differ 
http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/data/files/teradata_binary_file/teradata_binary_table.deflate ---------------------------------------------------------------------- diff --git a/data/files/teradata_binary_file/teradata_binary_table.deflate b/data/files/teradata_binary_file/teradata_binary_table.deflate new file mode 100644 index 0000000..fd53dde Binary files /dev/null and b/data/files/teradata_binary_file/teradata_binary_table.deflate differ http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java new file mode 100644 index 0000000..bed87c5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * {@link FileInputFormat} for Teradata binary files; see
+ * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde.
+ *
+ * Every record in a Teradata binary file is laid out as follows:
+ * 1. a length prefix holding the number of bytes of the null bitmap plus the fields
+ *    (2 bytes by default, 4 bytes when the table property teradata.row.length is 1mb);
+ * 2. the null bitmap, whose size depends on the number of fields;
+ * 3. the fields, each serialized according to its type;
+ * 4. a single terminating byte (0x0a).
+ *
+ * Teradata binary files are little endian. {@link #isSplitable} always answers
+ * {@code false}, so this format does not split its input files.
+ */
+public class TeradataBinaryFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
+
+  /**
+   * Publishes the split description as the task status and opens a reader over the split.
+   *
+   * @param split the split to read; it is cast to {@link FileSplit}
+   * @param job the job configuration handed to the reader
+   * @param reporter receives the split description as its status
+   * @return a {@link TeradataBinaryRecordReader} over the split
+   * @throws IOException if the reader cannot be created
+   */
+  @Override public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    final FileSplit fileSplit = (FileSplit) split;
+    return new TeradataBinaryRecordReader(job, fileSplit);
+  }
+
+  /**
+   * Always reports the file as non-splittable.
+   *
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return {@code false} for every file
+   */
+  @Override protected boolean isSplitable(FileSystem fs, Path filename) {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java new file mode 100644 index 0000000..0469825 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.commons.io.EndianUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+
+import static java.lang.String.format;
+
+/**
+ * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde.
+ * FileOutputFormat for Teradata binary files.
+ *
+ * In the Teradata Binary File, each record is constructed as below:
+ * The first 2 bytes (4 bytes when teradata.row.length is 1mb) represent the length of the bytes that
+ * follow for this record (null bitmap and fields).
+ * Then the null bitmap, whose length depends on the number of fields, follows.
+ * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field.
+ * At last, there is one byte (0x0a) at the end of the record.
+ *
+ * Teradata binary files are using little endian.
+ */
+public class TeradataBinaryFileOutputFormat<K extends WritableComparable, V extends Writable>
+    extends HiveIgnoreKeyTextOutputFormat<K, V> {
+  private static final Log LOG = LogFactory.getLog(TeradataBinaryFileOutputFormat.class);
+
+  static final byte RECORD_END_BYTE = (byte) 0x0a;
+
+  /** Largest record length that fits in the unsigned 2-byte length prefix. */
+  private static final int MAX_SHORT_RECORD_LENGTH = 0xFFFF;
+
+  /**
+   * Creates the final output file and returns a writer that appends one record per call: the
+   * little-endian length prefix, the record bytes, and the record end byte (0x0a).
+   *
+   * The teradata.row.length table property is resolved and validated once, before the output file
+   * is created, so an unsupported value fails without leaving an open stream behind.
+   *
+   * @param jc
+   *          the job configuration file
+   * @param outPath
+   *          the final output file to be created
+   * @param valueClass
+   *          the value class; not used by this implementation
+   * @param isCompressed
+   *          whether the content is compressed or not
+   * @param tableProperties
+   *          the tableProperties of this file's corresponding table
+   * @param progress
+   *          progress used for status report
+   * @return the RecordWriter
+   * @throws IllegalArgumentException if teradata.row.length has an unsupported value
+   * @throws IOException if the output file cannot be created
+   */
+  @Override public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class<? extends Writable> valueClass,
+      boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
+    // Based on the row length, decide if the length prefix is an int or a short. The table
+    // property cannot change while the writer is alive, so it is read once instead of per record.
+    final String rowLength = tableProperties
+        .getProperty(TeradataBinaryRecordReader.TD_ROW_LENGTH, TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH)
+        .toLowerCase(Locale.ROOT);
+    LOG.debug(format("The table property %s is: %s", TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength));
+
+    if (!TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.containsKey(rowLength)) {
+      throw new IllegalArgumentException(format("%s doesn't support the value %s, the supported values are %s",
+          TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength,
+          TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.keySet()));
+    }
+    final boolean shortLengthPrefix = TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH.equals(rowLength);
+
+    FileSystem fs = outPath.getFileSystem(jc);
+    final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath, progress), isCompressed);
+    return new RecordWriter() {
+      @Override public void write(Writable r) throws IOException {
+        BytesWritable bw = (BytesWritable) r;
+        int recordLength = bw.getLength();
+
+        if (shortLengthPrefix) {
+          // A longer record would be silently truncated by the cast to short and corrupt the file.
+          if (recordLength > MAX_SHORT_RECORD_LENGTH) {
+            throw new IOException(
+                format("The record is %d bytes long but %s=%s only supports records of up to %d bytes",
+                    recordLength, TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength, MAX_SHORT_RECORD_LENGTH));
+          }
+          EndianUtils.writeSwappedShort(outStream, (short) recordLength); // write the length using little endian
+        } else {
+          EndianUtils.writeSwappedInteger(outStream, recordLength); // write the length using little endian
+        }
+
+        outStream.write(bw.getBytes(), 0, recordLength); // write the content (the content is in little endian)
+        outStream.write(RECORD_END_BYTE); //write the record ending
+      }
+
+      @Override public void close(boolean abort) throws IOException {
+        outStream.close();
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java new file mode 100644 index 0000000..337b5d2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.EndianUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import static java.lang.String.format;
+
+/**
+ * The TeradataBinaryRecordReader reads the record from Teradata binary files.
+ *
+ * In the Teradata Binary File, each record is constructed as below:
+ * The first 2 bytes (4 bytes when teradata.row.length is 1mb) represent the length of the bytes that
+ * follow for this record.
+ * Then the null bitmap, whose length depends on the number of fields, follows.
+ * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field.
+ * At last, there is one byte (0x0a) at the end of the record.
+ *
+ * This reader currently doesn't support the split of the file.
+ * Teradata binary files are using little endian.
+ */
+public class TeradataBinaryRecordReader implements RecordReader<NullWritable, BytesWritable> {
+
+  private static final Log LOG = LogFactory.getLog(TeradataBinaryRecordReader.class);
+
+  private CompressionCodecFactory compressionCodecs = null;
+  private InputStream in;
+  private long start;
+  private long pos;
+  private long end;
+  private final Seekable filePosition;
+  private CompressionCodec codec;
+
+  static final String TD_ROW_LENGTH = "teradata.row.length";
+  static final Map<String, Integer> TD_ROW_LENGTH_TO_BYTE_NUM = ImmutableMap.of("64kb", 2, "1mb", 4);
+  static final String DEFAULT_TD_ROW_LENGTH = "64kb";
+  static final String TD_ROW_LENGTH_1MB = "1mb";
+
+  /* width of the length prefix that is decoded as an unsigned short */
+  private static final int SHORT_LENGTH_BYTE_NUM = 2;
+
+  private byte[] recordLengthBytes;
+  /* large enough for every length a 2-byte prefix can express; grown on demand for a 4-byte prefix */
+  private byte[] valueByteArray = new byte[65536];
+  private byte[] endOfRecord = new byte[1];
+
+  private int recordLength = 0;
+
+  /**
+   * Opens the file behind the split and prepares to read it record by record.
+   *
+   * The width of the length prefix comes from teradata.row.length, looked up first in the job
+   * configuration, then in the table properties of the partitions of the map work, and finally
+   * defaulting to 64kb.
+   *
+   * @param job the job configuration
+   * @param fileSplit the split to read; the file is always read from its beginning
+   * @throws IllegalArgumentException if teradata.row.length has an unsupported value
+   * @throws IOException if the file cannot be opened
+   */
+  public TeradataBinaryRecordReader(JobConf job, FileSplit fileSplit) throws IOException {
+    LOG.debug("initialize the TeradataBinaryRecordReader");
+
+    String rowLength = job.get(TD_ROW_LENGTH);
+    if (rowLength == null) {
+      LOG.debug("No table property in JobConf. Try to recover the table directly");
+      Map<String, PartitionDesc> partitionDescMap = Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo();
+      for (Map.Entry<String, PartitionDesc> aliasToPartition : partitionDescMap.entrySet()) {
+        LOG.debug(format("the current alias: %s", aliasToPartition.getKey()));
+        rowLength = aliasToPartition.getValue().getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
+        if (rowLength != null) {
+          break;
+        }
+      }
+    }
+
+    // Locale.ROOT keeps the lookup key independent of the JVM's default locale.
+    rowLength = rowLength == null ? DEFAULT_TD_ROW_LENGTH : rowLength.toLowerCase(Locale.ROOT);
+
+    Integer lengthByteNum = TD_ROW_LENGTH_TO_BYTE_NUM.get(rowLength);
+    if (lengthByteNum == null) {
+      throw new IllegalArgumentException(
+          format("%s doesn't support the value %s, the supported values are %s", TD_ROW_LENGTH, rowLength,
+              TD_ROW_LENGTH_TO_BYTE_NUM.keySet()));
+    }
+    recordLengthBytes = new byte[lengthByteNum];
+
+    start = fileSplit.getStart();
+    end = start + fileSplit.getLength();
+
+    LOG.debug(format("The start of the file split is: %s", start));
+    LOG.debug(format("The end of the file split is: %s", end));
+
+    final Path file = fileSplit.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    codec = compressionCodecs.getCodec(file);
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(file);
+
+    /* currently the TeradataBinaryRecord file doesn't support file split at all */
+    filePosition = fileIn;
+    if (isCompressedInput()) {
+      LOG.info(format("Input file is compressed. Using compression code %s", codec.getClass().getName()));
+      try {
+        in = codec.createInputStream(fileIn);
+      } catch (IOException | RuntimeException e) {
+        // Nobody else holds a reference to the raw stream yet, so it has to be closed here.
+        try {
+          fileIn.close();
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+    } else {
+      LOG.info("The input file is not compressed");
+      in = fileIn;
+    }
+    pos = start;
+  }
+
+  /**
+   * Reads the next key/value pair from the input for processing.
+   *
+   * @param key the key to read data into
+   * @param value the value to read data into; receives the null bitmap and the fields of the record
+   * @return true iff a key/value was read, false if at EOF
+   * @throws EOFException if the file ends in the middle of a record
+   * @throws IOException if the record is not terminated by 0x0a or its length is invalid
+   */
+  @Override public synchronized boolean next(NullWritable key, BytesWritable value) throws IOException {
+
+    /* read the record length */
+    int lengthExpected = recordLengthBytes.length;
+    int hasConsumed = readExpectedBytes(recordLengthBytes, lengthExpected);
+    if (hasConsumed == 0) {
+      LOG.info("Reach the End of File. No more record");
+      return false;
+    } else if (hasConsumed < lengthExpected) {
+      logUnexpectedEof("record length", lengthExpected, hasConsumed, hexOf(recordLengthBytes, hasConsumed));
+      throw new EOFException("When reading the record length, reach the unexpected end of file.");
+    }
+    /* get the record content length to prepare to read the content */
+    recordLength = decodeRecordLength();
+    pos += lengthExpected;
+
+    /* read the record content */
+    lengthExpected = recordLength;
+    if (lengthExpected > valueByteArray.length) {
+      // Only reachable with the 4-byte prefix.
+      // NOTE(review): the length comes straight from the file, so a corrupted prefix can request a
+      // very large buffer - consider capping it once the exact Teradata row limit is confirmed.
+      valueByteArray = new byte[lengthExpected];
+    }
+    hasConsumed = readExpectedBytes(valueByteArray, lengthExpected);
+    if (hasConsumed < lengthExpected) {
+      logUnexpectedEof("record content", lengthExpected, hasConsumed,
+          Hex.encodeHexString(recordLengthBytes) + hexOf(valueByteArray, hasConsumed));
+      throw new EOFException("When reading the contend of the record, reach the unexpected end of file.");
+    }
+    value.set(valueByteArray, 0, recordLength);
+    pos += lengthExpected;
+
+    /* read the record end */
+    lengthExpected = endOfRecord.length;
+    hasConsumed = readExpectedBytes(endOfRecord, lengthExpected);
+    if (hasConsumed < lengthExpected) {
+      logUnexpectedEof("record end symbol", lengthExpected, hasConsumed,
+          Hex.encodeHexString(recordLengthBytes) + hexOf(valueByteArray, recordLength));
+      throw new EOFException("When reading the end of record, reach the unexpected end of file.");
+    }
+
+    if (endOfRecord[0] != TeradataBinaryFileOutputFormat.RECORD_END_BYTE) {
+      throw new IOException(format("We expect 0x0a as the record end but get %s.", Hex.encodeHexString(endOfRecord)));
+    }
+    pos += lengthExpected;
+
+    return true;
+  }
+
+  /**
+   * Create an object of the appropriate type to be used as a key.
+   *
+   * @return a new key object.
+   */
+  @Override public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  /**
+   * Create an object of the appropriate type to be used as a value.
+   *
+   * @return a new value object.
+   */
+  @Override public BytesWritable createValue() {
+    return new BytesWritable();
+  }
+
+  /**
+   * Returns the current position in the input.
+   *
+   * @return the split start plus the number of record bytes consumed so far.
+   * @throws IOException
+   */
+  @Override public long getPos() throws IOException {
+    return pos;
+  }
+
+  /**
+   * Closes the input stream, if one was opened.
+   *
+   * @throws IOException
+   */
+  @Override public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+
+  /**
+   * How much of the input has the {@link RecordReader} consumed i.e.
+   * has been processed by?
+   *
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   */
+  @Override public float getProgress() throws IOException {
+    if (start == end) {
+      return 0.0F;
+    } else {
+      return Math.min(1.0F, (float) (getFilePosition() - start) / (float) (end - start));
+    }
+  }
+
+  private boolean isCompressedInput() {
+    return codec != null;
+  }
+
+  /* For compressed input the position of the underlying file stream is used instead of the consumed byte count. */
+  private synchronized long getFilePosition() throws IOException {
+    if (isCompressedInput() && filePosition != null) {
+      return filePosition.getPos();
+    }
+    return getPos();
+  }
+
+  /* Decodes the little-endian length prefix according to its width (2 or 4 bytes). */
+  private int decodeRecordLength() throws IOException {
+    if (recordLengthBytes.length == SHORT_LENGTH_BYTE_NUM) {
+      return EndianUtils.readSwappedUnsignedShort(recordLengthBytes, 0);
+    }
+    int length = EndianUtils.readSwappedInteger(recordLengthBytes, 0);
+    if (length < 0) {
+      throw new IOException(format("The record length %d decoded from %s is negative; the file is corrupted or %s is wrong.",
+          length, Hex.encodeHexString(recordLengthBytes), TD_ROW_LENGTH));
+    }
+    return length;
+  }
+
+  /* Logs the diagnostics shared by every "file ended in the middle of a record" failure. */
+  private void logUnexpectedEof(String part, int lengthExpected, int hasConsumed, String recordHex)
+      throws IOException {
+    LOG.error(format("We expect %s bytes for the %s but read %d byte and reach the End of File.", lengthExpected,
+        part, hasConsumed));
+    LOG.error(format("The current position in the file : %s", getFilePosition()));
+    LOG.error(format("The current consumed bytes: %s", pos));
+    LOG.error(format("The bytes for the current record is: %s", recordHex));
+  }
+
+  /* Hex-encodes only the first length bytes, so stale content of a reused buffer is never reported. */
+  private static String hexOf(byte[] bytes, int length) {
+    return Hex.encodeHexString(Arrays.copyOf(bytes, length));
+  }
+
+  /* Reads until lengthExpected bytes are available or the stream ends; returns the number of bytes read. */
+  private synchronized int readExpectedBytes(byte[] toWrite, int lengthExpected) throws IOException {
+    int curPos = 0;
+    do {
+      int numOfByteRead = in.read(toWrite, curPos, lengthExpected - curPos);
+      if (numOfByteRead < 0) {
+        return curPos;
+      } else {
+        curPos += numOfByteRead;
+      }
+    } while (curPos < lengthExpected);
+    return curPos;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/ql/src/test/queries/clientpositive/test_teradatabinaryfile.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/test_teradatabinaryfile.q b/ql/src/test/queries/clientpositive/test_teradatabinaryfile.q new file mode 100644 index 0000000..33ab677 --- /dev/null +++ b/ql/src/test/queries/clientpositive/test_teradatabinaryfile.q @@ -0,0 +1,123 @@ +DROP TABLE if exists teradata_binary_table_64kb; +DROP TABLE if exists teradata_binary_table_1mb; +DROP TABLE if exists teradata_binary_table_64kb_insert; +DROP TABLE if exists teradata_binary_table_1mb_insert; + + +CREATE TABLE `teradata_binary_table_64kb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + `test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT +
'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +); + +CREATE TABLE `teradata_binary_table_1mb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + `test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='6', + 'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +); + +CREATE TABLE `teradata_binary_table_64kb_insert`( + `test_tinyint` tinyint, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +); + +CREATE TABLE `teradata_binary_table_1mb_insert`( + `test_tinyint` tinyint, + `test_int` int + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='6', + 'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +); + +LOAD DATA LOCAL INPATH 
'../../data/files/teradata_binary_file/teradata_binary_table.deflate' OVERWRITE INTO TABLE teradata_binary_table_64kb; +LOAD DATA LOCAL INPATH '../../data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz' OVERWRITE INTO TABLE teradata_binary_table_1mb; + +SELECT * from teradata_binary_table_64kb; +SELECT * from teradata_binary_table_1mb; + +SELECT COUNT(*) FROM teradata_binary_table_64kb; +SELECT COUNT(*) FROM teradata_binary_table_1mb; + +SELECT max(date_format(test_timestamp, 'y')) FROM teradata_binary_table_64kb; +SELECT max(date_format(test_date, 'y')) FROM teradata_binary_table_64kb; +SELECT max(Floor(test_decimal)) FROM teradata_binary_table_64kb; + +SELECT max(date_format(test_timestamp, 'y')) FROM teradata_binary_table_1mb; +SELECT max(date_format(test_date, 'y')) FROM teradata_binary_table_1mb; +SELECT max(Floor(test_decimal)) FROM teradata_binary_table_1mb; + +SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_64kb GROUP BY test_tinyint; +SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_1mb GROUP BY test_tinyint; + +INSERT OVERWRITE TABLE teradata_binary_table_64kb_insert +SELECT test_tinyint, test_decimal, test_date, test_timestamp FROM teradata_binary_table_64kb; + +INSERT OVERWRITE TABLE teradata_binary_table_1mb_insert +SELECT 1, 15; + +DESC FORMATTED teradata_binary_table_64kb_insert; +DESC FORMATTED teradata_binary_table_1mb_insert; + +DROP TABLE if exists teradata_binary_table_64kb; +DROP TABLE if exists teradata_binary_table_1mb; +DROP TABLE if exists teradata_binary_table_64kb_insert; +DROP TABLE if exists teradata_binary_table_1mb_insert; http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/ql/src/test/results/clientpositive/test_teradatabinaryfile.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/test_teradatabinaryfile.q.out b/ql/src/test/results/clientpositive/test_teradatabinaryfile.q.out new file mode 100644 index 
0000000..9db1372 --- /dev/null +++ b/ql/src/test/results/clientpositive/test_teradatabinaryfile.q.out @@ -0,0 +1,537 @@ +PREHOOK: query: DROP TABLE if exists teradata_binary_table_64kb +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_64kb +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE if exists teradata_binary_table_1mb +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_1mb +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE if exists teradata_binary_table_64kb_insert +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_64kb_insert +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE if exists teradata_binary_table_1mb_insert +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_1mb_insert +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE `teradata_binary_table_64kb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + `test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@teradata_binary_table_64kb +POSTHOOK: query: CREATE TABLE `teradata_binary_table_64kb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + 
`test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@teradata_binary_table_64kb +PREHOOK: query: CREATE TABLE `teradata_binary_table_1mb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + `test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='6', + 'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@teradata_binary_table_1mb +POSTHOOK: query: CREATE TABLE `teradata_binary_table_1mb`( + `test_tinyint` tinyint, + `test_smallint` smallint, + `test_int` int, + `test_bigint` bigint, + `test_double` double, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp, + `test_char` char(1), + `test_varchar` varchar(40), + `test_binary` binary + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 
'teradata.timestamp.precision'='6', + 'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@teradata_binary_table_1mb +PREHOOK: query: CREATE TABLE `teradata_binary_table_64kb_insert`( + `test_tinyint` tinyint, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@teradata_binary_table_64kb_insert +POSTHOOK: query: CREATE TABLE `teradata_binary_table_64kb_insert`( + `test_tinyint` tinyint, + `test_decimal` decimal(15,2), + `test_date` date, + `test_timestamp` timestamp + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='0', + 'teradata.char.charset'='LATIN', + 'teradata.row.length'='64KB' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@teradata_binary_table_64kb_insert +PREHOOK: query: CREATE TABLE `teradata_binary_table_1mb_insert`( + `test_tinyint` tinyint, + `test_int` int + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='6', + 
'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@teradata_binary_table_1mb_insert +POSTHOOK: query: CREATE TABLE `teradata_binary_table_1mb_insert`( + `test_tinyint` tinyint, + `test_int` int + ) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat' +TBLPROPERTIES ( + 'teradata.timestamp.precision'='6', + 'teradata.char.charset'='UNICODE', + 'teradata.row.length'='1MB' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@teradata_binary_table_1mb_insert +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/teradata_binary_file/teradata_binary_table.deflate' OVERWRITE INTO TABLE teradata_binary_table_64kb +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@teradata_binary_table_64kb +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/teradata_binary_file/teradata_binary_table.deflate' OVERWRITE INTO TABLE teradata_binary_table_64kb +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@teradata_binary_table_64kb +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz' OVERWRITE INTO TABLE teradata_binary_table_1mb +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@teradata_binary_table_1mb +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz' OVERWRITE INTO TABLE teradata_binary_table_1mb +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@teradata_binary_table_1mb +PREHOOK: query: SELECT * from teradata_binary_table_64kb +PREHOOK: type: QUERY +PREHOOK: Input: 
default@teradata_binary_table_64kb +#### A masked pattern was here #### +POSTHOOK: query: SELECT * from teradata_binary_table_64kb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_64kb +#### A masked pattern was here #### +10 34 139997714 32307660 18.6717 59.99 2018-08-23 2018-07-23 01:45:55 A NULL NULL +10 28 89082024 53367308 5.9069 27.90 2018-08-23 2018-07-23 19:45:36 A NULL NULL +10 31 65499801 9495835 5.9064 29.99 2018-08-23 2018-07-23 09:15:10 A NULL NULL +10 20 144923884 123337561 20.1037 50.50 2018-08-23 2018-07-23 22:49:52 A NULL NULL +10 9 118474716 110462827 18.6697 29.99 2018-08-23 2018-07-23 10:13:03 A NULL NULL +10 4 116098596 555556155 20.1017 29.99 2018-07-23 2018-07-23 13:12:10 X SELF_SERVICE SELF_SERVICE +10 10 84492975 100052093 15.4913 29.99 2018-08-23 2018-07-23 17:56:32 A NULL NULL +10 31 101314613 45413087 5.9064 29.99 2018-08-23 2018-07-23 11:26:24 A NULL NULL +10 1 156962113 554297748 NULL 29.99 2018-08-23 2018-07-23 11:31:31 A NULL NULL +10 10 92560875 380929783 20.1011 20.91 2018-07-30 2018-07-23 05:02:42 S RCHARGE_FAILURE RCHARGE_FAILURE +10 5 154490193 186062438 20.1037 29.99 2018-07-23 2018-07-23 10:17:20 X NULL NULL +10 31 2954435 34009387 0.0214 24.23 2018-08-23 2018-07-23 15:46:21 A NULL NULL +10 4 156942563 55362740 0.0024 29.99 2018-08-23 2018-07-23 08:16:49 A NULL NULL +10 31 90527523 126581551 7.5689 59.99 2018-08-23 2018-07-23 03:40:28 A NULL NULL +10 1 118477496 598803186 NULL 29.99 2018-08-23 2018-07-23 10:45:28 A NULL NULL +10 75 137653654 38440942 20.1037 29.99 2018-08-23 2018-07-23 19:01:04 A NULL NULL +10 2 142697304 106829658 20.1008 24.21 2018-07-23 2018-07-23 05:22:17 S RCHARGE_FAILURE RCHARGE_FAILURE +10 14 134043823 264156349 20.1008 24.21 2018-08-23 2018-07-23 12:12:48 A NULL NULL +10 7 91359485 7008957 20.1011 20.91 2018-08-23 2018-07-23 23:42:04 A NULL NULL +10 1 118512426 222159750 NULL 29.99 2018-08-23 2018-07-23 17:06:25 A NULL NULL +10 5 155168873 135968937 18.6697 59.99 2018-07-30 
2018-07-23 18:01:35 S RCHARGE_FAILURE RCHARGE_FAILURE +10 4 151084943 38355275 20.1017 29.99 2018-08-23 2018-07-23 04:12:32 A NULL NULL +10 6 118452556 90264779 20.1017 59.99 2018-08-23 2018-07-23 05:18:44 A NULL NULL +10 31 53127101 18622653 0.0115 49.95 2018-08-23 2018-07-23 07:38:05 A NULL NULL +10 1 118479736 216825119 NULL 29.99 2018-08-23 2018-07-23 11:11:51 A NULL NULL +10 4 142708764 21984202 30.5785 27.50 2018-08-23 2018-07-23 10:36:22 A NULL NULL +10 4 142713364 33598449 20.1017 29.99 2018-07-23 2018-07-23 12:49:24 X SELF_SERVICE SELF_SERVICE +10 22 103578546 152144452 20.1017 29.99 2018-08-23 2018-07-23 11:18:44 A NULL NULL +10 22 111233194 69051 20.1017 29.99 2018-08-23 2018-07-23 08:58:16 A NULL NULL +10 12 132376034 2651098 20.1017 29.99 2018-08-23 2018-07-23 06:01:44 A NULL NULL +10 11 135778714 29866847 18.6717 59.99 2018-08-23 2018-07-23 02:35:58 A NULL NULL +10 10 118525066 34556421 5.9064 29.99 2018-08-23 2018-07-23 21:15:29 A NULL NULL +10 7 144897784 532208226 20.1017 29.99 2018-08-23 2018-07-23 14:35:42 A NULL NULL +10 34 87091713 93626084 5.9064 29.99 2018-08-23 2018-07-23 08:56:25 A NULL NULL +10 21 129323704 14298869 30.5516 55.03 2018-08-23 2018-07-23 05:48:14 A NULL NULL +10 31 112813163 36762074 5.9064 29.99 2018-08-23 2018-07-23 18:07:23 A NULL NULL +10 1 156980833 58308375 NULL 59.99 2018-08-23 2018-07-23 14:54:17 A NULL NULL +10 5 150357953 101207194 20.1017 29.99 2018-08-14 2018-07-23 13:53:14 S NULL NULL +10 1 118462836 668498576 NULL 55.03 2018-08-23 2018-07-23 07:44:11 A NULL NULL +10 7 129423664 312394041 20.1017 29.99 2018-08-23 2018-07-23 20:40:42 A NULL NULL +10 10 122518074 5448199 20.1017 29.99 2018-08-23 2018-07-23 01:30:03 A NULL NULL +10 3 113469566 593079639 20.1037 29.99 2018-08-23 2018-07-23 19:39:05 A NULL NULL +10 4 144878314 88960410 18.6689 55.03 2018-08-23 2018-07-23 11:43:56 A NULL NULL +10 8 146831593 168164335 30.5786 28.03 2018-08-23 2018-07-23 11:34:36 A NULL NULL +10 4 91358385 23752815 29.9896 27.21 
2018-08-23 2018-07-23 23:20:30 A NULL NULL +10 3 118533306 286487393 30.5529 44.02 2019-07-23 2018-07-23 23:48:14 A NULL NULL +10 7 103618686 339052539 18.6697 59.99 2018-08-23 2018-07-23 18:26:54 A NULL NULL +10 11 92556375 196464425 29.9896 27.21 2018-08-23 2018-07-23 03:15:07 A NULL NULL +10 11 137563254 239883707 18.6697 59.99 2018-08-23 2018-07-23 02:01:31 A NULL NULL +10 2 116078336 61997052 20.1017 29.99 2018-07-23 2018-07-23 00:55:05 X SELF_SERVICE SELF_SERVICE +PREHOOK: query: SELECT * from teradata_binary_table_1mb +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +POSTHOOK: query: SELECT * from teradata_binary_table_1mb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +-6 0 -99999 -1 NULL 0.00 2011-01-02 2009-02-28 12:34:56 æ° AABBCC +5 3200 -9999 NULL 3.14159 314000000.00 NULL 2011-02-28 12:34:56 ABC NULL +-127 32000 -9 1234567890123456789 2.01E10 3.14 2011-01-02 2022-02-28 12:34:56 æ° ãããã¨ããããã¾ã �7��c� +-1 -32000 0 123456789012345678 2.0108E10 314.15 0001-12-31 NULL A thank you �7��c�HOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +POSTHOOK: query: SELECT max(date_format(test_timestamp, 'y')) FROM teradata_binary_table_1mb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +2999 +PREHOOK: query: SELECT max(date_format(test_date, 'y')) FROM teradata_binary_table_1mb +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +POSTHOOK: query: SELECT max(date_format(test_date, 'y')) FROM teradata_binary_table_1mb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +2999 +PREHOOK: query: SELECT max(Floor(test_decimal)) FROM teradata_binary_table_1mb +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_1mb +#### A 
masked pattern was here #### +POSTHOOK: query: SELECT max(Floor(test_decimal)) FROM teradata_binary_table_1mb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +3140000000000 +PREHOOK: query: SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_64kb GROUP BY test_tinyint +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_64kb +#### A masked pattern was here #### +POSTHOOK: query: SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_64kb GROUP BY test_tinyint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_64kb +#### A masked pattern was here #### +10 59.99 +PREHOOK: query: SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_1mb GROUP BY test_tinyint +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +POSTHOOK: query: SELECT test_tinyint, MAX(test_decimal) FROM teradata_binary_table_1mb GROUP BY test_tinyint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_1mb +#### A masked pattern was here #### +NULL 12.00 +-127 3.14 +-6 0.00 +-4 3.14 +-1 314.15 +2 NULL +3 3140000000000.00 +5 314000000.00 +7 NULL +127 0.04 +PREHOOK: query: INSERT OVERWRITE TABLE teradata_binary_table_64kb_insert +SELECT test_tinyint, test_decimal, test_date, test_timestamp FROM teradata_binary_table_64kb +PREHOOK: type: QUERY +PREHOOK: Input: default@teradata_binary_table_64kb +PREHOOK: Output: default@teradata_binary_table_64kb_insert +POSTHOOK: query: INSERT OVERWRITE TABLE teradata_binary_table_64kb_insert +SELECT test_tinyint, test_decimal, test_date, test_timestamp FROM teradata_binary_table_64kb +POSTHOOK: type: QUERY +POSTHOOK: Input: default@teradata_binary_table_64kb +POSTHOOK: Output: default@teradata_binary_table_64kb_insert +POSTHOOK: Lineage: teradata_binary_table_64kb_insert.test_date SIMPLE 
[(teradata_binary_table_64kb)teradata_binary_table_64kb.FieldSchema(name:test_date, type:date, comment:from deserializer), ] +POSTHOOK: Lineage: teradata_binary_table_64kb_insert.test_decimal SIMPLE [(teradata_binary_table_64kb)teradata_binary_table_64kb.FieldSchema(name:test_decimal, type:decimal(15,2), comment:from deserializer), ] +POSTHOOK: Lineage: teradata_binary_table_64kb_insert.test_timestamp SIMPLE [(teradata_binary_table_64kb)teradata_binary_table_64kb.FieldSchema(name:test_timestamp, type:timestamp, comment:from deserializer), ] +POSTHOOK: Lineage: teradata_binary_table_64kb_insert.test_tinyint SIMPLE [(teradata_binary_table_64kb)teradata_binary_table_64kb.FieldSchema(name:test_tinyint, type:tinyint, comment:from deserializer), ] +PREHOOK: query: INSERT OVERWRITE TABLE teradata_binary_table_1mb_insert +SELECT 1, 15 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@teradata_binary_table_1mb_insert +POSTHOOK: query: INSERT OVERWRITE TABLE teradata_binary_table_1mb_insert +SELECT 1, 15 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@teradata_binary_table_1mb_insert +POSTHOOK: Lineage: teradata_binary_table_1mb_insert.test_int SIMPLE [] +POSTHOOK: Lineage: teradata_binary_table_1mb_insert.test_tinyint EXPRESSION [] +PREHOOK: query: DESC FORMATTED teradata_binary_table_64kb_insert +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@teradata_binary_table_64kb_insert +POSTHOOK: query: DESC FORMATTED teradata_binary_table_64kb_insert +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@teradata_binary_table_64kb_insert +# col_name data_type comment +test_tinyint tinyint from deserializer +test_decimal decimal(15,2) from deserializer +test_date date from deserializer +test_timestamp timestamp from deserializer + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: 
MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} + bucketing_version 2 + numFiles 1 + numRows 50 + rawDataSize 0 + teradata.char.charset LATIN + teradata.row.length 64KB + teradata.timestamp.precision 0 + totalSize 1800 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde +InputFormat: org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: DESC FORMATTED teradata_binary_table_1mb_insert +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@teradata_binary_table_1mb_insert +POSTHOOK: query: DESC FORMATTED teradata_binary_table_1mb_insert +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@teradata_binary_table_1mb_insert +# col_name data_type comment +test_tinyint tinyint from deserializer +test_int int from deserializer + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"test_int\":\"true\",\"test_tinyint\":\"true\"}} + bucketing_version 2 + numFiles 1 + numRows 1 + rawDataSize 0 + teradata.char.charset UNICODE + teradata.row.length 1MB + teradata.timestamp.precision 6 + totalSize 11 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.teradata.TeradataBinarySerde +InputFormat: org.apache.hadoop.hive.ql.io.TeradataBinaryFileInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.TeradataBinaryFileOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: DROP TABLE if exists 
teradata_binary_table_64kb +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@teradata_binary_table_64kb +PREHOOK: Output: default@teradata_binary_table_64kb +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_64kb +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@teradata_binary_table_64kb +POSTHOOK: Output: default@teradata_binary_table_64kb +PREHOOK: query: DROP TABLE if exists teradata_binary_table_1mb +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@teradata_binary_table_1mb +PREHOOK: Output: default@teradata_binary_table_1mb +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_1mb +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@teradata_binary_table_1mb +POSTHOOK: Output: default@teradata_binary_table_1mb +PREHOOK: query: DROP TABLE if exists teradata_binary_table_64kb_insert +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@teradata_binary_table_64kb_insert +PREHOOK: Output: default@teradata_binary_table_64kb_insert +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_64kb_insert +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@teradata_binary_table_64kb_insert +POSTHOOK: Output: default@teradata_binary_table_64kb_insert +PREHOOK: query: DROP TABLE if exists teradata_binary_table_1mb_insert +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@teradata_binary_table_1mb_insert +PREHOOK: Output: default@teradata_binary_table_1mb_insert +POSTHOOK: query: DROP TABLE if exists teradata_binary_table_1mb_insert +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@teradata_binary_table_1mb_insert +POSTHOOK: Output: default@teradata_binary_table_1mb_insert http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java 
b/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java new file mode 100644 index 0000000..b26d342 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.teradata; + +import org.apache.commons.io.input.SwappedDataInputStream; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.Timestamp; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.text.ParseException; + +import static java.lang.String.format; + +/** + * The TeradataBinaryDataInputStream is used to handle the Teradata binary format input for record. + * Since the TD binary format uses little-endian to handle the SHORT, INT, LONG, DOUBLE and etc. 
+ * while the Hadoop uses big-endian, + * We extend SwappedDataInputStream to handle these types and extend to handle the Teradata + * specific types like VARCHAR, CHAR, TIMESTAMP, DATE... + */ +public class TeradataBinaryDataInputStream extends SwappedDataInputStream { + + private static final int DATE_STRING_LENGTH = 8; + + /** + * Instantiates a new Teradata binary data input stream. + * + * @param input the input + */ + public TeradataBinaryDataInputStream(InputStream input) { + super(input); + } + + /** + * Read VARCHAR(N). + * The representation of Varchar in Teradata binary format is: + * the first two bytes represent the length N of this varchar field, + * the next N bytes represent the content of this varchar field. + * To pad the null varchar, the length will be 0 and the content will be none. + * + * @return the string + * @throws IOException the io exception + */ + public String readVarchar() throws IOException { + int varcharLength = readUnsignedShort(); + byte[] varcharContent = new byte[varcharLength]; + int numOfBytesRead = in.read(varcharContent); + if (varcharContent.length != 0 && numOfBytesRead != varcharLength) { + throw new EOFException( + format("Fail to read the varchar. Expect %d bytes, get %d bytes", varcharLength, numOfBytesRead)); + } + //force it to be UTF8 string + return new String(varcharContent, "UTF8"); + } + + /** + * Read TIMESTAMP(P). + * The representation of timestamp in Teradata binary format is: + * the byte number to read is based on the precision of timestamp, + * each byte represents one char and the timestamp is using string representation, + * eg: for TIMESTAMP(6), we need to read 26 bytes + * 31 39 31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33 32 30 30 + * will represent 1911-11-11 19:20:21.433200. + * the null timestamp will use space to pad. 
+ * + * @param byteNum the byte number that will be read from inputstream + * @return the timestamp + * @throws IOException the io exception + */ + public Timestamp readTimestamp(Integer byteNum) throws IOException { + // yyyy-mm-dd hh:mm:ss + byte[] timestampContent = new byte[byteNum]; + int numOfBytesRead = in.read(timestampContent); + if (timestampContent.length != 0 && numOfBytesRead != byteNum) { + throw new EOFException( + format("Fail to read the timestamp. Expect %d bytes, get %d bytes", byteNum, numOfBytesRead)); + } + String timestampStr = new String(timestampContent, "UTF8"); + if (timestampStr.trim().length() == 0) { + return null; + } + return Timestamp.valueOf(timestampStr); + } + + /** + * Read DATE. + * The representation of date in Teradata binary format is: + * The Date D is an int with 4 bytes using little endian, + * The representation is (D+19000000).ToString -> YYYYMMDD, + * eg: Date 07 b2 01 00 -> 111111 in little endian -> 19111111 -> 1911.11.11. + * the null date will use 0 to pad. + * + * @return the date + * @throws IOException the io exception + * @throws ParseException the parse exception + */ + public Date readDate() throws IOException, ParseException { + int di = readInt(); + if (di == 0) { + return null; + } + String dateString = String.valueOf(di + 19000000); + if (dateString.length() < DATE_STRING_LENGTH) { + dateString = StringUtils.leftPad(dateString, DATE_STRING_LENGTH, '0'); + } + Date date = new Date(); + date.setYear(Integer.parseInt(dateString.substring(0, 4))); + date.setMonth(Integer.parseInt(dateString.substring(4, 6))); + date.setDayOfMonth(Integer.parseInt(dateString.substring(6, 8))); + return date; + } + + /** + * Read CHAR(N). + * The representation of char in Teradata binary format is + * the byte number to read is based on the [charLength] * [bytePerChar] <- totalLength, + * bytePerChar is decided by the charset: LATIN charset is 2 bytes per char and UNICODE charset is 3 bytes per char.
+ * the null char will use space to pad. + * + * @param totalLength the total length + * @return the string + * @throws IOException the io exception + */ + public String readChar(int totalLength) throws IOException { + byte[] charContent = new byte[totalLength]; + int numOfBytesRead = in.read(charContent); + if (charContent.length != 0 && numOfBytesRead != totalLength) { + throw new EOFException( + format("Fail to read the varchar. Expect %d bytes, get %d bytes", totalLength, numOfBytesRead)); + } + return new String(charContent, "UTF8"); + } + + /** + * Read DECIMAL(P, S). + * The representation of decimal in Teradata binary format is + * the byte number to read is decided solely by the precision(P), + * HiveDecimal is constructed through the byte array and scale. + * the null DECIMAL will use 0x00 to pad. + * + * @param scale the scale + * @param byteNum the byte num + * @return the hive decimal + * @throws IOException the io exception + */ + public HiveDecimal readDecimal(int scale, int byteNum) throws IOException { + byte[] decimalContent = new byte[byteNum]; + int numOfBytesRead = in.read(decimalContent); + if (decimalContent.length != 0 && numOfBytesRead != byteNum) { + throw new EOFException( + format("Fail to read the decimal. Expect %d bytes, get %d bytes", byteNum, numOfBytesRead)); + } + ArrayUtils.reverse(decimalContent); + return HiveDecimal.create(new BigInteger(decimalContent), scale); + } + + /** + * Read VARBYTE(N). + * The representation of VARBYTE in Teradata binary format is: + * the first two bytes represent the length N of this varbyte field, + * the next N bytes represent the content of this varbyte field. + * To pad the null varbyte, the length will be 0 and the content will be none.
+ * + * @return the byte [ ] + * @throws IOException the io exception + */ + public byte[] readVarbyte() throws IOException { + int varbyteLength = readUnsignedShort(); + byte[] varbyteContent = new byte[varbyteLength]; + int numOfBytesRead = in.read(varbyteContent); + if (varbyteContent.length != 0 && numOfBytesRead != varbyteLength) { + throw new EOFException( + format("Fail to read the varbyte. Expect %d bytes, get %d bytes", varbyteLength, numOfBytesRead)); + } + return varbyteContent; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/eb842b75/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java b/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java new file mode 100644 index 0000000..f2f801d --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.teradata; + +import org.apache.commons.io.EndianUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.io.DateWritableV2; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; + +import static java.lang.String.join; +import static java.lang.String.format; + + +/** + * The TeradataBinaryDataOutputStream is used to produce the output in compliance with the Teradata binary format, + * so the output can be directly used to load into Teradata DB using TPT fastload. + * Since the TD binary format uses little-endian to handle the SHORT, INT, LONG, DOUBLE, etc. + * while Hadoop uses big-endian, + * we extend ByteArrayOutputStream, write these types byte-swapped through EndianUtils, and add writers for the Teradata + * specific types like VARCHAR, CHAR, TIMESTAMP, DATE... + */ +public class TeradataBinaryDataOutputStream extends ByteArrayOutputStream { + + private static final Log LOG = LogFactory.getLog(TeradataBinaryDataOutputStream.class); + + private static final int TIMESTAMP_NO_NANOS_BYTE_NUM = 19; + + public TeradataBinaryDataOutputStream() { + } + + /** + * Write VARCHAR(N). + * The representation of Varchar in Teradata binary format is: + * the first two bytes represent the length N of this varchar field, + * the next N bytes represent the content of this varchar field. + * To pad the null varchar, the length will be 0 and the content will be none.
+ * + * @param writable the writable + * @throws IOException the io exception + */ + public void writeVarChar(HiveVarcharWritable writable) throws IOException { + if (writable == null) { + EndianUtils.writeSwappedShort(this, (short) 0); + return; + } + Text t = writable.getTextValue(); + int varcharLength = t.getLength(); + EndianUtils.writeSwappedShort(this, (short) varcharLength); // write the varchar length + write(t.getBytes(), 0, varcharLength); // write the varchar content + } + + /** + * Write INT. + * using little-endian to write integer. + * + * @param i the int to write + * @throws IOException the io exception + */ + public void writeInt(int i) throws IOException { + EndianUtils.writeSwappedInteger(this, i); + } + + /** + * Write TIMESTAMP(N). + * The representation of timestamp in Teradata binary format is: + * the byte number to write is based on the precision of timestamp, + * each byte represents one char and the timestamp is using string representation, + * eg: for 1911-11-11 19:20:21.433200 in TIMESTAMP(3), we will cut it to be 1911-11-11 19:20:21.433 and write + * 31 39 31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33. + * the null timestamp will use space to pad. + * + * @param timestamp the timestamp + * @param byteNum the byte number the timestamp will write + * @throws IOException the io exception + */ + public void writeTimestamp(TimestampWritableV2 timestamp, int byteNum) throws IOException { + if (timestamp == null) { + String pad = join("", Collections.nCopies(byteNum, " ")); + write(pad.getBytes("UTF8")); + return; + } + String sTimeStamp = timestamp.getTimestamp().toString(); + if (sTimeStamp.length() >= byteNum) { + write(sTimeStamp.substring(0, byteNum).getBytes("UTF8")); + return; + } + write(sTimeStamp.getBytes("UTF8")); + String pad; + if (sTimeStamp.length() == TIMESTAMP_NO_NANOS_BYTE_NUM) { + pad = "."
+ join("", Collections.nCopies(byteNum - sTimeStamp.length() - 1, "0")); + } else { + pad = join("", Collections.nCopies(byteNum - sTimeStamp.length(), "0")); + } + write(pad.getBytes("UTF8")); + } + + /** + * Write DOUBLE. + * using little-endian to write double. + * + * @param d the d + * @throws IOException the io exception + */ + public void writeDouble(double d) throws IOException { + EndianUtils.writeSwappedDouble(this, d); + } + + /** + * Write DATE. + * The representation of date in Teradata binary format is: + * The Date D is an int with 4 bytes using little endian. + * The representation is (YYYYMMDD - 19000000).toInt -> D + * eg. 1911.11.11 -> 19111111 -> 111111 -> 07 b2 01 00 in little endian. + * the null date will use 0 to pad. + * + * @param date the date + * @throws IOException the io exception + */ + public void writeDate(DateWritableV2 date) throws IOException { + if (date == null) { + EndianUtils.writeSwappedInteger(this, 0); + return; + } + int toWrite = date.get().getYear() * 10000 + date.get().getMonth() * 100 + date.get().getDay() - 19000000; + EndianUtils.writeSwappedInteger(this, toWrite); + } + + /** + * Write LONG. + * using little-endian to write long. + * + * @param l the l + * @throws IOException the io exception + */ + public void writeLong(long l) throws IOException { + EndianUtils.writeSwappedLong(this, l); + } + + /** + * Write CHAR(N). + * The representation of char in Teradata binary format is: + * the byte number to write is based on the [charLength] * [bytePerChar] <- totalLength, + * bytePerChar is decided by the charset: LATIN charset is 2 bytes per char and UNICODE charset is 3 bytes per char. + * the null char will use space to pad.
+ * + * @param writable the writable + * @param length the byte n + * @throws IOException the io exception + */ + public void writeChar(HiveCharWritable writable, int length) throws IOException { + if (writable == null) { + String pad = join("", Collections.nCopies(length, " ")); + write(pad.getBytes("UTF8")); + return; + } + Text t = writable.getStrippedValue(); + int contentLength = t.getLength(); + write(t.getBytes(), 0, contentLength); + if (length - contentLength < 0) { + throw new IOException(format("The byte num %s of HiveCharWritable is more than the byte num %s we can hold. " + + "The content of HiveCharWritable is %s", contentLength, length, writable.getPaddedValue())); + } + if (length > contentLength) { + String pad = join("", Collections.nCopies(length - contentLength, " ")); + write(pad.getBytes("UTF8")); + } + } + + /** + * Write DECIMAL(P, S). + * The representation of decimal in Teradata binary format is: + * the byte number to read is decided solely by the precision(P), + * HiveDecimal is constructed through the byte array and scale. + * the rest of byte will use 0x00 to pad (positive) and use 0xFF to pad (negative). + * the null DECIMAL will use 0x00 to pad. 
+ * + * @param writable the writable + * @param byteNum the byte num + * @param scale the scale + * @throws IOException the io exception + */ + public void writeDecimal(HiveDecimalWritable writable, int byteNum, int scale) throws IOException { + if (writable == null) { + byte[] pad = new byte[byteNum]; + write(pad); + return; + } + // since the HiveDecimal will auto adjust the scale to save resource + // we need to adjust it back otherwise the output bytes will be wrong + int hiveScale = writable.getHiveDecimal().scale(); + BigInteger bigInteger = writable.getHiveDecimal().unscaledValue(); + if (hiveScale < scale) { + BigInteger multiplicand = new BigInteger("1" + join("", Collections.nCopies(scale - hiveScale, "0"))); + bigInteger = bigInteger.multiply(multiplicand); + } + byte[] content = bigInteger.toByteArray(); + int signBit = content[0] >> 7 & 1; + ArrayUtils.reverse(content); + write(content); + if (byteNum > content.length) { + byte[] pad; + if (signBit == 0) { + pad = new byte[byteNum - content.length]; + } else { + pad = new byte[byteNum - content.length]; + Arrays.fill(pad, (byte) 255); + } + write(pad); + } + } + + /** + * Write SHORT. + * using little-endian to write short. + * + * @param s the s + * @throws IOException the io exception + */ + public void writeShort(short s) throws IOException { + EndianUtils.writeSwappedShort(this, s); + } + + /** + * Write VARBYTE(N). + * The representation of VARBYTE in Teradata binary format is: + * the first two bytes represent the length N of this varbyte field, + * the next N bytes represent the content of this varbyte field. + * To pad the null varbyte, the length will be 0 and the content will be none.
+ * + * @param writable the writable + * @throws IOException the io exception + */ + public void writeVarByte(BytesWritable writable) throws IOException { + if (writable == null) { + EndianUtils.writeSwappedShort(this, (short) 0); + return; + } + int varbyteLength = writable.getLength(); + EndianUtils.writeSwappedShort(this, (short) varbyteLength); // write the varbyte length + write(writable.getBytes(), 0, varbyteLength); // write the varbyte content + } +}