>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20531?usp=email )
Change subject: [NO ISSUE][EXT]: ensure CSV header is written in all files
......................................................................
[NO ISSUE][EXT]: ensure CSV header is written in all files
Details:
- If the header option is enabled for COPY TO with CSV format, write
the header at the top of every output file, not just the first one.
Ext-ref: MB-69147
Change-Id: I7716408908ddb40ec9f26e892da4f7d888751a8b
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
M
hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
12 files changed, 81 insertions(+), 12 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/31/20531/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index f93ae92..7941e8f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -106,6 +106,20 @@
}
+ /**
+ * Writes {@code value} through the printer, forwarding its tuple number.
+ * The print call runs inside {@code runWithRetryIfSdkException}. An SDK
+ * exception that still escapes is rethrown as {@code EXTERNAL_SINK_ERROR};
+ * HyracksDataExceptions and any other exception propagate unchanged.
+ */
@Override
+ public void write(IValueReference value, long tupleNumber) throws
HyracksDataException {
+ try {
+ runWithRetryIfSdkException(() -> printer.print(value,
tupleNumber));
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (Exception e) {
+ if (isSdkException(e)) {
+ throw
RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, e,
getMessageOrToString(e));
+ }
+ throw e;
+ }
+ }
+
+ @Override
public final void abort() throws HyracksDataException {
try {
if (cloudWriter != null) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
index f06b8e9..ce8af7e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -108,6 +108,11 @@
}
+ /**
+ * Forwards {@code value} and its tuple number to the printer's
+ * tuple-number-aware {@code print}. Format printers such as CSV use the
+ * number to decide whether a per-file header is due.
+ */
@Override
+ public void write(IValueReference value, long tupleNumber) throws
HyracksDataException {
+ printer.print(value, tupleNumber);
+ }
+
+ @Override
public void abort() throws HyracksDataException {
try {
printer.close();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index 4166dde..cdbeb96 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -76,6 +76,11 @@
}
+ /**
+ * Forwards {@code value} and its tuple number to the printer's
+ * tuple-number-aware {@code print}. Format printers such as CSV use the
+ * number to decide whether a per-file header is due.
+ */
@Override
+ public void write(IValueReference value, long tupleNumber) throws
HyracksDataException {
+ printer.print(value, tupleNumber);
+ }
+
+ @Override
public void abort() throws HyracksDataException {
printer.close();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
index 8d7d60f..af62785 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
@@ -64,6 +64,13 @@
delegate.checkError();
}
+ /**
+ * Prints {@code value} through the underlying IPrinter's tuple-number
+ * overload, then runs {@link #afterPrint()} and checks the delegate stream
+ * for errors.
+ * NOTE(review): IPrinters that do not override the tuple-number overload
+ * fall back to IPrinter's default implementation; that default must
+ * actually print, or every such value is silently dropped.
+ */
+ @Override
+ public void print(IValueReference value, long tupleNumber) throws
HyracksDataException {
+ printer.print(value.getByteArray(), value.getStartOffset(),
value.getLength(), tupleNumber, printStream);
+ afterPrint();
+ delegate.checkError();
+ }
+
abstract void afterPrint() throws HyracksDataException;
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index 1a3aea1..70ef30e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -91,6 +91,11 @@
}
+ /**
+ * This printer does not use the tuple number; it simply delegates to
+ * {@link #print(IValueReference)}.
+ */
@Override
+ public void print(IValueReference value, long tupleNumber) throws
HyracksDataException {
+ print(value);
+ }
+
+ @Override
public void close() throws HyracksDataException {
if (this.writer != null) {
try {
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
index cef0e6e..53981ae 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
@@ -21,9 +21,9 @@
import java.io.PrintStream;
import java.util.Map;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
import org.apache.asterix.om.pointables.PointableAllocator;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
import org.apache.asterix.om.pointables.printer.csv.APrintVisitor;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -56,7 +56,7 @@
final PointableAllocator allocator = new PointableAllocator();
final IAType inputType =
recType == null ?
DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.OBJECT) : recType;
- final IVisitablePointable recAccessor =
allocator.allocateRecordValue(inputType);
+ final ARecordVisitablePointable recAccessor =
allocator.allocateRecordValue(inputType);
final APrintVisitor printVisitor = new APrintVisitor(context,
itemType, formatConfigs, configuration);
final Pair<PrintStream, ATypeTag> arg = new Pair<>(null, null);
@@ -72,6 +72,15 @@
arg.first = ps;
recAccessor.accept(printVisitor, arg);
}
+
+ // Loads the bytes into recAccessor and stamps the record with
+ // tupleNumber before visiting it, so the CSV record printer can tell the
+ // first record of each output file apart (e.g. to print the header).
+ // NOTE(review): the other print overload never calls setRecordNumber, so
+ // records printed through it carry whatever number recAccessor last held
+ // (0 unless this overload was used first). Confirm legacy callers of
+ // that overload do not depend on per-record header/delimiter handling.
+ @Override
+ public void print(byte[] b, int start, int l, long tupleNumber,
PrintStream ps)
+ throws HyracksDataException {
+ recAccessor.set(b, start, l);
+ recAccessor.setRecordNumber(tupleNumber);
+ arg.first = ps;
+ recAccessor.accept(printVisitor, arg);
+ }
};
}
}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 440fd6f..0f1eeb1 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -78,8 +78,9 @@
private final IVisitablePointable nullReference =
PointableAllocator.allocateUnrestableEmpty();
private final IVisitablePointable missingReference =
PointableAllocator.allocateUnrestableEmpty();
- private int closedPartTypeInfoSize = 0;
+ private int closedPartTypeInfoSize;
private ATypeTag typeTag;
+ // Tuple number attached to the record currently held by this pointable;
+ // printers set it via setRecordNumber(). Defaults to 0 until set.
+ private long recordNumber = 0;
/**
* private constructor, to prevent constructing it arbitrarily
@@ -335,6 +336,14 @@
return inputRecType;
}
+ /**
+ * Attaches a tuple number to the record currently held by this pointable.
+ *
+ * @param recordNumber tuple number supplied by the caller
+ */
+ public void setRecordNumber(long recordNumber) {
+ this.recordNumber = recordNumber;
+ }
+
+ /**
+ * @return the tuple number last passed to {@link #setRecordNumber(long)},
+ * or 0 if it was never called
+ */
+ public long getRecordNumber() {
+ return recordNumber;
+ }
+
@Override
public <R, T> R accept(IVisitablePointableVisitor<R, T> vistor, T tag)
throws HyracksDataException {
return vistor.visit(this, tag);
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
index b031c64..ecf1cbd 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
@@ -88,16 +88,23 @@
+ // Tuple numbers are relative to the current output file, so record 0
+ // marks the start of a new file: reset the per-file state so the header
+ // is printed again and the first record gets no leading delimiter.
+ // firstRecord is only cleared once a record passes isValidSchema, so the
+ // header still lands at the top of a file whose first tuple is not printed.
+ if (recordAccessor.getRecordNumber() == 0) {
+ firstRecord = true;
+ }
if (isValidSchema(recordAccessor)) {
nameVisitorArg.first = ps;
itemVisitorArg.first = ps;
if (header && firstRecord) {
printHeader(recordAccessor, ps, visitor);
firstRecord = false;
}
// add record delimiter
// by default the separator between the header and the records is
"\n"
if (firstRecord) {
firstRecord = false;
} else {
ps.print(recordDelimiter);
}
final List<IVisitablePointable> fieldNames =
recordAccessor.getFieldNames();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index e8bae00..d7e81fb 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -66,7 +66,7 @@
newFile();
}
- writer.write(value);
+ // NOTE(review): IExternalFileWriter documents tupleNumber as relative to
+ // the current file; confirm newFile() resets tupleCounter to 0.
+ writer.write(value, tupleCounter);
tupleCounter++;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index f8ae81b..c7619d0 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -48,13 +48,21 @@
boolean newFile(String directory, String fileName) throws
HyracksDataException;
/**
- * Writer the provided value
+ * Write the provided value
*
* @param value to write
*/
void write(IValueReference value) throws HyracksDataException;
/**
+ * Write the provided value and pass along its tuple number, i.e. its
+ * position within the current file. The first value of each file is
+ * expected to carry tuple number 0; format printers (e.g. CSV) rely on
+ * that to decide when to emit the header.
+ *
+ * @param value to write
+ * @param tupleNumber position of {@code value} within the current file
+ */
+ void write(IValueReference value, long tupleNumber) throws
HyracksDataException;
+
+ /**
* Run the abort sequence in case of a failure
*/
void abort() throws HyracksDataException;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
index 54fd152..b6f3b9e 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -48,6 +48,14 @@
void print(IValueReference value) throws HyracksDataException;
/**
+ * Print the provided value and pass along its tuple number, i.e. its
+ * position within the current file. The first value of each file is
+ * expected to carry tuple number 0; textual formats such as CSV use it to
+ * decide when to emit the header.
+ *
+ * @param value to print
+ * @param tupleNumber position of {@code value} within the current file
+ */
+ void print(IValueReference value, long tupleNumber) throws
HyracksDataException;
+
+ /**
* Flush and close the printer
*/
void close() throws HyracksDataException;
diff --git
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
index 2ccb450..2664254 100644
---
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
+++
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
@@ -28,4 +28,14 @@
}
void print(byte[] b, int s, int l, PrintStream ps) throws
HyracksDataException;
+
+ /**
+ * Prints a value together with a caller-supplied tuple number (e.g. its
+ * position within the current output file). The default ignores the
+ * tuple number and delegates to {@link #print(byte[], int, int, PrintStream)},
+ * so printers that do not care about it keep producing output; override
+ * it when the tuple number matters (e.g. to decide whether a header is due).
+ */
+ default void print(byte[] b, int s, int l, long tupleNumber, PrintStream
ps) throws HyracksDataException {
+ print(b, s, l, ps);
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20531?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I7716408908ddb40ec9f26e892da4f7d888751a8b
Gerrit-Change-Number: 20531
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>