From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20531?usp=email )


Change subject: [NO ISSUE][EXT]: ensure CSV header is written in all files
......................................................................

[NO ISSUE][EXT]: ensure CSV header is written in all files

Details:
- if header is enabled in COPY TO CSV, ensure
  header is written to all files, not just the
  first file.

Ext-ref: MB-69147
Change-Id: I7716408908ddb40ec9f26e892da4f7d888751a8b
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
M 
hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
12 files changed, 81 insertions(+), 12 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/31/20531/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index f93ae92..7941e8f 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -106,6 +106,20 @@
     }

     @Override
+    public void write(IValueReference value, long tupleNumber) throws 
HyracksDataException {
+        try {
+            runWithRetryIfSdkException(() -> printer.print(value, 
tupleNumber));
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (Exception e) {
+            if (isSdkException(e)) {
+                throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, e, 
getMessageOrToString(e));
+            }
+            throw e;
+        }
+    }
+
+    @Override
     public final void abort() throws HyracksDataException {
         try {
             if (cloudWriter != null) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
index f06b8e9..ce8af7e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -108,6 +108,11 @@
     }

     @Override
+    public void write(IValueReference value, long tupleNumber) throws 
HyracksDataException {
+        printer.print(value, tupleNumber);
+    }
+
+    @Override
     public void abort() throws HyracksDataException {
         try {
             printer.close();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index 4166dde..cdbeb96 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -76,6 +76,11 @@
     }

     @Override
+    public void write(IValueReference value, long tupleNumber) throws 
HyracksDataException {
+        printer.print(value, tupleNumber);
+    }
+
+    @Override
     public void abort() throws HyracksDataException {
         printer.close();
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
index 8d7d60f..af62785 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java
@@ -64,6 +64,13 @@
         delegate.checkError();
     }

+    @Override
+    public void print(IValueReference value, long tupleNumber) throws 
HyracksDataException {
+        printer.print(value.getByteArray(), value.getStartOffset(), 
value.getLength(), tupleNumber, printStream);
+        afterPrint();
+        delegate.checkError();
+    }
+
     abstract void afterPrint() throws HyracksDataException;

     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index 1a3aea1..70ef30e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -91,6 +91,11 @@
     }

     @Override
+    public void print(IValueReference value, long tupleNumber) throws 
HyracksDataException {
+        print(value);
+    }
+
+    @Override
     public void close() throws HyracksDataException {
         if (this.writer != null) {
             try {
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
index cef0e6e..53981ae 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
@@ -21,9 +21,9 @@
 import java.io.PrintStream;
 import java.util.Map;
 
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
 import org.apache.asterix.om.pointables.PointableAllocator;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.pointables.printer.csv.APrintVisitor;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -56,7 +56,7 @@
         final PointableAllocator allocator = new PointableAllocator();
         final IAType inputType =
                 recType == null ? 
DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.OBJECT) : recType;
-        final IVisitablePointable recAccessor = 
allocator.allocateRecordValue(inputType);
+        final ARecordVisitablePointable recAccessor = 
allocator.allocateRecordValue(inputType);
         final APrintVisitor printVisitor = new APrintVisitor(context, 
itemType, formatConfigs, configuration);
         final Pair<PrintStream, ATypeTag> arg = new Pair<>(null, null);

@@ -72,6 +72,15 @@
                 arg.first = ps;
                 recAccessor.accept(printVisitor, arg);
             }
+
+            @Override
+            public void print(byte[] b, int start, int l, long tupleNumber, 
PrintStream ps)
+                    throws HyracksDataException {
+                recAccessor.set(b, start, l);
+                recAccessor.setRecordNumber(tupleNumber);
+                arg.first = ps;
+                recAccessor.accept(printVisitor, arg);
+            }
         };
     }
 }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 440fd6f..0f1eeb1 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -78,8 +78,9 @@
     private final IVisitablePointable nullReference = 
PointableAllocator.allocateUnrestableEmpty();
     private final IVisitablePointable missingReference = 
