From Savyasach Reddy <[email protected]>:
Savyasach Reddy has uploaded this change for review (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130).
Change subject: Improve error handling
......................................................................
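Improve error handling

- Require the HDFS URL property (KEY_HDFS_URL) when validating HDFS
  properties.
- Report HDFS I/O failures with EXTERNAL_SOURCE_ERROR (data source
  factory) or EXTERNAL_SINK_ERROR (file writer and writer factory),
  including the underlying message, instead of a generic
  HyracksDataException or the source error code on the sink side.
- For the HDFS adapter, return the configured HDFS URL (trailing
  slashes removed) as the protocol value instead of the name of the
  URL property.
- Pass the runtime context into HDFSInputStream so the value embedder
  is given the path of each input split as its reader is created.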
Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
6 files changed, 47 insertions(+), 17 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/30/19130/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 135b93f..934ba1d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -73,6 +73,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
@@ -160,8 +161,10 @@
}
} catch (FileNotFoundException ex) {
throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- } catch (InterruptedException | IOException ex) {
+ } catch (InterruptedException ex) {
throw HyracksDataException.create(ex);
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ExceptionUtils.getMessageOrToString(ex));
}
}
@@ -229,10 +232,11 @@
* 1. when target files are not null, it generates a file aware input
stream that validate
* against the files
* 2. if the data is binary, it returns a generic reader */
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx)
throws HyracksDataException {
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx,
IExternalDataRuntimeContext context)
+ throws HyracksDataException {
try {
restoreConfig(ctx);
- return new HDFSInputStream(read, inputSplits, readSchedule,
nodeName, conf, configuration, ugi);
+ return new HDFSInputStream(read, inputSplits, readSchedule,
nodeName, conf, configuration, ugi, context);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -307,7 +311,7 @@
try {
if (recordReaderClazz != null) {
StreamRecordReader streamReader = (StreamRecordReader)
recordReaderClazz.getConstructor().newInstance();
- streamReader.configure(ctx, createInputStream(ctx),
configuration);
+ streamReader.configure(ctx, createInputStream(ctx, context),
configuration);
return streamReader;
}
restoreConfig(ctx);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index d508336..c1bad6c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -24,9 +24,12 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -49,12 +52,14 @@
private JobConf conf;
private int pos = 0;
private UserGroupInformation ugi;
+ private IExternalFilterValueEmbedder valueEmbedder;
@SuppressWarnings("unchecked")
public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[]
readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration,
UserGroupInformation ugi)
- throws IOException, AsterixException {
+ JobConf conf, Map<String, String> configuration,
UserGroupInformation ugi,
+ IExternalDataRuntimeContext context) throws IOException,
AsterixException {
this.ugi = ugi;
+ this.valueEmbedder = context.getValueEmbedder();
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
@@ -169,6 +174,7 @@
}
private RecordReader<Object, Text> getRecordReader(int splitIndex) throws
IOException {
+ valueEmbedder.setPath(getPath(inputSplits[splitIndex]));
try {
reader = ugi == null ? getReader(splitIndex)
: ugi.doAs((PrivilegedExceptionAction<RecordReader<Object,
Text>>) () -> getReader(splitIndex));
@@ -187,4 +193,12 @@
private RecordReader<Object, Text> getReader(int splitIndex) throws
IOException {
return (RecordReader<Object, Text>)
inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
}
+
+ private String getPath(InputSplit split) { // Computes the path string handed to valueEmbedder.setPath(...) for a split.
+ if (split instanceof FileSplit) { // file-based split: use the Hadoop Path it carries
+ return ((FileSplit) split).getPath().toString();
+ } else {
+ return split.toString(); // NOTE(review): non-file split falls back to toString(), which may not be a real path
+ }
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f06638d..4d093f5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1134,8 +1134,7 @@
protocol = nodePathPair[0];
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
- protocol = ExternalDataConstants.KEY_HDFS_URL;
- break;
+ return
configurations.get(ExternalDataConstants.KEY_HDFS_URL).replaceAll("/+$", "");
default:
return "";
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 75d68ba..b08c507 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -547,6 +547,9 @@
public static void validateProperties(Map<String, String> configuration,
SourceLocation srcLoc,
IWarningCollector collector) throws CompilationException {
+ if (configuration.get(ExternalDataConstants.KEY_HDFS_URL) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_HDFS_URL);
+ }
if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT) == null)
{
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc,
ExternalDataConstants.KEY_INPUT_FORMAT);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
index 9203c8f..f06b8e9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.writer;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -79,7 +81,7 @@
}
}
} catch (IOException ex) {
- throw HyracksDataException.create(ex);
+ throw
RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex,
getMessageOrToString(ex));
}
}
}
@@ -94,8 +96,8 @@
printer.newStream(outputStream);
} catch (FileAlreadyExistsException e) {
return false;
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ } catch (IOException ex) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ex, getMessageOrToString(ex));
}
return true;
}
@@ -113,7 +115,7 @@
fs.delete(path, false);
}
} catch (IOException ex) {
- throw HyracksDataException.create(ex);
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ex, getMessageOrToString(ex));
}
}
@@ -125,7 +127,7 @@
fs.rename(path, new Path(path.getParent(),
path.getName().substring(1)));
}
} catch (IOException ex) {
- throw HyracksDataException.create(ex);
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ex, getMessageOrToString(ex));
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index a4113d1..7bb07bc 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.writer;
-import static
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
@@ -89,7 +88,7 @@
} catch (InterruptedException ex) {
throw HyracksDataException.create(ex);
} catch (IOException ex) {
- throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex,
getMessageOrToString(ex));
+ throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex,
getMessageOrToString(ex));
}
}
@@ -135,7 +134,7 @@
doValidate(testFs);
}
} catch (IOException ex) {
- throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ExceptionUtils.getMessageOrToString(ex));
+ throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR,
ExceptionUtils.getMessageOrToString(ex));
}
}
@@ -171,7 +170,7 @@
outputStream.write(0);
}
} catch (IOException ex) {
- throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ex, getMessageOrToString(ex));
+ throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex,
getMessageOrToString(ex));
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f
Gerrit-Change-Number: 19130
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange