This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 91f91d25427257fac34ec17386f2ad5113c08242 Author: Michael Blow <mb...@apache.org> AuthorDate: Thu May 6 16:06:47 2021 -0400 [NO ISSUE][EXT] Refactor JSONDataParser Change-Id: Ia47f2fe449cfb0ca8e6e3a6308ef01ee6810ec03 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11363 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Reviewed-by: Till Westmann <ti...@apache.org> --- ...DataParser.java => AbstractJsonDataParser.java} | 78 +--- .../asterix/external/parser/JSONDataParser.java | 435 +-------------------- 2 files changed, 27 insertions(+), 486 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java similarity index 85% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java index b2036c0..a5d5a08 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java @@ -20,7 +20,6 @@ package org.apache.asterix.external.parser; import java.io.DataOutput; import java.io.IOException; -import java.io.InputStream; import java.util.BitSet; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -29,9 +28,6 @@ import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.IAsterixListBuilder; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.IRawRecord; -import org.apache.asterix.external.api.IRecordDataParser; -import org.apache.asterix.external.api.IStreamDataParser; import org.apache.asterix.external.parser.jackson.ADMToken; import org.apache.asterix.external.parser.jackson.GeometryCoParser; import org.apache.asterix.external.parser.jackson.ParserContext; @@ -56,21 +52,18 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonStreamContext; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.TreeTraversingParser; /** - * JSON format parser using Jakson parser. + * JSON format parser using Jackson parser. */ -public class JSONDataParser extends AbstractNestedDataParser<ADMToken> - implements IStreamDataParser, IRecordDataParser<char[]> { +public abstract class AbstractJsonDataParser extends AbstractNestedDataParser<ADMToken> { protected final ParserContext parserContext; protected final JsonFactory jsonFactory; protected final ARecordType rootType; protected final GeometryCoParser geometryCoParser; - private Supplier<String> dataSourceName; - private LongSupplier lineNumber; + protected Supplier<String> dataSourceName = ExternalDataConstants.EMPTY_STRING; + protected LongSupplier lineNumber = ExternalDataConstants.NO_LINES; protected JsonParser jsonParser; @@ -82,15 +75,13 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> * @param jsonFactory * Jackson JSON parser factory. */ - public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory) { + public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory) { // recordType currently cannot be null, however this is to guarantee for any future changes. this.rootType = recordType != null ? recordType : RecordUtil.FULLY_OPEN_RECORD_TYPE; this.jsonFactory = jsonFactory; - //GeometyCoParser to parse GeoJSON objects to AsterixDB internal spatial types. + //GeometryCoParser to parse GeoJSON objects to AsterixDB internal spatial types. geometryCoParser = new GeometryCoParser(jsonParser); parserContext = new ParserContext(); - this.dataSourceName = ExternalDataConstants.EMPTY_STRING; - this.lineNumber = ExternalDataConstants.NO_LINES; } /* @@ -99,53 +90,6 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> **************************************************** */ - @Override - public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { - this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName; - this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber; - } - - @Override - public final boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException { - try { - //TODO(wyk): find a way to reset byte[] instead of creating a new parser for each record. - jsonParser = jsonFactory.createParser(record.get(), 0, record.size()); - geometryCoParser.reset(jsonParser); - nextToken(); - parseObject(rootType, out); - return true; - } catch (IOException e) { - throw createException(e); - } - } - - @Override - public void setInputStream(InputStream in) throws IOException { - setInput(jsonFactory.createParser(in)); - } - - public void setInputNode(JsonNode node) { - setInput(new TreeTraversingParser(node)); - } - - private void setInput(JsonParser parser) { - jsonParser = parser; - geometryCoParser.reset(jsonParser); - } - - @Override - public boolean parse(DataOutput out) throws HyracksDataException { - try { - if (nextToken() == ADMToken.EOF) { - return false; - } - parseObject(rootType, out); - return true; - } catch (IOException e) { - throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e); - } - } - public boolean parseAnyValue(DataOutput out) throws HyracksDataException { try { if (nextToken() == ADMToken.EOF) { @@ -158,12 +102,6 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> } } - @Override - public boolean reset(InputStream in) throws IOException { - setInputStream(in); - return true; - } - /* **************************************************** * Abstract method implementation @@ -361,7 +299,7 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> * @param out * @throws IOException */ - private void parseObject(IAType actualType, DataOutput out) throws IOException { + protected void parseObject(IAType actualType, DataOutput out) throws IOException { if (actualType.getTypeTag() == ATypeTag.OBJECT) { parseObject((ARecordType) actualType, out); } else { @@ -477,7 +415,7 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> } } - private HyracksDataException createException(IOException e) { + protected HyracksDataException createException(IOException e) { if (jsonParser != null) { String msg; if (e instanceof JsonParseException) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java index b2036c0..b4ec46e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java @@ -21,61 +21,31 @@ package org.apache.asterix.external.parser; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; -import java.util.BitSet; import java.util.function.LongSupplier; import java.util.function.Supplier; -import org.apache.asterix.builders.IARecordBuilder; -import org.apache.asterix.builders.IAsterixListBuilder; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IStreamDataParser; import org.apache.asterix.external.parser.jackson.ADMToken; -import org.apache.asterix.external.parser.jackson.GeometryCoParser; -import org.apache.asterix.external.parser.jackson.ParserContext; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.om.base.ABoolean; -import org.apache.asterix.om.base.ANull; -import org.apache.asterix.om.base.AUnorderedList; -import org.apache.asterix.om.types.AOrderedListType; import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.AbstractCollectionType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.utils.RecordUtil; -import org.apache.asterix.runtime.exceptions.UnsupportedTypeException; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.util.ExceptionUtils; -import org.apache.hyracks.data.std.api.IMutableValueStorage; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonStreamContext; -import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.TreeTraversingParser; /** - * JSON format parser using Jakson parser. + * JSON format parser using Jackson parser. */ -public class JSONDataParser extends AbstractNestedDataParser<ADMToken> - implements IStreamDataParser, IRecordDataParser<char[]> { - - protected final ParserContext parserContext; - protected final JsonFactory jsonFactory; - protected final ARecordType rootType; - protected final GeometryCoParser geometryCoParser; - private Supplier<String> dataSourceName; - private LongSupplier lineNumber; - - protected JsonParser jsonParser; +public class JSONDataParser extends AbstractJsonDataParser implements IStreamDataParser, IRecordDataParser<char[]> { /** - * Initialize JSONDataParser with GeometryCoParser + * Initialize JSONDataParser * * @param recordType * defined type. @@ -83,40 +53,7 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> * Jackson JSON parser factory. */ public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory) { - // recordType currently cannot be null, however this is to guarantee for any future changes. - this.rootType = recordType != null ? recordType : RecordUtil.FULLY_OPEN_RECORD_TYPE; - this.jsonFactory = jsonFactory; - //GeometyCoParser to parse GeoJSON objects to AsterixDB internal spatial types. - geometryCoParser = new GeometryCoParser(jsonParser); - parserContext = new ParserContext(); - this.dataSourceName = ExternalDataConstants.EMPTY_STRING; - this.lineNumber = ExternalDataConstants.NO_LINES; - } - - /* - **************************************************** - * Public methods - **************************************************** - */ - - @Override - public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { - this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName; - this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber; - } - - @Override - public final boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException { - try { - //TODO(wyk): find a way to reset byte[] instead of creating a new parser for each record. - jsonParser = jsonFactory.createParser(record.get(), 0, record.size()); - geometryCoParser.reset(jsonParser); - nextToken(); - parseObject(rootType, out); - return true; - } catch (IOException e) { - throw createException(e); - } + super(recordType, jsonFactory); } @Override @@ -134,24 +71,32 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> } @Override - public boolean parse(DataOutput out) throws HyracksDataException { + public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException { try { - if (nextToken() == ADMToken.EOF) { - return false; - } + //TODO(wyk): find a way to reset byte[] instead of creating a new parser for each record. + jsonParser = jsonFactory.createParser(record.get(), 0, record.size()); + geometryCoParser.reset(jsonParser); + nextToken(); parseObject(rootType, out); return true; } catch (IOException e) { - throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e); + throw createException(e); } } - public boolean parseAnyValue(DataOutput out) throws HyracksDataException { + @Override + public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { + this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName; + this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber; + } + + @Override + public boolean parse(DataOutput out) throws HyracksDataException { try { if (nextToken() == ADMToken.EOF) { return false; } - parseValue(BuiltinType.ANY, out); + parseObject(rootType, out); return true; } catch (IOException e) { throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e); @@ -163,346 +108,4 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken> setInputStream(in); return true; } - - /* - **************************************************** - * Abstract method implementation - **************************************************** - */ - - /** - * Jackson token to ADM token mapper - */ - @Override - protected final ADMToken advanceToNextToken() throws IOException { - final JsonToken jsonToken = jsonParser.nextToken(); - if (jsonToken == null) { - return ADMToken.EOF; - } - ADMToken token; - switch (jsonToken) { - case VALUE_FALSE: - token = ADMToken.FALSE; - break; - case VALUE_TRUE: - token = ADMToken.TRUE; - break; - case VALUE_STRING: - token = ADMToken.STRING; - break; - case VALUE_NULL: - token = ADMToken.NULL; - break; - case VALUE_NUMBER_FLOAT: - token = ADMToken.DOUBLE; - break; - case VALUE_NUMBER_INT: - token = ADMToken.INT; - break; - case START_OBJECT: - token = ADMToken.OBJECT_START; - break; - case END_OBJECT: - token = ADMToken.OBJECT_END; - break; - case START_ARRAY: - token = ADMToken.ARRAY_START; - break; - case END_ARRAY: - token = ADMToken.ARRAY_END; - break; - case FIELD_NAME: - token = ADMToken.FIELD_NAME; - break; - default: - throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString()); - } - - return token; - } - /* - **************************************************** - * Overridden methods - **************************************************** - */ - - /** - * In the case of JSON, we can parse GeoJSON objects as internal AsterixDB spatial types. - */ - @Override - protected boolean isConvertable(ATypeTag parsedTypeTag, ATypeTag definedTypeTag) { - if (parsedTypeTag == ATypeTag.OBJECT && (definedTypeTag == ATypeTag.POINT || definedTypeTag == ATypeTag.LINE - || definedTypeTag == ATypeTag.POLYGON)) { - return true; - } - return super.isConvertable(parsedTypeTag, definedTypeTag); - } - - /* - **************************************************** - * Complex types parsers - **************************************************** - */ - - @Override - protected final void parseObject(ARecordType recordType, DataOutput out) throws IOException { - final IMutableValueStorage valueBuffer = parserContext.enterObject(); - final IARecordBuilder objectBuilder = parserContext.getObjectBuilder(recordType); - final BitSet nullBitMap = parserContext.getNullBitmap(recordType.getFieldTypes().length); - while (nextToken() != ADMToken.OBJECT_END) { - /* - * Jackson parser calls String.intern() for field names (if enabled). - * Calling getCurrentName() will not create multiple objects. - */ - final String fieldName = jsonParser.getCurrentName(); - final int fieldIndex = recordType.getFieldIndex(fieldName); - - if (!recordType.isOpen() && fieldIndex < 0) { - throw new RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD, - fieldName); - } - valueBuffer.reset(); - nextToken(); - - if (fieldIndex < 0) { - //field is not defined and the type is open - parseValue(BuiltinType.ANY, valueBuffer.getDataOutput()); - objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), valueBuffer); - } else { - //field is defined - final IAType fieldType = recordType.getFieldType(fieldName); - - //fail fast if the current field is not nullable - if (currentToken() == ADMToken.NULL && !isNullableType(fieldType)) { - throw new RuntimeDataException(ErrorCode.PARSER_TWEET_PARSER_CLOSED_FIELD_NULL, fieldName); - } - - nullBitMap.set(fieldIndex); - parseValue(fieldType, valueBuffer.getDataOutput()); - objectBuilder.addField(fieldIndex, valueBuffer); - } - } - - /* - * Check for any possible missed values for a defined (non-nullable) type. - * Throws exception if there is a violation - */ - if (nullBitMap != null) { - checkOptionalConstraints(recordType, nullBitMap); - } - parserContext.exitObject(valueBuffer, nullBitMap, objectBuilder); - objectBuilder.write(out, true); - } - - /** - * Geometry in GeoJSON is an object - * - * @param typeTag - * geometry typeTag - * @param out - * @throws IOException - */ - private void parseGeometry(ATypeTag typeTag, DataOutput out) throws IOException { - //Start the co-parser - geometryCoParser.starGeometry(); - while (nextToken() != ADMToken.OBJECT_END) { - if (currentToken() == ADMToken.FIELD_NAME) { - geometryCoParser.checkFieldName(jsonParser.getCurrentName()); - } else if (!geometryCoParser.checkValue(currentToken())) { - throw new IOException(geometryCoParser.getErrorMessage()); - } - } - - geometryCoParser.serialize(typeTag, out); - } - - @Override - protected final void parseArray(AOrderedListType listType, DataOutput out) throws IOException { - parseCollection(listType, ADMToken.ARRAY_END, out); - } - - @Override - protected void parseMultiset(AUnorderedList listType, DataOutput out) throws IOException { - throw new UnsupportedTypeException("JSON parser", ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG); - } - - protected final void parseCollection(AbstractCollectionType collectionType, ADMToken endToken, DataOutput out) - throws IOException { - final IMutableValueStorage valueBuffer = parserContext.enterCollection(); - final IAsterixListBuilder arrayBuilder = parserContext.getCollectionBuilder(collectionType); - final boolean isOpen = collectionType.getItemType().getTypeTag() == ATypeTag.ANY; - while (nextToken() != endToken) { - valueBuffer.reset(); - if (isOpen) { - parseValue(BuiltinType.ANY, valueBuffer.getDataOutput()); - } else { - //fail fast if current value is null - if (currentToken() == ADMToken.NULL) { - throw new RuntimeDataException(ErrorCode.PARSER_COLLECTION_ITEM_CANNOT_BE_NULL); - } - parseValue(collectionType.getItemType(), valueBuffer.getDataOutput()); - } - arrayBuilder.addItem(valueBuffer); - } - parserContext.exitCollection(valueBuffer, arrayBuilder); - arrayBuilder.write(out, true); - } - - /* - **************************************************** - * Value parsers and serializers - **************************************************** - */ - - /** - * Parse JSON object or GeoJSON object. - * - * @param actualType - * @param out - * @throws IOException - */ - private void parseObject(IAType actualType, DataOutput out) throws IOException { - if (actualType.getTypeTag() == ATypeTag.OBJECT) { - parseObject((ARecordType) actualType, out); - } else { - parseGeometry(actualType.getTypeTag(), out); - } - } - - protected void parseValue(IAType definedType, DataOutput out) throws IOException { - final ATypeTag currentTypeTag = currentToken().getTypeTag(); - /* - * In case of type mismatch, checkAndGetType will throw an exception. - */ - final IAType actualType = checkAndGetType(definedType, currentTypeTag); - - switch (currentToken()) { - case NULL: - nullSerde.serialize(ANull.NULL, out); - break; - case FALSE: - booleanSerde.serialize(ABoolean.FALSE, out); - break; - case TRUE: - booleanSerde.serialize(ABoolean.TRUE, out); - break; - case INT: - case DOUBLE: - serailizeNumeric(actualType.getTypeTag(), out); - break; - case STRING: - serializeString(actualType.getTypeTag(), out); - break; - case OBJECT_START: - parseObject(actualType, out); - break; - case ARRAY_START: - parseArray((AOrderedListType) actualType, out); - break; - default: - throw new RuntimeDataException(ErrorCode.PARSE_ERROR, jsonParser.currentToken().toString()); - } - } - - /** - * Given that numeric values may underflow or overflow, an exception will be thrown. - * - * @param numericType - * @param out - * @throws IOException - */ - private void serailizeNumeric(ATypeTag numericType, DataOutput out) throws IOException { - final ATypeTag typeToUse = numericType == ATypeTag.ANY ? currentToken().getTypeTag() : numericType; - - switch (typeToUse) { - case BIGINT: - aInt64.setValue(jsonParser.getLongValue()); - int64Serde.serialize(aInt64, out); - break; - case INTEGER: - aInt32.setValue(jsonParser.getIntValue()); - int32Serde.serialize(aInt32, out); - break; - case SMALLINT: - aInt16.setValue(jsonParser.getShortValue()); - int16Serde.serialize(aInt16, out); - break; - case TINYINT: - aInt8.setValue(jsonParser.getByteValue()); - int8Serde.serialize(aInt8, out); - break; - case DOUBLE: - aDouble.setValue(jsonParser.getDoubleValue()); - doubleSerde.serialize(aDouble, out); - break; - case FLOAT: - aFloat.setValue(jsonParser.getFloatValue()); - floatSerde.serialize(aFloat, out); - break; - default: - throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString()); - } - } - - /** - * Serialize the string value. - * TODO(wyk) avoid String objects for type STRING - * - * @param stringVariantType - * @param out - * @throws IOException - */ - private void serializeString(ATypeTag stringVariantType, DataOutput out) throws IOException { - char[] buffer = jsonParser.getTextCharacters(); - int begin = jsonParser.getTextOffset(); - int len = jsonParser.getTextLength(); - final ATypeTag typeToUse = stringVariantType == ATypeTag.ANY ? currentToken().getTypeTag() : stringVariantType; - - switch (typeToUse) { - case STRING: - parseString(buffer, begin, len, out); - break; - case DATE: - parseDate(buffer, begin, len, out); - break; - case DATETIME: - parseDateTime(buffer, begin, len, out); - break; - case TIME: - parseTime(buffer, begin, len, out); - break; - default: - throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString()); - - } - } - - private HyracksDataException createException(IOException e) { - if (jsonParser != null) { - String msg; - if (e instanceof JsonParseException) { - msg = ((JsonParseException) e).getOriginalMessage(); - } else { - msg = ExceptionUtils.getRootCause(e).getMessage(); - } - if (msg == null) { - msg = ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM.errorMessage(); - } - long lineNum = lineNumber.getAsLong() + jsonParser.getCurrentLocation().getLineNr() - 1; - JsonStreamContext parsingContext = jsonParser.getParsingContext(); - String fieldName = "N/A"; - while (parsingContext != null) { - String currentFieldName = parsingContext.getCurrentName(); - if (currentFieldName != null) { - fieldName = currentFieldName; - break; - } - parsingContext = parsingContext.getParent(); - } - - return HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.PARSING_ERROR, - dataSourceName.get(), lineNum, fieldName, msg); - } - return new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e); - } }