This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 912698b58dca284efe525971707d53aebf9f9f3c Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Tue Mar 31 13:23:05 2020 -0700 DRILL-7683: Add "message parsing" to new JSON loader Adds the ability to parse "extra" JSON around the data payload, as often needed for a REST API. closes #2045 --- .../impl/scan/framework/SchemaNegotiator.java | 5 + .../impl/scan/framework/SchemaNegotiatorImpl.java | 13 ++ .../exec/store/easy/json/loader/JsonLoader.java | 8 +- .../store/easy/json/loader/JsonLoaderImpl.java | 105 ++++++++++-- .../store/easy/json/loader/JsonLoaderOptions.java | 9 ++ .../exec/store/easy/json/parser/ErrorFactory.java | 8 +- .../easy/json/parser/JsonStructureOptions.java | 14 ++ .../easy/json/parser/JsonStructureParser.java | 94 ++++++++++- .../exec/store/easy/json/parser/MessageParser.java | 43 +++++ .../exec/store/easy/json/parser/RootParser.java | 34 ++++ .../easy/json/parser/SimpleMessageParser.java | 141 ++++++++++++++++ .../store/easy/json/loader/BaseJsonLoaderTest.java | 13 +- .../store/easy/json/parser/BaseTestJsonParser.java | 33 ++-- .../easy/json/parser/TestJsonParserMessage.java | 179 +++++++++++++++++++++ 14 files changed, 654 insertions(+), 45 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java index 7c9a753..b2a793d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java @@ -22,6 +22,9 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.server.options.OptionSet; + +import 
com.typesafe.config.Config; /** * Negotiates the table schema with the scanner framework and provides @@ -100,6 +103,8 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; public interface SchemaNegotiator { OperatorContext context(); + Config drillConfig(); + OptionSet queryOptions(); /** * Specify an advanced error context which allows the reader to diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java index 8763dd4..64bac43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java @@ -21,8 +21,11 @@ import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.vector.ValueVector; +import com.typesafe.config.Config; + /** * Implementation of the schema negotiation between scan operator and * batch reader. 
Anticipates that the select list (and/or the list of @@ -94,6 +97,16 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { } @Override + public Config drillConfig() { + return context().getFragmentContext().getConfig(); + } + + @Override + public OptionSet queryOptions() { + return context().getFragmentContext().getOptions(); + } + + @Override public CustomErrorContext parentErrorContext() { return framework.errorContext(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java index 9d9afed..e4c5a0f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java @@ -56,13 +56,7 @@ public interface JsonLoader { * @throws RuntimeException for unexpected errors, most often due * to code errors */ - boolean next(); - - /** - * Indicates that a batch is complete. Tells the loader to materialize - * any deferred null fields. (See {@link TupleListener} for details.) - */ - void endBatch(); + boolean readBatch(); /** * Releases resources held by this class including the input stream. 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index 9fd64bd..ecfaf4b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.easy.json.loader; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; import java.util.ArrayList; import java.util.List; @@ -28,8 +29,12 @@ import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.store.easy.json.parser.ErrorFactory; import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser; +import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder; +import org.apache.drill.exec.store.easy.json.parser.MessageParser; +import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException; import org.apache.drill.exec.store.easy.json.parser.ValueDef; import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType; import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; @@ -97,7 +102,7 @@ import com.fasterxml.jackson.core.JsonToken; * <li>Reports errors as {@link UserException} objects, complete with context * information, rather than as generic Java exception as in the prior version.</li> * <li>Moves parse options into a separate {@link JsonOptions} class.</li> - * <li>Iteration protocol is simpler: simply call {@link #next()} until it returns + * <li>Iteration protocol is simpler: simply call 
{@link #readBatch()} until it returns * {@code false}. Errors are reported out-of-band via an exception.</li> * <li>The result set loader abstraction is perfectly happy with an empty schema. * For this reason, this version (unlike the original) does not make up a dummy @@ -128,6 +133,60 @@ import com.fasterxml.jackson.core.JsonToken; public class JsonLoaderImpl implements JsonLoader, ErrorFactory { protected static final Logger logger = LoggerFactory.getLogger(JsonLoaderImpl.class); + public static class JsonLoaderBuilder { + private ResultSetLoader rsLoader; + private TupleMetadata providedSchema; + private JsonLoaderOptions options; + private CustomErrorContext errorContext; + private InputStream stream; + private Reader reader; + private MessageParser messageParser; + + public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) { + this.rsLoader = rsLoader; + return this; + } + + public JsonLoaderBuilder providedSchema(TupleMetadata providedSchema) { + this.providedSchema = providedSchema; + return this; + } + + public JsonLoaderBuilder standardOptions(OptionSet optionSet) { + this.options = new JsonLoaderOptions(optionSet); + return this; + } + + public JsonLoaderBuilder options(JsonLoaderOptions options) { + this.options = options; + return this; + } + + public JsonLoaderBuilder errorContext(CustomErrorContext errorContext) { + this.errorContext = errorContext; + return this; + } + + public JsonLoaderBuilder fromStream(InputStream stream) { + this.stream = stream; + return this; + } + + public JsonLoaderBuilder fromReader(Reader reader) { + this.reader = reader; + return this; + } + + public JsonLoaderBuilder messageParser(MessageParser messageParser) { + this.messageParser = messageParser; + return this; + } + + public JsonLoader build() { + return new JsonLoaderImpl(this); + } + } + interface NullTypeMarker { void forceResolution(); } @@ -146,30 +205,33 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { * inference, and not JSON 
tokens have been seen which would hint * at a type. Not needed when a schema is provided. */ - // Using a simple list. Won't perform well if we have hundreds of // null fields; but then we've never seen such a pathologically bad - // case... Usually just one or two fields have deferred nulls. + // case. Usually just one or two fields have deferred nulls. private final List<NullTypeMarker> nullStates = new ArrayList<>(); - public JsonLoaderImpl(ResultSetLoader rsLoader, TupleMetadata providedSchema, - JsonLoaderOptions options, CustomErrorContext errorContext, - InputStream stream) { - this.rsLoader = rsLoader; - this.options = options; - this.errorContext = errorContext; - this.rowListener = new TupleListener(this, rsLoader.writer(), providedSchema); - this.parser = new JsonStructureParser(stream, options, rowListener, this); + private JsonLoaderImpl(JsonLoaderBuilder builder) { + this.rsLoader = builder.rsLoader; + this.options = builder.options; + this.errorContext = builder. errorContext; + this.rowListener = new TupleListener(this, rsLoader.writer(), builder.providedSchema); + this.parser = new JsonStructureParserBuilder() + .fromStream(builder.stream) + .fromReader(builder.reader) + .options(builder.options) + .rootListener(rowListener) + .errorFactory(this) + .messageParser(builder.messageParser) + .build(); } public JsonLoaderOptions options() { return options; } @Override // JsonLoader - public boolean next() { + public boolean readBatch() { if (eof) { return false; } - rsLoader.startBatch(); RowSetLoader rowWriter = rsLoader.writer(); while (rowWriter.start()) { if (parser.next()) { @@ -179,6 +241,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { break; } } + endBatch(); return rsLoader.hasRows(); } @@ -209,8 +272,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { * Bottom line: the user is responsible for not giving Drill * ambiguous data that would require Drill to predict the future. 
*/ - @Override // JsonLoader - public void endBatch() { + private void endBatch() { // Make a copy. Forcing resolution will remove the // element from the original list. @@ -237,8 +299,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { @Override // ErrorFactory public RuntimeException ioException(IOException e) { throw buildError( - UserException.dataReadError(e) - .addContext(errorContext)); + UserException.dataReadError(e)); } @Override // ErrorFactory @@ -325,6 +386,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { .addContext("Array nesting", dims)); } + @Override + public RuntimeException messageParseError(MessageContextException e) { + return buildError( + UserException.validationError(e) + .message("Syntax error when parsing a JSON message") + .addContext(e.getMessage()) + .addContext("Looking for field", e.nextElement) + .addContext("On token", e.token.name())); + } + protected UserException buildError(ColumnMetadata schema, UserException.Builder builder) { return buildError(builder .addContext("Column", schema.name()) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java index 59a9faa..d982e11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.store.easy.json.loader; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.store.easy.json.parser.JsonStructureOptions; /** @@ -41,4 +43,11 @@ public class JsonLoaderOptions extends JsonStructureOptions { * </ul> */ public boolean classicArrayNulls; + + public JsonLoaderOptions() { } + + public JsonLoaderOptions(OptionSet options) { + 
super(options); + this.readNumbersAsDouble = options.getBoolean(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java index b58763f..e3a208b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java @@ -69,8 +69,14 @@ public interface ErrorFactory { RuntimeException syntaxError(JsonToken token); /** - * Error recover is on, the structure parser tried to recover, but + * Error recovery is on, the structure parser tried to recover, but * encountered too many other errors and gave up. */ RuntimeException unrecoverableError(); + + /** + * Parser is configured to find a message tag within the JSON + * and a syntax error occurred while following the data path. + */ + RuntimeException messageParseError(MessageParser.MessageContextException e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java index 78d2e67..6e072d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.store.easy.json.parser; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.options.OptionSet; + /** * Input to the JSON structure parser which defines guidelines * for low-level parsing as well as listeners for higher-level @@ -51,4 +54,15 @@ public class JsonStructureOptions { * two or three valid records before it stabilizes.
*/ public boolean skipMalformedRecords; + + public boolean enableEscapeAnyChar; + + public JsonStructureOptions() { } + + public JsonStructureOptions(OptionSet options) { + this.allTextMode = options.getBoolean(ExecConstants.JSON_ALL_TEXT_MODE); + this.allowNanInf = options.getBoolean(ExecConstants.JSON_READER_NAN_INF_NUMBERS); + this.skipMalformedRecords = options.getBoolean(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG); + this.enableEscapeAnyChar = options.getBoolean(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java index 14016a8..2b814ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java @@ -19,7 +19,10 @@ package org.apache.drill.exec.store.easy.json.parser; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; +import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException; +import org.apache.drill.exec.store.easy.json.parser.RootParser.NestedRootArrayParser; import org.apache.drill.exec.store.easy.json.parser.RootParser.RootArrayParser; import org.apache.drill.exec.store.easy.json.parser.RootParser.RootObjectParser; import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJsonException; @@ -59,6 +62,62 @@ import com.fasterxml.jackson.databind.ObjectMapper; public class JsonStructureParser { protected static final Logger logger = LoggerFactory.getLogger(JsonStructureParser.class); + public static class JsonStructureParserBuilder { + private InputStream stream; + private Reader reader; + private JsonStructureOptions options; + private ObjectListener rootListener; + private ErrorFactory errorFactory; + private 
String dataPath; + private MessageParser messageParser; + + public JsonStructureParserBuilder options(JsonStructureOptions options) { + this.options = options; + return this; + } + + public JsonStructureParserBuilder rootListener(ObjectListener rootListener) { + this.rootListener = rootListener; + return this; + } + + public JsonStructureParserBuilder errorFactory(ErrorFactory errorFactory) { + this.errorFactory = errorFactory; + return this; + } + + public JsonStructureParserBuilder fromStream(InputStream stream) { + this.stream = stream; + return this; + } + + public JsonStructureParserBuilder fromReader(Reader reader) { + this.reader = reader; + return this; + } + + public JsonStructureParserBuilder messageParser(MessageParser messageParser) { + this.messageParser = messageParser; + return this; + } + + public JsonStructureParserBuilder dataPath(String dataPath) { + this.dataPath = dataPath; + return this; + } + + public JsonStructureParser build() { + if (dataPath != null) { + dataPath = dataPath.trim(); + dataPath = dataPath.isEmpty() ? 
null : dataPath; + } + if (dataPath != null && messageParser == null) { + messageParser = new SimpleMessageParser(dataPath); + } + return new JsonStructureParser(this); + } + } + private final JsonParser parser; private final JsonStructureOptions options; private final ObjectListener rootListener; @@ -78,31 +137,50 @@ public class JsonStructureParser { * @param errorFactory factory for errors thrown for various * conditions */ - public JsonStructureParser(InputStream stream, JsonStructureOptions options, - ObjectListener rootListener, ErrorFactory errorFactory) { - this.options = Preconditions.checkNotNull(options); - this.rootListener = Preconditions.checkNotNull(rootListener); - this.errorFactory = Preconditions.checkNotNull(errorFactory); + private JsonStructureParser(JsonStructureParserBuilder builder) { + this.options = Preconditions.checkNotNull(builder.options); + this.rootListener = Preconditions.checkNotNull(builder.rootListener); + this.errorFactory = Preconditions.checkNotNull(builder.errorFactory); try { ObjectMapper mapper = new ObjectMapper() .configure(JsonParser.Feature.ALLOW_COMMENTS, true) .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true) - .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf); + .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf) + .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, options.enableEscapeAnyChar); - parser = mapper.getFactory().createParser(stream); + if (builder.stream != null) { + parser = mapper.getFactory().createParser(builder.stream); + } else { + parser = mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader)); + } } catch (JsonParseException e) { throw errorFactory().parseError("Failed to create the JSON parser", e); } catch (IOException e) { throw errorFactory().ioException(e); } tokenizer = new TokenIterator(parser, options, errorFactory()); - rootState = makeRootState(); + if (builder.messageParser == 
null) { + rootState = makeRootState(); + } else { + rootState = makeCustomRoot(builder.messageParser); + } } public JsonStructureOptions options() { return options; } public ErrorFactory errorFactory() { return errorFactory; } public ObjectListener rootListener() { return rootListener; } + private RootParser makeCustomRoot(MessageParser messageParser) { + try { + if (! messageParser.parsePrefix(tokenizer)) { + return null; + } + } catch (MessageContextException e) { + throw errorFactory.messageParseError(e); + } + return new NestedRootArrayParser(this, messageParser); + } + private RootParser makeRootState() { JsonToken token = tokenizer.next(); if (token == null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java new file mode 100644 index 0000000..5b02f97 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.json.parser; + +import com.fasterxml.jackson.core.JsonToken; + +/** + * Optional custom parser for the portion of a JSON message that + * surrounds the data "payload". Can be used to extract status codes. + * See {@link SimpleMessageParser} to simply skip all fields but + * a given path. + */ +public interface MessageParser { + + @SuppressWarnings("serial") + class MessageContextException extends Exception { + public final JsonToken token; + public final String nextElement; + + public MessageContextException(JsonToken token, String nextElement, String descrip) { + super(descrip); + this.token = token; + this.nextElement = nextElement; + } + } + boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException; + void parseSuffix(TokenIterator tokenizer) throws MessageContextException; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java index f81fd3a..32460a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.easy.json.parser; +import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,4 +119,37 @@ public abstract class RootParser implements ElementParser { } } } + + public static class NestedRootArrayParser extends RootParser { + + private final MessageParser messageParser; + + public NestedRootArrayParser(JsonStructureParser structParser, MessageParser messageParser) { + super(structParser); + this.messageParser = messageParser; + } + + @Override + public boolean parseRoot(TokenIterator tokenizer) { + JsonToken token = tokenizer.next(); + if (token == null) { + // 
Position: { ... EOF ^ + // Saw EOF, but no closing ]. Warn and ignore. + // Note that the Jackson parser won't let us get here; + // it will have already thrown a syntax error. + logger.warn("Failed to close outer array. {}", + tokenizer.context()); + return false; + } else if (token == JsonToken.END_ARRAY) { + try { + messageParser.parseSuffix(tokenizer); + } catch (MessageContextException e) { + throw errorFactory().messageParseError(e); + } + return false; + } else { + return parseRootObject(token, tokenizer); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java new file mode 100644 index 0000000..956b910 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.drill.exec.store.easy.json.parser; + +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import com.fasterxml.jackson.core.JsonToken; + +/** + * A message parser which accepts a path to the data encoded as a + * slash-separated string. Given the following JSON message: + * + * <pre><code> + * { status: { + * succeeded: true, + * runTimeMs: 123 + * }, + * response: { + * rowCount: 10, + * rows: [ + * { ... }, + * { ... } ] + * }, + * footer: "something interesting" + * } + * </code></pre> + * + * The path to the actual data would be {@code "response/rows"}. + * <p> + * The message parser will "free-wheel" over all objects not on the + * data path. Thus, this class will skip over the nested structure + * within the {@code status} member. + * <p> + * If the data path is not found then this class reports EOF of + * the whole data stream. It may have skipped over the actual payload + * if the path is mis-configured. + */ +public class SimpleMessageParser implements MessageParser { + + private final String[] path; + + public SimpleMessageParser(String dataPath) { + path = dataPath.split("/"); + Preconditions.checkArgument(path.length > 0, + "Data path should not be empty."); + } + + @Override + public boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException { + JsonToken token = tokenizer.next(); + if (token == null) { + return false; + } + if (token != JsonToken.START_OBJECT) { + throw new MessageContextException(token, + path[0], "Unexpected top-level array"); + } + return parseToElement(tokenizer, 0); + } + + private boolean parseToElement(TokenIterator tokenizer, int level) throws MessageContextException { + for (;;) { + JsonToken token = tokenizer.requireNext(); + switch (token) { + case FIELD_NAME: + break; + case END_OBJECT: + return false; + default: + throw new MessageContextException(token, + path[0], "Unexpected token"); + } + + String fieldName = tokenizer.textValue(); + if
(fieldName.equals(path[level])) { + return parseInnerLevel(tokenizer, level); + } else { + skipElement(tokenizer); + } + } + } + + private boolean parseInnerLevel(TokenIterator tokenizer, int level) throws MessageContextException { + JsonToken token = tokenizer.requireNext(); + if (level == path.length - 1) { + switch (token) { + case VALUE_NULL: + return false; + case START_ARRAY: + return true; + default: + throw new MessageContextException(token, + path[level], "Expected JSON array for final path element"); + } + } + if (token != JsonToken.START_OBJECT) { + throw new MessageParser.MessageContextException(token, + path[level], "Expected JSON object"); + } + return parseToElement(tokenizer, level + 1); + } + + private void skipElement(TokenIterator tokenizer) { + int level = 0; + do { + JsonToken token = tokenizer.requireNext(); + switch (token) { + case START_OBJECT: + case START_ARRAY: + level++; + break; + case END_OBJECT: + case END_ARRAY: + level--; + break; + default: + break; + } + } while (level > 0); + } + + @Override + public void parseSuffix(TokenIterator tokenizer) { + // No need to parse the unwanted tail elements. 
+ } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java index d45b197..17cdc06 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl; import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; import org.apache.drill.test.SubOperatorTest; public class BaseJsonLoaderTest extends SubOperatorTest { @@ -43,7 +44,13 @@ public class BaseJsonLoaderTest extends SubOperatorTest { public void open(InputStream is) { rsLoader = new ResultSetLoaderImpl(fixture.allocator(), rsLoaderOptions.build()); - loader = new JsonLoaderImpl(rsLoader, providedSchema, jsonOptions, errorContext, is); + loader = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .providedSchema(providedSchema) + .options(jsonOptions) + .errorContext(errorContext) + .fromStream(is) + .build(); } public void open(String json) { @@ -52,10 +59,10 @@ public class BaseJsonLoaderTest extends SubOperatorTest { } public RowSet next() { - if (!loader.next()) { + rsLoader.startBatch(); + if (!loader.readBatch()) { return null; } - loader.endBatch(); return fixture.wrap(rsLoader.harvest()); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java index c77807d..8e75109 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import org.apache.commons.io.input.ReaderInputStream; +import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder; import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; import com.fasterxml.jackson.core.JsonParseException; @@ -63,37 +64,42 @@ public class BaseTestJsonParser { @Override public RuntimeException parseError(String msg, JsonParseException e) { - throw new JsonErrorFixture("parseError", msg, e); + return new JsonErrorFixture("parseError", msg, e); } @Override public RuntimeException ioException(IOException e) { - throw new JsonErrorFixture("ioException", "", e); + return new JsonErrorFixture("ioException", "", e); } @Override public RuntimeException structureError(String msg) { - throw new JsonErrorFixture("structureError", msg); + return new JsonErrorFixture("structureError", msg); } @Override public RuntimeException syntaxError(JsonParseException e) { - throw new JsonErrorFixture("syntaxError", "", e); + return new JsonErrorFixture("syntaxError", "", e); } @Override public RuntimeException typeError(UnsupportedConversionError e) { - throw new JsonErrorFixture("typeError", "", e); + return new JsonErrorFixture("typeError", "", e); } @Override public RuntimeException syntaxError(JsonToken token) { - throw new JsonErrorFixture("syntaxError", token.toString()); + return new JsonErrorFixture("syntaxError", token.toString()); } @Override public RuntimeException unrecoverableError() { - throw new JsonErrorFixture("unrecoverableError", ""); + return new JsonErrorFixture("unrecoverableError", ""); + } + + @Override + public RuntimeException messageParseError(MessageParser.MessageContextException e) { + return new 
JsonErrorFixture("messageParseError", "Message parse error", e); } } @@ -252,16 +258,25 @@ public class BaseTestJsonParser { } protected static class JsonParserFixture { + JsonStructureParserBuilder builder; JsonStructureOptions options = new JsonStructureOptions(); JsonStructureParser parser; ObjectListenerFixture rootObject = new ObjectListenerFixture(); ErrorFactory errorFactory = new ErrorFactoryFixture(); + public JsonParserFixture() { + builder = new JsonStructureParserBuilder(); + } + public void open(String json) { InputStream inStream = new ReaderInputStream(new StringReader(json)); - parser = new JsonStructureParser(inStream, options, rootObject, - errorFactory); + builder + .fromStream(inStream) + .options(options) + .rootListener(rootObject) + .errorFactory(errorFactory); + parser = builder.build(); } public boolean next() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java new file mode 100644 index 0000000..4905d1f --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.json.parser; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonToken; + +public class TestJsonParserMessage extends BaseTestJsonParser { + + /** + * Example message parser. A real parser would provide much better + * error messages for badly-formed JSON or error codes. + */ + private static class MessageParserFixture implements MessageParser { + + @Override + public boolean parsePrefix(TokenIterator tokenizer) { + assertEquals(JsonToken.START_OBJECT, tokenizer.requireNext()); + assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext()); + assertEquals(JsonToken.VALUE_STRING, tokenizer.requireNext()); + if (!"ok".equals(tokenizer.stringValue())) { + return false; + } + assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext()); + assertEquals(JsonToken.START_ARRAY, tokenizer.requireNext()); + return true; + } + + @Override + public void parseSuffix(TokenIterator tokenizer) { + assertEquals(JsonToken.END_OBJECT, tokenizer.requireNext()); + } + } + + /** + * Test the ability to wrap the data objects with a custom message + * structure, typical of a REST call. 
+ */ + @Test + public void testMessageParser() { + final String json = + "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.messageParser(new MessageParserFixture()); + fixture.open(json); + assertTrue(fixture.next()); + ValueListenerFixture a = fixture.field("a"); + assertEquals(JsonType.INTEGER, a.valueDef.type()); + assertEquals(2, fixture.read()); + assertEquals(1, a.nullCount); + assertEquals(100L, a.value); + fixture.close(); + } + + /** + * Test the ability to cancel the data load if a message header + * indicates that there is no data. + */ + @Test + public void testMessageParserEOF() { + final String json = + "{ status: \"fail\", data: [{a: 0}, {a: 100}, {a: null}]}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.messageParser(new MessageParserFixture()); + fixture.open(json); + assertFalse(fixture.next()); + fixture.close(); + } + + @Test + public void testDataPath() { + final String json = + "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.dataPath("data"); + fixture.open(json); + assertTrue(fixture.next()); + ValueListenerFixture a = fixture.field("a"); + assertEquals(JsonType.INTEGER, a.valueDef.type()); + assertEquals(2, fixture.read()); + assertEquals(1, a.nullCount); + assertEquals(100L, a.value); + fixture.close(); + } + + @Test + public void testComplexDataPath() { + final String json = + "{ status: {result : \"ok\", runtime: 123},\n" + + " response: { rowCount: 1,\n" + + " data: [{a: 0}, {a: 100}, {a: null}]},\n" + + " footer: \"some stuff\"}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.dataPath("response/data"); + fixture.open(json); + assertTrue(fixture.next()); + ValueListenerFixture a = fixture.field("a"); + assertEquals(JsonType.INTEGER, a.valueDef.type()); + assertEquals(2, fixture.read()); + assertEquals(1, a.nullCount); + 
assertEquals(100L, a.value); + fixture.close(); + } + + @Test + public void testDataPathNull() { + final String json = + "{ status: \"fail\", data: null}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.messageParser(new MessageParserFixture()); + fixture.open(json); + assertFalse(fixture.next()); + fixture.close(); + } + + @Test + public void testDataPathMissing() { + final String json = + "{ status: \"fail\"}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.messageParser(new MessageParserFixture()); + fixture.open(json); + assertFalse(fixture.next()); + fixture.close(); + } + + @Test + public void testDataPathErrorRoot() { + final String json = "\"Bogus!\""; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.dataPath("data"); + try { + fixture.open(json); + fail(); + } catch (JsonErrorFixture e) { + assertTrue(e.errorType.equals("messageParseError")); + assertTrue(e.getCause() instanceof MessageParser.MessageContextException); + } + fixture.close(); + } + + @Test + public void testDataPathErrorLeaf() { + final String json = + "{ status: \"bogus\", data: { notValid: \"must be array\"}}"; + JsonParserFixture fixture = new JsonParserFixture(); + fixture.builder.dataPath("data"); + try { + fixture.open(json); + fail(); + } catch (JsonErrorFixture e) { + assertTrue(e.errorType.equals("messageParseError")); + assertTrue(e.getCause() instanceof MessageParser.MessageContextException); + } + fixture.close(); + } +}