Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1713
Change subject: Fix continue ingestion bug when exception happens ...................................................................... Fix continue ingestion bug when exception happens 1. Fix the bug when exception happens, localfs adapter couldn't pick up a new file to continue the ingestion. 2. Change the exception handling from string to error code. Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676 --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java 5 files changed, 11 insertions(+), 7 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/1713/1 diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 9de9dde..702cb0a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -117,7 +117,7 @@ public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_NULL_IN_NON_OPTIONAL = 3018; public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_CANNT_GET_PKEY = 3019; public static final int FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED = 3020; - public static final int FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL = 3021; + public static final int RECORD_READER_MALFORMED_INPUT_STREAM = 3021; public static final int PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE = 3022; public static final int PROVIDER_DATASOURCE_FACTORY_UNKNOWN_INPUT_STREAM_FACTORY = 3023; public static final int UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY = 3024; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index b6423f6..fa64037 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -103,7 +103,7 @@ 3018 = Field %1$s of meta record is not an optional type so it cannot accept null value. 3019 = Can't get PK from record part 3020 = This operation cannot be done when Feed %1$s is alive. -3021 = Could not register feed intake job [%1$s] for feed %2$s +3021 = Malformed Input Stream. 3022 = Unknown data source type: %1$s 3023 = Unknown input stream factory: %1$s 3024 = Failed to create stream factory diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 1b12dc1..d01859e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -177,7 +177,7 @@ if (!recordReader.handleException(th)) { finish(); } - return closed.get(); + return !closed.get(); } public IRecordReader<T> getReader() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 4d6d004..7614e6e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -20,7 +20,9 @@ import java.io.IOException; +import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -101,7 +103,7 @@ // corrupted file. clear the buffer and stop reading reader.reset(); bufferPosn = bufferLength = 0; - throw new IOException("Malformed input stream"); + throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM); } } } @@ -141,7 +143,7 @@ } catch (IOException e) { reader.reset(); bufferPosn = bufferLength = 0; - throw new IOException("Malformed input stream"); + throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM); } } } while (!hasFinished); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java index 3c3b8fb..eb099f1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java @@ -22,6 +22,8 @@ import java.io.FileInputStream; import java.io.IOException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.ExternalDataConstants; @@ -154,8 +156,8 @@ return false; } if (th instanceof IOException) { - // TODO: Change from string check to exception type - if (th.getCause().getMessage().contains("Malformed input stream")) { + if (th instanceof RuntimeDataException + && ((RuntimeDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) { if (currentFile != null) { try { logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file"); -- To view, visit https://asterix-gerrit.ics.uci.edu/1713 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>