Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1713

Change subject: Fix continue ingestion bug when exception happens
......................................................................

Fix continue ingestion bug when exception happens

1. Fix a bug where, after an exception occurred, the localfs adapter could not
   pick up a new file to continue the ingestion.
2. Change the malformed-input-stream exception handling from string matching
   to an error code.

Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
---
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
5 files changed, 11 insertions(+), 7 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/1713/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9de9dde..702cb0a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -117,7 +117,7 @@
     public static final int 
INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_NULL_IN_NON_OPTIONAL = 3018;
     public static final int 
INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_CANNT_GET_PKEY = 3019;
     public static final int FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED = 3020;
-    public static final int 
FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL = 3021;
+    public static final int RECORD_READER_MALFORMED_INPUT_STREAM = 3021;
     public static final int PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE = 
3022;
     public static final int 
PROVIDER_DATASOURCE_FACTORY_UNKNOWN_INPUT_STREAM_FACTORY = 3023;
     public static final int 
UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY = 3024;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index b6423f6..fa64037 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -103,7 +103,7 @@
 3018 = Field %1$s of meta record is not an optional type so it cannot accept 
null value.
 3019 = Can't get PK from record part
 3020 = This operation cannot be done when Feed %1$s is alive.
-3021 = Could not register feed intake job [%1$s] for feed  %2$s
+3021 = Malformed Input Stream.
 3022 = Unknown data source type: %1$s
 3023 = Unknown input stream factory: %1$s
 3024 = Failed to create stream factory
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 1b12dc1..d01859e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -177,7 +177,7 @@
         if (!recordReader.handleException(th)) {
             finish();
         }
-        return closed.get();
+        return !closed.get();
     }
 
     public IRecordReader<T> getReader() {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 4d6d004..7614e6e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,7 +103,7 @@
                         // corrupted file. clear the buffer and stop reading
                         reader.reset();
                         bufferPosn = bufferLength = 0;
-                        throw new IOException("Malformed input stream");
+                        throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
                     }
                 }
             }
@@ -141,7 +143,7 @@
                 } catch (IOException e) {
                     reader.reset();
                     bufferPosn = bufferLength = 0;
-                    throw new IOException("Malformed input stream");
+                    throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
                 }
             }
         } while (!hasFinished);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 3c3b8fb..eb099f1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -22,6 +22,8 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -154,8 +156,8 @@
             return false;
         }
         if (th instanceof IOException) {
-            // TODO: Change from string check to exception type
-            if (th.getCause().getMessage().contains("Malformed input stream")) 
{
+            if (th instanceof RuntimeDataException
+                    && ((RuntimeDataException) th).getErrorCode() == 
ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
                 if (currentFile != null) {
                     try {
                         logManager.logRecord(currentFile.getAbsolutePath(), 
"Corrupted input file");

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1713
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>

Reply via email to