PointableAllocator.allocateUnrestableEmpty();

-    private int closedPartTypeInfoSize = 0;
+    private int closedPartTypeInfoSize;
     private ATypeTag typeTag;
+    private long recordNumber = 0;

     /**
      * private constructor, to prevent constructing it arbitrarily
@@ -335,6 +336,14 @@
         return inputRecType;
     }

+    public void setRecordNumber(long recordNumber) {
+        this.recordNumber = recordNumber;
+    }
+
+    public long getRecordNumber() {
+        return recordNumber;
+    }
+
     @Override
     public <R, T> R accept(IVisitablePointableVisitor<R, T> vistor, T tag) 
throws HyracksDataException {
         return vistor.visit(this, tag);
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
index b031c64..ecf1cbd 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
@@ -52,7 +52,6 @@
     private final boolean header;
     private final String recordDelimiter;
     private final Map<String, ATypeTag> recordSchemaDetails = new HashMap<>();
-    private boolean firstRecord;
     private List<String> expectedFieldNames;
     private List<IAType> expectedFieldTypes;

@@ -62,7 +61,6 @@
         this.warningCollector = warningCollector;
         this.header = header;
         this.schema = schema;
-        this.firstRecord = true;
         this.recordDelimiter = recordDelimiter;
         if (schema != null) {
             this.expectedFieldNames = Arrays.asList(schema.getFieldNames());
@@ -88,16 +86,14 @@
         if (isValidSchema(recordAccessor)) {
             nameVisitorArg.first = ps;
             itemVisitorArg.first = ps;
-            if (header && firstRecord) {
+            if (header && recordAccessor.getRecordNumber() == 0) {
                 printHeader(recordAccessor, ps, visitor);
-                firstRecord = false;
             }

             // add record delimiter
             // by default the separator between the header and the records is 
"\n"
-            if (firstRecord) {
-                firstRecord = false;
-            } else {
+            // print new line (delimiter) every time except if header is false 
and we're writing the first record
+            if (header || recordAccessor.getRecordNumber() > 0) {
                 ps.print(recordDelimiter);
             }
             final List<IVisitablePointable> fieldNames = 
recordAccessor.getFieldNames();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index e8bae00..d7e81fb 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -66,7 +66,7 @@
             newFile();
         }

-        writer.write(value);
+        writer.write(value, tupleCounter);
         tupleCounter++;
     }

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index f8ae81b..c7619d0 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -48,13 +48,21 @@
     boolean newFile(String directory, String fileName) throws 
HyracksDataException;

     /**
-     * Writer the provided value
+     * Write the provided value
      *
      * @param value to write
      */
     void write(IValueReference value) throws HyracksDataException;

     /**
+     * Write the provided value and pass along the tuple number relative to 
the current file
+     *
+     * @param value to write
+     * @param tupleNumber tuple number
+     */
+    void write(IValueReference value, long tupleNumber) throws 
HyracksDataException;
+
+    /**
      * Run the abort sequence in case of a failure
      */
     void abort() throws HyracksDataException;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
index 54fd152..b6f3b9e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -48,6 +48,14 @@
     void print(IValueReference value) throws HyracksDataException;

     /**
+     * Print the provided value and pass along the tuple number relative to 
the current file
+     *
+     * @param value to print
+     * @param tupleNumber tuple number
+     */
+    void print(IValueReference value, long tupleNumber) throws 
HyracksDataException;
+
+    /**
      * Flush and close the printer
      */
     void close() throws HyracksDataException;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
index 2ccb450..2664254 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java
@@ -28,4 +28,7 @@
     }

     void print(byte[] b, int s, int l, PrintStream ps) throws 
HyracksDataException;
+
+    default void print(byte[] b, int s, int l, long tupleNumber, PrintStream 
ps) throws HyracksDataException {
+    };
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20531?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I7716408908ddb40ec9f26e892da4f7d888751a8b
Gerrit-Change-Number: 20531
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to