Michael Blow has submitted this change and it was merged. Change subject: Some Code Reduction/Cleanup ......................................................................
Some Code Reduction/Cleanup Change-Id: I179a16fc183bd50d5d58cc12321234df1615abfd Reviewed-on: https://asterix-gerrit.ics.uci.edu/916 Reviewed-by: Michael Blow <mb...@apache.org> Integration-Tests: Michael Blow <mb...@apache.org> Tested-by: Michael Blow <mb...@apache.org> --- M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordRemoveFieldsTypeComputer.java 3 files changed, 68 insertions(+), 79 deletions(-) Approvals: Michael Blow: Looks good to me, approved; Verified; Verified diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java index 75aa61d..0f762ec 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java @@ -25,7 +25,6 @@ import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -54,13 +53,15 @@ public static final ARecordSerializerDeserializer SCHEMALESS_INSTANCE = new ARecordSerializerDeserializer(); + private static final IAObject[] NO_FIELDS = new IAObject[0]; + private final ARecordType recordType; private final int numberOfSchemaFields; @SuppressWarnings("rawtypes") - private final 
ISerializerDeserializer serializers[]; + private final ISerializerDeserializer[] serializers; @SuppressWarnings("rawtypes") - private final ISerializerDeserializer deserializers[]; + private final ISerializerDeserializer[] deserializers; private ARecordSerializerDeserializer() { this(null); @@ -74,12 +75,7 @@ deserializers = new ISerializerDeserializer[numberOfSchemaFields]; for (int i = 0; i < numberOfSchemaFields; i++) { IAType t = recordType.getFieldTypes()[i]; - IAType t2; - if (t.getTypeTag() == ATypeTag.UNION) { - t2 = ((AUnionType) t).getActualType(); - } else { - t2 = t; - } + IAType t2 = (t.getTypeTag() == ATypeTag.UNION) ? ((AUnionType) t).getActualType() : t; serializers[i] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(t2); deserializers[i] = SerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(t2); } @@ -94,47 +90,8 @@ @Override public ARecord deserialize(DataInput in) throws HyracksDataException { try { - boolean isExpanded = false; - in.readInt(); // recordSize - if (recordType == null) { - isExpanded = in.readBoolean(); - in.readInt(); // openPartOffset - } else { - if (recordType.isOpen()) { - isExpanded = in.readBoolean(); - if (isExpanded) { - in.readInt(); // openPartOffset - } - } else { - isExpanded = false; - } - } - IAObject[] closedFields = null; - if (numberOfSchemaFields > 0) { - in.readInt(); // read number of closed fields. 
- boolean hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(this.recordType); - byte[] nullBitMap = null; - if (hasOptionalFields) { - int nullBitMapSize = (int) (Math.ceil(numberOfSchemaFields / 4.0)); - nullBitMap = new byte[nullBitMapSize]; - in.readFully(nullBitMap); - } - closedFields = new IAObject[numberOfSchemaFields]; - for (int i = 0; i < numberOfSchemaFields; i++) { - in.readInt(); - } - for (int fieldId = 0; fieldId < numberOfSchemaFields; fieldId++) { - if (hasOptionalFields && ((nullBitMap[fieldId / 4] & (1 << (7 - 2 * (fieldId % 4)))) == 0)) { - closedFields[fieldId] = ANull.NULL; - continue; - } - if (hasOptionalFields && ((nullBitMap[fieldId / 4] & (1 << (7 - 2 * (fieldId % 4) - 1))) == 0)) { - closedFields[fieldId] = AMissing.MISSING; - continue; - } - closedFields[fieldId] = (IAObject) deserializers[fieldId].deserialize(in); - } - } + boolean isExpanded = isExpandedRecord(in); + IAObject[] schemaFields = getValuesForSchemaFields(in); if (isExpanded) { int numberOfOpenFields = in.readInt(); @@ -153,17 +110,63 @@ ARecordType openPartRecType = new ARecordType(null, fieldNames, fieldTypes, true); if (numberOfSchemaFields > 0) { ARecordType mergedRecordType = mergeRecordTypes(this.recordType, openPartRecType); - IAObject[] mergedFields = mergeFields(closedFields, openFields); + IAObject[] mergedFields = mergeFields(schemaFields, openFields); return new ARecord(mergedRecordType, mergedFields); } else { return new ARecord(openPartRecType, openFields); } } else { - return new ARecord(this.recordType, closedFields); + return new ARecord(this.recordType, schemaFields); } - } catch (IOException | AsterixException e) { + } catch (IOException e) { throw new HyracksDataException(e); } + } + + private boolean isExpandedRecord(DataInput in) throws IOException { + in.readInt(); // recordSize + if (recordType == null) { + boolean exp = in.readBoolean(); + in.readInt(); // openPartOffset + return exp; + } else { + if (recordType.isOpen()) { + boolean exp = 
in.readBoolean(); + if (exp) { + in.readInt(); // openPartOffset + } + return exp; + } + return false; + } + } + + private IAObject[] getValuesForSchemaFields(DataInput in) throws IOException { + if (numberOfSchemaFields <= 0) { + return NO_FIELDS; + } + in.readInt(); // read number of schema fields. + boolean hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(this.recordType); + byte[] nullBitMap = null; + if (hasOptionalFields) { + int nullBitMapSize = (int) (Math.ceil(numberOfSchemaFields / 4.0)); + nullBitMap = new byte[nullBitMapSize]; + in.readFully(nullBitMap); + } + for (int i = 0; i < numberOfSchemaFields; i++) { + in.readInt(); + } + IAObject[] schemaFields = new IAObject[numberOfSchemaFields]; + for (int fieldId = 0; fieldId < numberOfSchemaFields; fieldId++) { + if (hasOptionalFields && ((nullBitMap[fieldId / 4] & (1 << (7 - 2 * (fieldId % 4)))) == 0)) { + schemaFields[fieldId] = ANull.NULL; + } else if (hasOptionalFields && ((nullBitMap[fieldId / 4] & (1 << (7 - 2 * (fieldId % 4) - 1))) == 0)) { + schemaFields[fieldId] = AMissing.MISSING; + } else { + schemaFields[fieldId] = (IAObject) deserializers[fieldId].deserialize(in); + } + } + return schemaFields; } @Override @@ -244,8 +247,7 @@ return fields; } - private ARecordType mergeRecordTypes(ARecordType recType1, ARecordType recType2) throws AsterixException { - + private ARecordType mergeRecordTypes(ARecordType recType1, ARecordType recType2) { String[] fieldNames = new String[recType1.getFieldNames().length + recType2.getFieldNames().length]; IAType[] fieldTypes = new IAType[recType1.getFieldTypes().length + recType2.getFieldTypes().length]; @@ -266,7 +268,7 @@ return AInt32SerializerDeserializer.getInt(serRecord, offset); } - public static final int getFieldOffsetById(byte[] serRecord, int offset, int fieldId, int nullBitmapSize, + public static int getFieldOffsetById(byte[] serRecord, int offset, int fieldId, int nullBitmapSize, boolean isOpen) { final byte nullTestCode = (byte) (1 << (7 - 2 
* (fieldId % 4))); final byte missingTestCode = (byte) (1 << (7 - 2 * (fieldId % 4) - 1)); @@ -303,26 +305,15 @@ return offset + AInt32SerializerDeserializer.getInt(serRecord, pointer + nullBitmapSize + (4 * fieldId)); } - public static final int getFieldOffsetByName(byte[] serRecord, int start, int len, byte[] fieldName, int nstart) + public static int getFieldOffsetByName(byte[] serRecord, int start, int len, byte[] fieldName, int nstart) throws HyracksDataException { - int openPartOffset; - if (serRecord[start] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG) { - if (len <= 5) { - // Empty record - return -1; - } - // 5 is the index of the byte that determines whether the record is - // expanded or not, i.e. it has an open part. - if (serRecord[start + 5] == 1) { // true - // 6 is the index of the first byte of the openPartOffset value. - openPartOffset = start + AInt32SerializerDeserializer.getInt(serRecord, start + 6); - } else { - return -1; // this record does not have an open part - } - } else { - return -1; // this record does not have an open part + // 5 is the index of the byte that determines whether the record is expanded or not, i.e. it has an open part. + // a record with len <= 5 is empty + if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG || len <= 5 || serRecord[start + 5] != 1) { + return -1; } - + // 6 is the index of the first byte of the openPartOffset value. 
+ int openPartOffset = start + AInt32SerializerDeserializer.getInt(serRecord, start + 6); int numberOfOpenField = AInt32SerializerDeserializer.getInt(serRecord, openPartOffset); int fieldUtflength = UTF8StringUtil.getUTFLength(fieldName, nstart + 1); int fieldUtfMetaLen = UTF8StringUtil.getNumBytesToStoreLength(fieldUtflength); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java index c5a0ee5..066d562 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java @@ -65,8 +65,6 @@ private final PointableAllocator allocator = new PointableAllocator(); private final ResettableByteArrayOutputStream typeBos = new ResettableByteArrayOutputStream(); - private final DataOutputStream typeDos = new DataOutputStream(typeBos); - private final UTF8StringWriter utf8Writer = new UTF8StringWriter(); private final ResettableByteArrayOutputStream dataBos = new ResettableByteArrayOutputStream(); private final DataOutputStream dataDos = new DataOutputStream(dataBos); @@ -95,8 +93,9 @@ // initialize the buffer for closed parts(fieldName bytes+ type bytes) + // constant(null bytes) - typeBos.reset(); try { + final DataOutputStream typeDos = new DataOutputStream(typeBos); + final UTF8StringWriter utf8Writer = new UTF8StringWriter(); for (int i = 0; i < numberOfSchemaFields; i++) { ATypeTag ftypeTag = fieldTypes[i].getTypeTag(); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordRemoveFieldsTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordRemoveFieldsTypeComputer.java index 2758666..fc2ebff 100644 --- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordRemoveFieldsTypeComputer.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordRemoveFieldsTypeComputer.java @@ -331,8 +331,7 @@ case ANY: return DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE; case UNION: - AUnionType u = (AUnionType) type0; - IAType t1 = u.getActualType(); + IAType t1 = ((AUnionType) type0).getActualType(); if (t1.getTypeTag() == ATypeTag.RECORD) { return (ARecordType) t1; } else if (t1.getTypeTag() == ATypeTag.ANY) { -- To view, visit https://asterix-gerrit.ics.uci.edu/916 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I179a16fc183bd50d5d58cc12321234df1615abfd Gerrit-PatchSet: 14 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Cameron Samak <csa...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org>