http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java index 1401ac3..ef77daf 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java @@ -22,6 +22,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Timestamp; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.Map; import org.apache.commons.codec.binary.Base64; import org.slf4j.Logger; @@ -48,7 +52,6 @@ import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.fast.SerializeWrite; import org.apache.hadoop.io.Text; -import org.apache.hive.common.util.DateUtils; /* * Directly serialize, field-by-field, the LazyBinary format. @@ -60,7 +63,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { private LazySerDeParameters lazyParams; - private byte separator; + private byte[] separators; private boolean[] needsEscape; private boolean isEscaped; private byte escapeChar; @@ -70,6 +73,8 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { private int fieldCount; private int index; + private int currentLevel; + private Deque<Integer> indexStack = new ArrayDeque<Integer>(); // For thread safety, we allocate private writable objects for our use only. private DateWritable dateWritable; @@ -80,14 +85,14 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { private byte[] decimalScratchBuffer; public LazySimpleSerializeWrite(int fieldCount, - byte separator, LazySerDeParameters lazyParams) { + LazySerDeParameters lazyParams) { this(); this.fieldCount = fieldCount; - - this.separator = separator; + this.lazyParams = lazyParams; + separators = lazyParams.getSeparators(); isEscaped = lazyParams.isEscaped(); escapeChar = lazyParams.getEscapeChar(); needsEscape = lazyParams.getNeedsEscape(); @@ -106,6 +111,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { this.output = output; output.reset(); index = 0; + currentLevel = 0; } /* @@ -115,6 +121,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { public void setAppend(Output output) { this.output = output; index = 0; + currentLevel = 0; } /* @@ -124,35 +131,19 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { public void reset() { output.reset(); index = 0; + currentLevel = 0; } /* - * General Pattern: - * - * if (index > 0) { - * output.write(separator); - * } - * - * WHEN NOT NULL: Write value. - * OTHERWISE NULL: Write nullSequenceBytes. - * - * Increment index - * - */ - - /* * Write a NULL field. */ @Override public void writeNull() throws IOException { - - if (index > 0) { - output.write(separator); - } + beginPrimitive(); output.write(nullSequenceBytes); - index++; + finishPrimitive(); } /* @@ -160,18 +151,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeBoolean(boolean v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (v) { output.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); } else { output.write(LazyUtils.falseBytes, 0, LazyUtils.falseBytes.length); } - - index++; + finishPrimitive(); } /* @@ -179,14 +165,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeByte(byte v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); LazyInteger.writeUTF8(output, v); - - index++; + finishPrimitive(); } /* @@ -194,14 +175,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeShort(short v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); LazyInteger.writeUTF8(output, v); - - index++; + finishPrimitive(); } /* @@ -209,14 +185,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeInt(int v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); LazyInteger.writeUTF8(output, v); - - index++; + finishPrimitive(); } /* @@ -224,14 +195,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeLong(long v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); LazyLong.writeUTF8(output, v); - - index++; + finishPrimitive(); } /* @@ -239,15 +205,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeFloat(float vf) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); ByteBuffer b = Text.encode(String.valueOf(vf)); output.write(b.array(), 0, b.limit()); - - index++; + finishPrimitive(); } /* @@ -255,15 +216,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeDouble(double v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); ByteBuffer b = Text.encode(String.valueOf(v)); output.write(b.array(), 0, b.limit()); - - index++; + finishPrimitive(); } /* @@ -274,28 +230,20 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeString(byte[] v) throws IOException { - - if (index > 0) { - output.write(separator); + beginPrimitive(); + if (v.equals(nullSequenceBytes)) { } - LazyUtils.writeEscaped(output, v, 0, v.length, isEscaped, escapeChar, needsEscape); - - index++; + finishPrimitive(); } @Override public void writeString(byte[] v, int start, int length) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); LazyUtils.writeEscaped(output, v, start, length, isEscaped, escapeChar, needsEscape); - - index++; + finishPrimitive(); } /* @@ -303,16 +251,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeHiveChar(HiveChar hiveChar) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); ByteBuffer b = Text.encode(hiveChar.getPaddedValue()); LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar, needsEscape); - - index++; + finishPrimitive(); } /* @@ -320,16 +263,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); ByteBuffer b = Text.encode(hiveVarchar.getValue()); LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar, needsEscape); - - index++; + finishPrimitive(); } /* @@ -337,32 +275,22 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeBinary(byte[] v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); byte[] toEncode = new byte[v.length]; System.arraycopy(v, 0, toEncode, 0, v.length); byte[] toWrite = Base64.encodeBase64(toEncode); output.write(toWrite, 0, toWrite.length); - - index++; + finishPrimitive(); } @Override public void writeBinary(byte[] v, int start, int length) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); byte[] toEncode = new byte[length]; System.arraycopy(v, start, toEncode, 0, length); byte[] toWrite = Base64.encodeBase64(toEncode); output.write(toWrite, 0, toWrite.length); - - index++; + finishPrimitive(); } /* @@ -370,35 +298,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeDate(Date date) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (dateWritable == null) { dateWritable = new DateWritable(); } dateWritable.set(date); LazyDate.writeUTF8(output, dateWritable); - - index++; + finishPrimitive(); } // We provide a faster way to write a date without a Date object. @Override public void writeDate(int dateAsDays) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (dateWritable == null) { dateWritable = new DateWritable(); } dateWritable.set(dateAsDays); LazyDate.writeUTF8(output, dateWritable); - - index++; + finishPrimitive(); } /* @@ -406,18 +324,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeTimestamp(Timestamp v) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (timestampWritable == null) { timestampWritable = new TimestampWritable(); } timestampWritable.set(v); LazyTimestamp.writeUTF8(output, timestampWritable); - - index++; + finishPrimitive(); } /* @@ -425,35 +338,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (hiveIntervalYearMonthWritable == null) { hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable(); } hiveIntervalYearMonthWritable.set(viyt); LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable); - - index++; + finishPrimitive(); } @Override public void writeHiveIntervalYearMonth(int totalMonths) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (hiveIntervalYearMonthWritable == null) { hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable(); } hiveIntervalYearMonthWritable.set(totalMonths); LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable); - - index++; + finishPrimitive(); } /* @@ -461,18 +364,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException { - - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (hiveIntervalDayTimeWritable == null) { hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable(); } hiveIntervalDayTimeWritable.set(vidt); LazyHiveIntervalDayTime.writeUTF8(output, hiveIntervalDayTimeWritable); - - index++; + finishPrimitive(); } /* @@ -483,29 +381,119 @@ public final class LazySimpleSerializeWrite implements SerializeWrite { */ @Override public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException { - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (decimalScratchBuffer == null) { decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; } LazyHiveDecimal.writeUTF8(output, dec, scale, decimalScratchBuffer); - - index++; + finishPrimitive(); } @Override public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException { - if (index > 0) { - output.write(separator); - } - + beginPrimitive(); if (decimalScratchBuffer == null) { decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; } LazyHiveDecimal.writeUTF8(output, decWritable, scale, decimalScratchBuffer); + finishPrimitive(); + } + + private void beginComplex() { + if (index > 0) { + output.write(separators[currentLevel]); + } + indexStack.push(index); + + // Always use index 0 so the write methods don't write a separator. + index = 0; + + // Set "global" separator member to next level. + currentLevel++; + } + + private void finishComplex() { + currentLevel--; + index = indexStack.pop(); + index++; + } + + @Override + public void beginList(List list) { + beginComplex(); + } + + @Override + public void separateList() { + } + + @Override + public void finishList() { + finishComplex(); + } + + @Override + public void beginMap(Map<?, ?> map) { + beginComplex(); + + // MAP requires 2 levels: key separator and key-pair separator. + currentLevel++; + } + + @Override + public void separateKey() { + index = 0; + output.write(separators[currentLevel]); + } + + @Override + public void separateKeyValuePair() { + index = 0; + output.write(separators[currentLevel - 1]); + } + + @Override + public void finishMap() { + // Remove MAP extra level. + currentLevel--; + + finishComplex(); + } + + @Override + public void beginStruct(List fieldValues) { + beginComplex(); + } + + @Override + public void separateStruct() { + } + + @Override + public void finishStruct() { + finishComplex(); + } + + @Override + public void beginUnion(int tag) throws IOException { + beginComplex(); + writeInt(tag); + output.write(separators[currentLevel]); + index = 0; + } + + @Override + public void finishUnion() { + finishComplex(); + } + + private void beginPrimitive() { + if (index > 0) { + output.write(separators[currentLevel]); + } + } + private void finishPrimitive() { index++; } }
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java index e94ae99..8e0a499 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java @@ -20,19 +20,28 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast; import java.io.EOFException; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Deque; +import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde2.fast.DeserializeRead; import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VLong; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.WritableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format. @@ -55,26 +64,84 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { private int start; private int offset; private int end; - private int fieldCount; - private int fieldStart; - private int fieldIndex; - private byte nullByte; + + private boolean skipLengthPrefix = false; // Object to receive results of reading a decoded variable length int or long. private VInt tempVInt; private VLong tempVLong; + private Deque<Field> stack = new ArrayDeque<>(); + private Field root; + + private class Field { + Field[] children; + + Category category; + PrimitiveCategory primitiveCategory; + TypeInfo typeInfo; + + int index; + int count; + int start; + int end; + int nullByteStart; + byte nullByte; + byte tag; + } + public LazyBinaryDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) { super(typeInfos, useExternalBuffer); - fieldCount = typeInfos.length; tempVInt = new VInt(); tempVLong = new VLong(); currentExternalBufferNeeded = false; + + root = new Field(); + root.category = Category.STRUCT; + root.children = createFields(typeInfos); + root.count = typeInfos.length; } - // Not public since we must have the field count so every 8 fields NULL bytes can be navigated. - private LazyBinaryDeserializeRead() { - super(); + private Field[] createFields(TypeInfo[] typeInfos) { + final Field[] children = new Field[typeInfos.length]; + for (int i = 0; i < typeInfos.length; i++) { + children[i] = createField(typeInfos[i]); + } + return children; + } + + private Field createField(TypeInfo typeInfo) { + final Field field = new Field(); + final Category category = typeInfo.getCategory(); + field.category = category; + field.typeInfo = typeInfo; + switch (category) { + case PRIMITIVE: + field.primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); + break; + case LIST: + field.children = new Field[1]; + field.children[0] = createField(((ListTypeInfo) typeInfo).getListElementTypeInfo()); + break; + case MAP: + field.children = new Field[2]; + field.children[0] = createField(((MapTypeInfo) typeInfo).getMapKeyTypeInfo()); + field.children[1] = createField(((MapTypeInfo) typeInfo).getMapValueTypeInfo()); + break; + case STRUCT: + final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; + final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + field.children = createFields(fieldTypeInfos.toArray(new TypeInfo[fieldTypeInfos.size()])); + break; + case UNION: + final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; + final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos(); + field.children = createFields(objectTypeInfos.toArray(new TypeInfo[objectTypeInfos.size()])); + break; + default: + throw new RuntimeException(); + } + return field; } /* @@ -86,7 +153,20 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { this.offset = offset; start = offset; end = offset + length; - fieldIndex = 0; + + stack.clear(); + stack.push(root); + clearIndex(root); + } + + private void clearIndex(Field field) { + field.index = 0; + if (field.children == null) { + return; + } + for (Field child : field.children) { + clearIndex(child); + } } /* @@ -102,13 +182,13 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { sb.append(" for length "); sb.append(end - start); sb.append(" to read "); - sb.append(fieldCount); + sb.append(root.children.length); sb.append(" fields with types "); sb.append(Arrays.toString(typeInfos)); sb.append(". Read field #"); - sb.append(fieldIndex); + sb.append(root.index); sb.append(" at field start position "); - sb.append(fieldStart); + sb.append(root.start); sb.append(" current read offset "); sb.append(offset); @@ -127,263 +207,196 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { */ @Override public boolean readNextField() throws IOException { - if (fieldIndex >= fieldCount) { - return false; - } - - fieldStart = offset; + return readComplexField(); + } - if (fieldIndex == 0) { - // The rest of the range check for fields after the first is below after checking - // the NULL byte. - if (offset >= end) { + private boolean readPrimitive(Field field) throws IOException { + final PrimitiveCategory primitiveCategory = field.primitiveCategory; + final TypeInfo typeInfo = field.typeInfo; + switch (primitiveCategory) { + case BOOLEAN: + // No check needed for single byte read. + currentBoolean = (bytes[offset++] != 0); + break; + case BYTE: + // No check needed for single byte read. + currentByte = bytes[offset++]; + break; + case SHORT: + // Last item -- ok to be at end. + if (offset + 2 > end) { throw new EOFException(); } - nullByte = bytes[offset++]; - } - - // NOTE: The bit is set to 1 if a field is NOT NULL. boolean isNull; - if ((nullByte & (1 << (fieldIndex % 8))) == 0) { - - // Logically move past this field. - fieldIndex++; - - // Every 8 fields we read a new NULL byte. - if (fieldIndex < fieldCount) { - if ((fieldIndex % 8) == 0) { - // Get next null byte. - if (offset >= end) { - throw new EOFException(); - } - nullByte = bytes[offset++]; - } + currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset); + offset += 2; + break; + case INT: + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { + throw new EOFException(); } - return false; - } else { - - // Make sure there is at least one byte that can be read for a value. - if (offset >= end) { + LazyBinaryUtils.readVInt(bytes, offset, tempVInt); + offset += tempVInt.length; + currentInt = tempVInt.value; + break; + case LONG: + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { throw new EOFException(); } - - /* - * We have a field and are positioned to it. Read it. - */ - switch (primitiveCategories[fieldIndex]) { - case BOOLEAN: - // No check needed for single byte read. - currentBoolean = (bytes[offset++] != 0); - break; - case BYTE: - // No check needed for single byte read. - currentByte = bytes[offset++]; - break; - case SHORT: - // Last item -- ok to be at end. - if (offset + 2 > end) { - throw new EOFException(); - } - currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset); - offset += 2; - break; - case INT: + LazyBinaryUtils.readVLong(bytes, offset, tempVLong); + offset += tempVLong.length; + currentLong = tempVLong.value; + break; + case FLOAT: + // Last item -- ok to be at end. + if (offset + 4 > end) { + throw new EOFException(); + } + currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset)); + offset += 4; + break; + case DOUBLE: + // Last item -- ok to be at end. + if (offset + 8 > end) { + throw new EOFException(); + } + currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset)); + offset += 8; + break; + + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + { + // using vint instead of 4 bytes // Parse the first byte of a vint/vlong to determine the number of bytes. if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { throw new EOFException(); } LazyBinaryUtils.readVInt(bytes, offset, tempVInt); offset += tempVInt.length; - currentInt = tempVInt.value; - break; - case LONG: - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { - throw new EOFException(); - } - LazyBinaryUtils.readVLong(bytes, offset, tempVLong); - offset += tempVLong.length; - currentLong = tempVLong.value; - break; - case FLOAT: + + int saveStart = offset; + int length = tempVInt.value; + offset += length; // Last item -- ok to be at end. - if (offset + 4 > end) { + if (offset > end) { throw new EOFException(); } - currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset)); - offset += 4; - break; - case DOUBLE: + + currentBytes = bytes; + currentBytesStart = saveStart; + currentBytesLength = length; + } + break; + case DATE: + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { + throw new EOFException(); + } + LazyBinaryUtils.readVInt(bytes, offset, tempVInt); + offset += tempVInt.length; + + currentDateWritable.set(tempVInt.value); + break; + case TIMESTAMP: + { + int length = TimestampWritable.getTotalLength(bytes, offset); + int saveStart = offset; + offset += length; // Last item -- ok to be at end. - if (offset + 8 > end) { + if (offset > end) { throw new EOFException(); } - currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset)); - offset += 8; - break; - - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - { - // using vint instead of 4 bytes - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { - throw new EOFException(); - } - LazyBinaryUtils.readVInt(bytes, offset, tempVInt); - offset += tempVInt.length; - - int saveStart = offset; - int length = tempVInt.value; - offset += length; - // Last item -- ok to be at end. - if (offset > end) { - throw new EOFException(); - } - - currentBytes = bytes; - currentBytesStart = saveStart; - currentBytesLength = length; - } - break; - case DATE: + + currentTimestampWritable.set(bytes, saveStart); + } + break; + case INTERVAL_YEAR_MONTH: + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { + throw new EOFException(); + } + LazyBinaryUtils.readVInt(bytes, offset, tempVInt); + offset += tempVInt.length; + + currentHiveIntervalYearMonthWritable.set(tempVInt.value); + break; + case INTERVAL_DAY_TIME: + // The first bounds check requires at least one more byte beyond for 2nd int (hence >=). + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) { + throw new EOFException(); + } + LazyBinaryUtils.readVLong(bytes, offset, tempVLong); + offset += tempVLong.length; + + // Parse the first byte of a vint/vlong to determine the number of bytes. + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { + throw new EOFException(); + } + LazyBinaryUtils.readVInt(bytes, offset, tempVInt); + offset += tempVInt.length; + + currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value); + break; + case DECIMAL: + { + // Since enforcing precision and scale can cause a HiveDecimal to become NULL, + // we must read it, enforce it here, and either return NULL or buffer the result. + + // These calls are to see how much data there is. The setFromBytes call below will do the same + // readVInt reads but actually unpack the decimal. + + // The first bounds check requires at least one more byte beyond for 2nd int (hence >=). // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { + if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) { throw new EOFException(); } LazyBinaryUtils.readVInt(bytes, offset, tempVInt); offset += tempVInt.length; + int readScale = tempVInt.value; - currentDateWritable.set(tempVInt.value); - break; - case TIMESTAMP: - { - int length = TimestampWritable.getTotalLength(bytes, offset); - int saveStart = offset; - offset += length; - // Last item -- ok to be at end. - if (offset > end) { - throw new EOFException(); - } - - currentTimestampWritable.set(bytes, saveStart); - } - break; - case INTERVAL_YEAR_MONTH: // Parse the first byte of a vint/vlong to determine the number of bytes. if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { throw new EOFException(); } LazyBinaryUtils.readVInt(bytes, offset, tempVInt); offset += tempVInt.length; - - currentHiveIntervalYearMonthWritable.set(tempVInt.value); - break; - case INTERVAL_DAY_TIME: - // The first bounds check requires at least one more byte beyond for 2nd int (hence >=). - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) { + int saveStart = offset; + offset += tempVInt.value; + // Last item -- ok to be at end. + if (offset > end) { throw new EOFException(); } - LazyBinaryUtils.readVLong(bytes, offset, tempVLong); - offset += tempVLong.length; + int length = offset - saveStart; - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { - throw new EOFException(); - } - LazyBinaryUtils.readVInt(bytes, offset, tempVInt); - offset += tempVInt.length; + // scale = 2, length = 6, value = -6065716379.11 + // \002\006\255\114\197\131\083\105 + // \255\114\197\131\083\105 - currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value); - break; - case DECIMAL: - { - // Since enforcing precision and scale can cause a HiveDecimal to become NULL, - // we must read it, enforce it here, and either return NULL or buffer the result. - - // These calls are to see how much data there is. The setFromBytes call below will do the same - // readVInt reads but actually unpack the decimal. - - // The first bounds check requires at least one more byte beyond for 2nd int (hence >=). - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) { - throw new EOFException(); - } - LazyBinaryUtils.readVInt(bytes, offset, tempVInt); - offset += tempVInt.length; - int readScale = tempVInt.value; - - // Parse the first byte of a vint/vlong to determine the number of bytes. - if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) { - throw new EOFException(); - } - LazyBinaryUtils.readVInt(bytes, offset, tempVInt); - offset += tempVInt.length; - int saveStart = offset; - offset += tempVInt.value; - // Last item -- ok to be at end. - if (offset > end) { - throw new EOFException(); - } - int length = offset - saveStart; - - // scale = 2, length = 6, value = -6065716379.11 - // \002\006\255\114\197\131\083\105 - // \255\114\197\131\083\105 - - currentHiveDecimalWritable.setFromBigIntegerBytesAndScale( - bytes, saveStart, length, readScale); - boolean decimalIsNull = !currentHiveDecimalWritable.isSet(); - if (!decimalIsNull) { - - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex]; - - int precision = decimalTypeInfo.getPrecision(); - int scale = decimalTypeInfo.getScale(); - - decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale); - } - if (decimalIsNull) { - - // Logically move past this field. - fieldIndex++; - - // Every 8 fields we read a new NULL byte. - if (fieldIndex < fieldCount) { - if ((fieldIndex % 8) == 0) { - // Get next null byte. - if (offset >= end) { - throw new EOFException(); - } - nullByte = bytes[offset++]; - } - } - return false; - } - } - break; + currentHiveDecimalWritable.setFromBigIntegerBytesAndScale( + bytes, saveStart, length, readScale); + boolean decimalIsNull = !currentHiveDecimalWritable.isSet(); + if (!decimalIsNull) { - default: - throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name()); - } - } + final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo; - // Logically move past this field. - fieldIndex++; + final int precision = decimalTypeInfo.getPrecision(); + final int scale = decimalTypeInfo.getScale(); - // Every 8 fields we read a new NULL byte. - if (fieldIndex < fieldCount) { - if ((fieldIndex % 8) == 0) { - // Get next null byte. - if (offset >= end) { - throw new EOFException(); + decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale); + } + if (decimalIsNull) { + return false; } - nullByte = bytes[offset++]; } + break; + default: + throw new Error("Unexpected primitive category " + primitiveCategory.name()); } - return true; } @@ -394,8 +407,37 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { * Designed for skipping columns that are not included. */ public void skipNextField() throws IOException { - // Not a known use case for LazyBinary -- so don't optimize. - readNextField(); + final Field current = stack.peek(); + final boolean isNull = isNull(current); + + if (isNull) { + current.index++; + return; + } + + if (readUnionTag(current)) { + current.index++; + return; + } + + final Field child = getChild(current); + + if (child.category == Category.PRIMITIVE) { + readPrimitive(child); + current.index++; + } else { + parseHeader(child); + stack.push(child); + + for (int i = 0; i < child.count; i++) { + skipNextField(); + } + finishComplexVariableFieldsType(); + } + + if (offset > end) { + throw new EOFException(); + } } /* @@ -412,4 +454,141 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead { public boolean isEndOfInputReached() { return (offset == end); } + + private boolean isNull(Field field) { + final byte b = (byte) (1 << (field.index % 8)); + switch (field.category) { + case PRIMITIVE: + return false; + case LIST: + case MAP: + final byte nullByte = bytes[field.nullByteStart + (field.index / 8)]; + return (nullByte & b) == 0; + case STRUCT: + if (field.index % 8 == 0) { + field.nullByte = bytes[offset++]; + } + return (field.nullByte & b) == 0; + case UNION: + return false; + default: + throw new RuntimeException(); + } + } + + private void parseHeader(Field field) { + // Init + field.index = 0; + field.start = offset; + + // Read length + if (!skipLengthPrefix) { + final int length = LazyBinaryUtils.byteArrayToInt(bytes, offset); + offset += 4; + field.end = offset + length; + } + + switch (field.category) { + case LIST: + case MAP: + // Read count + LazyBinaryUtils.readVInt(bytes, offset, tempVInt); + if (field.category == Category.LIST) { + field.count = tempVInt.value; + } else { + field.count = tempVInt.value * 2; + } + offset += tempVInt.length; + + // Null byte start + field.nullByteStart = offset; + offset += ((field.count) + 7) / 8; + break; + case STRUCT: + field.count = ((StructTypeInfo) field.typeInfo).getAllStructFieldTypeInfos().size(); + break; + case UNION: + field.count = 2; + break; + } + } + + private Field getChild(Field field) { + switch (field.category) { + case LIST: + return field.children[0]; + case MAP: + return field.children[field.index % 2]; + case STRUCT: + return field.children[field.index]; + case UNION: + return field.children[field.tag]; + default: + throw new RuntimeException(); + } + } + + private boolean readUnionTag(Field field) { + if (field.category == Category.UNION && field.index == 0) { + field.tag = bytes[offset++]; + currentInt = field.tag; + return true; + } else { + return false; + } + } + + // Push or next + @Override + public boolean readComplexField() throws IOException { + final Field current = stack.peek(); + boolean isNull = isNull(current); + + if (isNull) { + current.index++; + return false; + } + + if (readUnionTag(current)) { + current.index++; + return true; + } + + final Field child = getChild(current); + + if (child.category == Category.PRIMITIVE) { + isNull = !readPrimitive(child); + current.index++; + } else { + parseHeader(child); + stack.push(child); + } + + if (offset > end) { + throw new EOFException(); + } + return !isNull; + } + + // Pop (list, map) + @Override + public boolean isNextComplexMultiValue() { + Field current = stack.peek(); + final boolean isNext = current.index < current.count; + if (!isNext) { + stack.pop(); + stack.peek().index++; + } + return isNext; + } + + // Pop (struct, union) + @Override + public void finishComplexVariableFieldsType() { + stack.pop(); + if (stack.peek() == null) { + throw new RuntimeException(); + } + stack.peek().index++; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java index 085d71c..e50ff5e 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java @@ -21,7 +21,13 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast; import java.io.IOException; import java.sql.Date; import java.sql.Timestamp; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveChar; @@ -38,7 +44,11 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.hive.serde2.fast.SerializeWrite; -import org.apache.hive.common.util.DateUtils; + +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST; +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.MAP; +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.STRUCT; +import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.UNION; /* * Directly serialize, field-by-field, the LazyBinary format. @@ -50,10 +60,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite { private Output output; - private int fieldCount; - private int fieldIndex; - private byte nullByte; - private long nullOffset; + private int rootFieldCount; + private boolean skipLengthPrefix = false; // For thread safety, we allocate private writable objects for our use only. private TimestampWritable timestampWritable; @@ -64,10 +72,30 @@ public class LazyBinarySerializeWrite implements SerializeWrite { private long[] scratchLongs; private byte[] scratchBuffer; + private Field root; + private Deque<Field> stack = new ArrayDeque<>(); + private LazyBinarySerDe.BooleanRef warnedOnceNullMapKey; + + private static class Field { + Category type; + + int fieldCount; + int fieldIndex; + int byteSizeStart; + int start; + long nullOffset; + byte nullByte; + + Field(Category type) { + this.type = type; + } + } + public LazyBinarySerializeWrite(int fieldCount) { this(); vLongBytes = new byte[LazyBinaryUtils.VLONG_BYTES_LEN]; - this.fieldCount = fieldCount; + this.rootFieldCount = fieldCount; + resetWithoutOutput(); } // Not public since we must have the field count and other information. @@ -81,9 +109,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite { public void set(Output output) { this.output = output; output.reset(); - fieldIndex = 0; - nullByte = 0; - nullOffset = 0; + resetWithoutOutput(); } /* @@ -92,9 +118,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite { @Override public void setAppend(Output output) { this.output = output; - fieldIndex = 0; - nullByte = 0; - nullOffset = output.getLength(); + resetWithoutOutput(); + root.nullOffset = output.getLength(); } /* @@ -103,57 +128,45 @@ public class LazyBinarySerializeWrite implements SerializeWrite { @Override public void reset() { output.reset(); - fieldIndex = 0; - nullByte = 0; - nullOffset = 0; + resetWithoutOutput(); } - /* - * General Pattern: - * - * // Every 8 fields we write a NULL byte. - * IF ((fieldIndex % 8) == 0), then - * IF (fieldIndex > 0), then - * Write back previous NullByte - * NullByte = 0 - * Remember write position - * Allocate room for next NULL byte. - * - * WHEN NOT NULL: Set bit in NULL byte; Write value. - * OTHERWISE NULL: We do not set a bit in the nullByte when we are writing a null. - * - * Increment fieldIndex - * - * IF (fieldIndex == fieldCount), then - * Write back final NullByte - * - */ + private void resetWithoutOutput() { + root = new Field(STRUCT); + root.fieldCount = rootFieldCount; + stack.clear(); + stack.push(root); + warnedOnceNullMapKey = null; + } /* * Write a NULL field. */ @Override public void writeNull() throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); + final Field current = stack.peek(); + + if (current.type == STRUCT) { + // Every 8 fields we write a NULL byte. + if ((current.fieldIndex % 8) == 0) { + if (current.fieldIndex > 0) { + // Write back previous 8 field's NULL byte. + output.writeByte(current.nullOffset, current.nullByte); + current.nullByte = 0; + current.nullOffset = output.getLength(); + } + // Allocate next NULL byte. + output.reserve(1); } - // Allocate next NULL byte. - output.reserve(1); - } - // We DO NOT set a bit in the NULL byte when we are writing a NULL. + // We DO NOT set a bit in the NULL byte when we are writing a NULL. - fieldIndex++; + current.fieldIndex++; - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); + if (current.fieldIndex == current.fieldCount) { + // Write back the final NULL byte before the last fields. + output.writeByte(current.nullOffset, current.nullByte); + } } } @@ -162,30 +175,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeBoolean(boolean v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); output.write((byte) (v ? 1 : 0)); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -193,30 +185,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeByte(byte v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); output.write(v); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -224,31 +195,10 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeShort(short v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); output.write((byte) (v >> 8)); output.write((byte) (v)); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -256,30 +206,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeInt(int v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); writeVInt(v); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -287,30 +216,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeLong(long v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); writeVLong(v); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -318,34 +226,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeFloat(float vf) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); int v = Float.floatToIntBits(vf); output.write((byte) (v >> 24)); output.write((byte) (v >> 16)); output.write((byte) (v >> 8)); output.write((byte) (v)); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -353,97 +240,32 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeDouble(double v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); LazyBinaryUtils.writeDouble(output, v); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* * STRING. - * + * * Can be used to write CHAR and VARCHAR when the caller takes responsibility for * truncation/padding issues. */ @Override public void writeString(byte[] v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - - int length = v.length; + beginElement(); + final int length = v.length; writeVInt(length); - output.write(v, 0, length); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } @Override public void writeString(byte[] v, int start, int length) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); writeVInt(length); - output.write(v, start, length); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -451,8 +273,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeHiveChar(HiveChar hiveChar) throws IOException { - String string = hiveChar.getStrippedValue(); - byte[] bytes = string.getBytes(); + final String string = hiveChar.getStrippedValue(); + final byte[] bytes = string.getBytes(); writeString(bytes); } @@ -461,8 +283,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException { - String string = hiveVarchar.getValue(); - byte[] bytes = string.getBytes(); + final String string = hiveVarchar.getValue(); + final byte[] bytes = string.getBytes(); writeString(bytes); } @@ -484,59 +306,17 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeDate(Date date) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); writeVInt(DateWritable.dateToDays(date)); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } // We provide a faster way to write a date without a Date object. @Override public void writeDate(int dateAsDays) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); writeVInt(dateAsDays); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -544,34 +324,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeTimestamp(Timestamp v) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (timestampWritable == null) { timestampWritable = new TimestampWritable(); } timestampWritable.set(v); timestampWritable.writeToByteStream(output); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -579,66 +338,24 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (hiveIntervalYearMonthWritable == null) { hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable(); } hiveIntervalYearMonthWritable.set(viyt); hiveIntervalYearMonthWritable.writeToByteStream(output); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } @Override public void writeHiveIntervalYearMonth(int totalMonths) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (hiveIntervalYearMonthWritable == null) { hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable(); } hiveIntervalYearMonthWritable.set(totalMonths); hiveIntervalYearMonthWritable.writeToByteStream(output); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -646,34 +363,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (hiveIntervalDayTimeWritable == null) { hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable(); } hiveIntervalDayTimeWritable.set(vidt); hiveIntervalDayTimeWritable.writeToByteStream(output); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -684,22 +380,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite { */ @Override public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (scratchLongs == null) { scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN]; scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES]; @@ -709,33 +390,12 @@ public class LazyBinarySerializeWrite implements SerializeWrite { dec, scratchLongs, scratchBuffer); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } @Override public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException { - - // Every 8 fields we write a NULL byte. - if ((fieldIndex % 8) == 0) { - if (fieldIndex > 0) { - // Write back previous 8 field's NULL byte. - output.writeByte(nullOffset, nullByte); - nullByte = 0; - nullOffset = output.getLength(); - } - // Allocate next NULL byte. - output.reserve(1); - } - - // Set bit in NULL byte when a field is NOT NULL. - nullByte |= 1 << (fieldIndex % 8); - + beginElement(); if (scratchLongs == null) { scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN]; scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES]; @@ -745,13 +405,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite { decWritable, scratchLongs, scratchBuffer); - - fieldIndex++; - - if (fieldIndex == fieldCount) { - // Write back the final NULL byte before the last fields. - output.writeByte(nullOffset, nullByte); - } + finishElement(); } /* @@ -767,4 +421,241 @@ public class LazyBinarySerializeWrite implements SerializeWrite { final int len = LazyBinaryUtils.writeVLongToByteArray(vLongBytes, v); output.write(vLongBytes, 0, len); } + + @Override + public void beginList(List list) { + final Field current = new Field(LIST); + beginComplex(current); + + final int size = list.size(); + current.fieldCount = size; + + if (!skipLengthPrefix) { + // 1/ reserve spaces for the byte size of the list + // which is a integer and takes four bytes + current.byteSizeStart = output.getLength(); + output.reserve(4); + current.start = output.getLength(); + } + // 2/ write the size of the list as a VInt + LazyBinaryUtils.writeVInt(output, size); + + // 3/ write the null bytes + byte nullByte = 0; + for (int eid = 0; eid < size; eid++) { + // set the bit to 1 if an element is not null + if (null != list.get(eid)) { + nullByte |= 1 << (eid % 8); + } + // store the byte every eight elements or + // if this is the last element + if (7 == eid % 8 || eid == size - 1) { + output.write(nullByte); + nullByte = 0; + } + } + } + + @Override + public void separateList() { + } + + @Override + public void finishList() { + final Field current = stack.peek(); + + if (!skipLengthPrefix) { + // 5/ update the list byte size + int listEnd = output.getLength(); + int listSize = listEnd - current.start; + writeSizeAtOffset(output, current.byteSizeStart, listSize); + } + + finishComplex(); + } + + @Override + public void beginMap(Map<?, ?> map) { + final Field current = new Field(MAP); + beginComplex(current); + + if (!skipLengthPrefix) { + // 1/ reserve spaces for the byte size of the map + // which is a integer and takes four bytes + current.byteSizeStart = output.getLength(); + output.reserve(4); + current.start = output.getLength(); + } + + // 2/ write the size of the map which is a VInt + final int size = map.size(); + current.fieldIndex = size; + LazyBinaryUtils.writeVInt(output, size); + + // 3/ write the null bytes + int b = 0; + byte nullByte = 0; + for (Map.Entry<?, ?> entry : map.entrySet()) { + // set the bit to 1 if a key is not null + if (null != entry.getKey()) { + nullByte |= 1 << (b % 8); + } else if (warnedOnceNullMapKey != null) { + if (!warnedOnceNullMapKey.value) { + LOG.warn("Null map key encountered! Ignoring similar problems."); + } + warnedOnceNullMapKey.value = true; + } + b++; + // set the bit to 1 if a value is not null + if (null != entry.getValue()) { + nullByte |= 1 << (b % 8); + } + b++; + // write the byte to stream every 4 key-value pairs + // or if this is the last key-value pair + if (0 == b % 8 || b == size * 2) { + output.write(nullByte); + nullByte = 0; + } + } + } + + @Override + public void separateKey() { + } + + @Override + public void separateKeyValuePair() { + } + + @Override + public void finishMap() { + final Field current = stack.peek(); + + if (!skipLengthPrefix) { + // 5/ update the byte size of the map + int mapEnd = output.getLength(); + int mapSize = mapEnd - current.start; + writeSizeAtOffset(output, current.byteSizeStart, mapSize); + } + + finishComplex(); + } + + @Override + public void beginStruct(List fieldValues) { + final Field current = new Field(STRUCT); + beginComplex(current); + + current.fieldCount = fieldValues.size(); + + if (!skipLengthPrefix) { + // 1/ reserve spaces for the byte size of the struct + // which is a integer and takes four bytes + current.byteSizeStart = output.getLength(); + output.reserve(4); + current.start = output.getLength(); + } + current.nullOffset = output.getLength(); + } + + @Override + public void separateStruct() { + } + + @Override + public void finishStruct() { + final Field current = stack.peek(); + + if (!skipLengthPrefix) { + // 3/ update the byte size of the struct + int typeEnd = output.getLength(); + int typeSize = typeEnd - current.start; + writeSizeAtOffset(output, current.byteSizeStart, typeSize); + } + + finishComplex(); + } + + @Override + public void beginUnion(int tag) throws IOException { + final Field current = new Field(UNION); + beginComplex(current); + + current.fieldCount = 1; + + if (!skipLengthPrefix) { + // 1/ reserve spaces for the byte size of the struct + // which is a integer and takes four bytes + current.byteSizeStart = output.getLength(); + output.reserve(4); + current.start = output.getLength(); + } + + // 2/ serialize the union + output.write(tag); + } + + @Override + public void finishUnion() { + final Field current = stack.peek(); + + if (!skipLengthPrefix) { + // 3/ update the byte size of the struct + int typeEnd = output.getLength(); + int typeSize = typeEnd - current.start; + writeSizeAtOffset(output, current.byteSizeStart, typeSize); + } + + finishComplex(); + } + + private void beginElement() { + final Field current = stack.peek(); + + if (current.type == STRUCT) { + // Every 8 fields we write a NULL byte. + if ((current.fieldIndex % 8) == 0) { + if (current.fieldIndex > 0) { + // Write back previous 8 field's NULL byte. + output.writeByte(current.nullOffset, current.nullByte); + current.nullByte = 0; + current.nullOffset = output.getLength(); + } + // Allocate next NULL byte. + output.reserve(1); + } + + // Set bit in NULL byte when a field is NOT NULL. + current.nullByte |= 1 << (current.fieldIndex % 8); + } + } + + private void finishElement() { + final Field current = stack.peek(); + + if (current.type == STRUCT) { + current.fieldIndex++; + + if (current.fieldIndex == current.fieldCount) { + // Write back the final NULL byte before the last fields. + output.writeByte(current.nullOffset, current.nullByte); + } + } + } + + private void beginComplex(Field field) { + beginElement(); + stack.push(field); + } + + private void finishComplex() { + stack.pop(); + finishElement(); + } + + private static void writeSizeAtOffset( + ByteStream.RandomAccessOutput byteStream, int byteSizeStart, int size) { + byteStream.writeInt(byteSizeStart, size); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java index f26c9ec..7b28682 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java @@ -79,6 +79,31 @@ public class StandardUnionObjectInspector extends SettableUnionObjectInspector { public String toString() { return tag + ":" + object; } + + @Override + public int hashCode() { + if (object == null) { + return tag; + } else { + return object.hashCode() ^ tag; + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof StandardUnion)) { + return false; + } + StandardUnion that = (StandardUnion) obj; + if (this.object == null || that.object == null) { + return this.tag == that.tag && this.object == that.object; + } else { + return this.tag == that.tag && this.object.equals(that.object); + } + } } /**
