From Savyasach Reddy <[email protected]>:

Savyasach Reddy has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130 )


Change subject: Improve error handling
......................................................................

Improve error handling

Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
6 files changed, 47 insertions(+), 17 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/30/19130/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 135b93f..934ba1d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -73,6 +73,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
@@ -160,8 +161,10 @@
             }
         } catch (FileNotFoundException ex) {
             throw 
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-        } catch (InterruptedException | IOException ex) {
+        } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
+        } catch (IOException ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ExceptionUtils.getMessageOrToString(ex));
         }
     }

@@ -229,10 +232,11 @@
      * 1. when target files are not null, it generates a file aware input 
stream that validate
      * against the files
      * 2. if the data is binary, it returns a generic reader */
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx) 
throws HyracksDataException {
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, 
IExternalDataRuntimeContext context)
+            throws HyracksDataException {
         try {
             restoreConfig(ctx);
-            return new HDFSInputStream(read, inputSplits, readSchedule, 
nodeName, conf, configuration, ugi);
+            return new HDFSInputStream(read, inputSplits, readSchedule, 
nodeName, conf, configuration, ugi, context);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -307,7 +311,7 @@
         try {
             if (recordReaderClazz != null) {
                 StreamRecordReader streamReader = (StreamRecordReader) 
recordReaderClazz.getConstructor().newInstance();
-                streamReader.configure(ctx, createInputStream(ctx), 
configuration);
+                streamReader.configure(ctx, createInputStream(ctx, context), 
configuration);
                 return streamReader;
             }
             restoreConfig(ctx);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index d508336..c1bad6c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -24,9 +24,12 @@

 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -49,12 +52,14 @@
     private JobConf conf;
     private int pos = 0;
     private UserGroupInformation ugi;
+    private IExternalFilterValueEmbedder valueEmbedder;

     @SuppressWarnings("unchecked")
     public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] 
readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, 
UserGroupInformation ugi)
-            throws IOException, AsterixException {
+            JobConf conf, Map<String, String> configuration, 
UserGroupInformation ugi,
+            IExternalDataRuntimeContext context) throws IOException, 
AsterixException {
         this.ugi = ugi;
+        this.valueEmbedder = context.getValueEmbedder();
         this.read = read;
         this.inputSplits = inputSplits;
         this.readSchedule = readSchedule;
@@ -169,6 +174,7 @@
     }

     private RecordReader<Object, Text> getRecordReader(int splitIndex) throws 
IOException {
+        valueEmbedder.setPath(getPath(inputSplits[splitIndex]));
         try {
             reader = ugi == null ? getReader(splitIndex)
                     : ugi.doAs((PrivilegedExceptionAction<RecordReader<Object, 
Text>>) () -> getReader(splitIndex));
@@ -187,4 +193,12 @@
     private RecordReader<Object, Text> getReader(int splitIndex) throws 
IOException {
         return (RecordReader<Object, Text>) 
inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
     }
+
+    private String getPath(InputSplit split) {
+        if (split instanceof FileSplit) {
+            return ((FileSplit) split).getPath().toString();
+        } else {
+            return split.toString();
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f06638d..4d093f5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1134,8 +1134,7 @@
                 protocol = nodePathPair[0];
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
-                protocol = ExternalDataConstants.KEY_HDFS_URL;
-                break;
+                return 
configurations.get(ExternalDataConstants.KEY_HDFS_URL).replaceAll("/+$", "");
             default:
                 return "";
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 75d68ba..b08c507 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -547,6 +547,9 @@

     public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
+        if (configuration.get(ExternalDataConstants.KEY_HDFS_URL) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_HDFS_URL);
+        }
         if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT) == null) 
{
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc,
                     ExternalDataConstants.KEY_INPUT_FORMAT);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
index 9203c8f..f06b8e9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.writer;

+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -79,7 +81,7 @@
                     }
                 }
             } catch (IOException ex) {
-                throw HyracksDataException.create(ex);
+                throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, 
getMessageOrToString(ex));
             }
         }
     }
@@ -94,8 +96,8 @@
             printer.newStream(outputStream);
         } catch (FileAlreadyExistsException e) {
             return false;
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
+        } catch (IOException ex) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ex, getMessageOrToString(ex));
         }
         return true;
     }
@@ -113,7 +115,7 @@
                 fs.delete(path, false);
             }
         } catch (IOException ex) {
-            throw HyracksDataException.create(ex);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ex, getMessageOrToString(ex));
         }
     }

@@ -125,7 +127,7 @@
                 fs.rename(path, new Path(path.getParent(), 
path.getName().substring(1)));
             }
         } catch (IOException ex) {
-            throw HyracksDataException.create(ex);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ex, getMessageOrToString(ex));
         }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index a4113d1..7bb07bc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.writer;

-import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.IOException;
@@ -89,7 +88,7 @@
         } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
         } catch (IOException ex) {
-            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, 
getMessageOrToString(ex));
         }
     }

@@ -135,7 +134,7 @@
                 doValidate(testFs);
             }
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ExceptionUtils.getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, 
ExceptionUtils.getMessageOrToString(ex));
         }
     }

@@ -171,7 +170,7 @@
                 outputStream.write(0);
             }
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ex, getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, 
getMessageOrToString(ex));
         }
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f
Gerrit-Change-Number: 19130
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange

Reply via email to