>From Peeyush Gupta <[email protected]>:

Peeyush Gupta has uploaded this change for review.
( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19128 )


Change subject: wip: test patch for columnar flush issue (no extra schema inference)
......................................................................

wip: test patch for columnar flush issue (no extra schema inference)

Change-Id: I693a2953cf2fcc8ff5aa3620bd772dc9e0fa7eec
---
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
M 
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
M 
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
19 files changed, 225 insertions(+), 58 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/28/19128/1

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
index 6db8a78..7e031f8 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
@@ -65,7 +65,7 @@
     @Override
     public IColumnMetadata activate() throws HyracksDataException {
         Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new 
MutableObject<>();
-        IColumnValuesWriterFactory factory = new 
ColumnValuesWriterFactory(multiPageOpRef);
+        IColumnValuesWriterFactory factory = new 
ColumnValuesWriterFactory(multiPageOpRef, true);
         return new FlushColumnMetadata(datasetType, metaType, primaryKeys, 
keySourceIndicator, factory, multiPageOpRef);
     }

@@ -73,7 +73,7 @@
     public IColumnMetadata activate(IValueReference metadata) throws 
HyracksDataException {
         try {
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new 
MutableObject<>();
-            IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef);
+            IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef, true);
             return FlushColumnMetadata.create(datasetType, metaType, 
primaryKeys, keySourceIndicator, writerFactory,
                     multiPageOpRef, metadata);
         } catch (IOException e) {
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index f514638..5a7281d 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -124,13 +124,13 @@
         serializeColumnsMetadata();
     }
 
-    private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
List<List<String>> primaryKeys,
+    private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
int numPrimaryKeys,
             boolean metaContainsKeys, IColumnValuesWriterFactory 
columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> columnWriters,
             IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
             Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
             ArrayBackedValueStorage serializedMetadata) {
-        super(datasetType, metaType, primaryKeys.size());
+        super(datasetType, metaType, numPrimaryKeys);
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
         this.definitionLevels = definitionLevels;
@@ -239,6 +239,32 @@
         }
     }

+    public static FlushColumnMetadata createMutableMetadata(ARecordType 
datasetType, ARecordType metaType,
+            int numPrimaryKeys, boolean metaContainsKeys, 
IColumnValuesWriterFactory columnWriterFactory,
+            List<IColumnValuesWriter> writers, 
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata) throws IOException {
+        DataInput input = new DataInputStream(new 
ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), 
serializedMetadata.getLength()));
+        //Skip offsets
+        input.skipBytes(OFFSETS_SIZE);
+
+        //FieldNames
+        IFieldNamesDictionary fieldNamesDictionary = 
AbstractFieldNamesDictionary.deserialize(input);
+
+        //Schema
+        Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels = 
new HashMap<>();
+        ObjectSchemaNode root = (ObjectSchemaNode) 
AbstractSchemaNode.deserialize(input, definitionLevels);
+        ObjectSchemaNode metaRoot = null;
+        if (metaType != null) {
+            metaRoot = (ObjectSchemaNode) 
AbstractSchemaNode.deserialize(input, definitionLevels);
+        }
+
+        ArrayBackedValueStorage schemaStorage = new 
ArrayBackedValueStorage(serializedMetadata.getLength());
+        schemaStorage.append(serializedMetadata);
+        return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys, 
metaContainsKeys, columnWriterFactory,
+                multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, 
definitionLevels, schemaStorage);
+    }
+
     private static FlushColumnMetadata createMutableMetadata(ARecordType 
datasetType, ARecordType metaType,
             List<List<String>> primaryKeys, boolean metaContainsKeys, 
IColumnValuesWriterFactory columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference 
serializedMetadata) throws IOException {
@@ -265,7 +291,7 @@
         ArrayBackedValueStorage schemaStorage = new 
ArrayBackedValueStorage(serializedMetadata.getLength());
         schemaStorage.append(serializedMetadata);
         logSchema(root, metaRoot, fieldNamesDictionary);
-        return new FlushColumnMetadata(datasetType, metaType, primaryKeys, 
metaContainsKeys, columnWriterFactory,
+        return new FlushColumnMetadata(datasetType, metaType, 
primaryKeys.size(), metaContainsKeys, columnWriterFactory,
                 multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, 
definitionLevels, schemaStorage);
     }

@@ -590,4 +616,20 @@
             LOGGER.debug("Schema for {} has changed: {}", META_RECORD_SCHEMA, 
metaRecordSchema);
         }
     }
+
+    public ArrayBackedValueStorage getSerializedMetadata() {
+        return serializedMetadata;
+    }
+
+    public boolean isMetaContainsKeys() {
+        return metaContainsKeys;
+    }
+
+    public List<IColumnValuesWriter> getColumnWriters() {
+        return columnWriters;
+    }
+
+    public IColumnValuesWriterFactory getColumnWriterFactory() {
+        return columnWriterFactory;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 65f5eb4..35f6093 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -33,7 +33,9 @@
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;

 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
-    protected final FlushColumnMetadata columnMetadata;
+    protected FlushColumnMetadata columnMetadata;
+    protected FlushColumnMetadata prevColumnMetadata;
+
     protected final BatchFinalizerVisitor finalizer;
     protected final ColumnBatchWriter writer;

@@ -111,6 +113,7 @@

     @Override
     public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        columnMetadata.serializeColumnsMetadata();
         //This from an in-memory component, hence the cast
         LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple;
         if (btreeTuple.isAntimatter()) {
@@ -139,4 +142,30 @@
     protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws 
HyracksDataException {
         //NoOp
     }
+
+    @Override
+    public void applyCache(ITupleReference tuple) throws HyracksDataException {
+        for (int i = 0; i < columnMetadata.getNumberOfColumns(); i++) {
+            columnMetadata.getWriter(i).applyCache();
+        }
+    }
+
+    @Override
+    public void undo() throws HyracksDataException {
+        try {
+            prevColumnMetadata = columnMetadata;
+            columnMetadata = 
FlushColumnMetadata.createMutableMetadata(columnMetadata.getDatasetType(),
+                    columnMetadata.getMetaType(), 
columnMetadata.getNumberOfPrimaryKeys(),
+                    columnMetadata.isMetaContainsKeys(), 
columnMetadata.getColumnWriterFactory(),
+                    columnMetadata.getColumnWriters(), 
columnMetadata.getMultiPageOpRef(),
+                    columnMetadata.getSerializedMetadata());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void redo() throws HyracksDataException {
+        columnMetadata = prevColumnMetadata;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 6ff2cd9..8852af3 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -40,7 +40,7 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;

 public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
-    private final MergeColumnWriteMetadata columnMetadata;
+    private MergeColumnWriteMetadata columnMetadata;
     private final int maxLeafNodeSize;
     private final MergeColumnTupleReference[] componentsTuples;
     private final RunLengthIntArray writtenComponents;
@@ -112,7 +112,7 @@
     }

     @Override
-    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+    public void applyCache(ITupleReference tuple) throws HyracksDataException {
         MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) 
tuple;
         int componentIndex = columnTuple.getComponentIndex();
         int skipCount = columnTuple.getAndResetSkipCount();
@@ -251,4 +251,16 @@
         node.put("componentIndex", componentIndex);
         node.put("count", count);
     }
+
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+    }
+
+    @Override
+    public void undo() throws HyracksDataException {
+    }
+
+    @Override
+    public void redo() throws HyracksDataException {
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
index b0d1a01..2f8fc19 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -92,6 +92,13 @@
     }

     public static MergeColumnWriteMetadata create(ARecordType datasetType, 
ARecordType metaType,
+            int numberOfPrimaryKeys, List<IColumnValuesWriter> writers, 
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata, List<IColumnTupleIterator> 
componentsTuples) throws IOException {
+        return new MergeColumnWriteMetadata(datasetType, metaType, 
numberOfPrimaryKeys, multiPageOpRef, writers,
+                serializedMetadata, componentsTuples);
+    }
+
+    public static MergeColumnWriteMetadata create(ARecordType datasetType, 
ARecordType metaType,
             int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef,
             IValueReference serializedMetadata, List<IColumnTupleIterator> 
componentsTuples) throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
@@ -101,7 +108,7 @@
         int writersOffset = offset + IntegerPointable.getInteger(bytes, offset 
+ WRITERS_POINTER);
         DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, 
writersOffset, length));

-        IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef);
+        IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef, false);
         List<IColumnValuesWriter> writers = new ArrayList<>();
         FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);

@@ -112,4 +119,12 @@
     public List<IColumnTupleIterator> getComponentsTuples() {
         return componentsTuples;
     }
+
+    public IValueReference getSerializedMetadata() {
+        return serializedMetadata;
+    }
+
+    public List<IColumnValuesWriter> getColumnWriters() {
+        return columnWriters;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
index 2e2aa9e..add2442 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -141,4 +141,6 @@
      * @param output destination to which the writer should be serialized to
      */
     void serialize(DataOutput output) throws IOException;
+
+    void applyCache() throws HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
index accf4a0..e7e7f1e 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
@@ -22,6 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;

 import 
org.apache.asterix.column.bytes.encoder.ParquetRunLengthBitPackingHybridEncoder;
 import org.apache.asterix.column.util.ColumnValuesUtil;
@@ -32,6 +34,7 @@
 import 
org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.NoOpColumnFilterWriter;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.logging.log4j.LogManager;
@@ -52,8 +55,11 @@
     private final int nullBitMask;
     private int count;
     private boolean writeValues;
+    protected final boolean cacheFirst;
+    protected List<Pair<ATypeTag, IValueReference>> cachedValues;
+    protected List<Pair<Integer, Integer>> cachedLevels;
 
-    AbstractColumnValuesWriter(int columnIndex, int level, boolean collection, 
boolean filtered) {
+    AbstractColumnValuesWriter(int columnIndex, int level, boolean collection, 
boolean filtered, boolean cacheFirst) {
         this.columnIndex = columnIndex;
         this.level = level;
         this.collection = collection;
@@ -61,6 +67,9 @@
         int width = ColumnValuesUtil.getBitWidth(level);
         definitionLevels = new ParquetRunLengthBitPackingHybridEncoder(width);
         this.filterWriter = filtered ? createFilter() : 
NoOpColumnFilterWriter.INSTANCE;
+        this.cacheFirst = cacheFirst;
+        cachedValues = new ArrayList<>();
+        cachedLevels = new ArrayList<>();
     }

     @Override
@@ -91,9 +100,9 @@

     @Override
     public final void writeValue(ATypeTag tag, IValueReference value) throws 
HyracksDataException {
-        addLevel(level);
+        addOrCacheLevel(level, 1);
         try {
-            addValue(tag, value);
+            addOrCacheValue(tag, value);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -101,20 +110,12 @@

     @Override
     public final void writeLevel(int level) throws HyracksDataException {
-        addLevel(level);
+        addOrCacheLevel(level, 1);
     }

     @Override
     public void writeLevels(int level, int count) throws HyracksDataException {
-        writeValues = writeValues || this.level == level;
-        this.count += count;
-        try {
-            for (int i = 0; i < count; i++) {
-                definitionLevels.writeInt(level);
-            }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
+        addOrCacheLevel(level, count);
     }

     @Override
@@ -124,7 +125,7 @@

     @Override
     public final void writeNull(int level) throws HyracksDataException {
-        addLevel(level | nullBitMask);
+        addOrCacheLevel(level | nullBitMask, 1);
     }

     @Override
@@ -138,9 +139,9 @@

     @Override
     public void writeAntiMatter(ATypeTag tag, IValueReference value) throws 
HyracksDataException {
-        addLevel(0);
+        addOrCacheLevel(0, 1);
         try {
-            addValue(tag, value);
+            addOrCacheValue(tag, value);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -226,11 +227,44 @@
         return writerFactory.createValueWriter(typeTag, columnIndex, level, 
collection, filtered);
     }

-    protected void addLevel(int level) throws HyracksDataException {
+    protected void addOrCacheLevel(int level, int count) throws 
HyracksDataException {
+        if (cacheFirst) {
+            cachedLevels.add(new Pair<>(level, count));
+        } else {
+            addLevel(level, count);
+        }
+    }
+
+    protected void addLevel(int level, int count) throws HyracksDataException {
+        writeValues = writeValues || this.level == level;
+        this.count += count;
         try {
-            writeValues = writeValues || this.level == level;
-            definitionLevels.writeInt(level);
-            count++;
+            for (int i = 0; i < count; i++) {
+                definitionLevels.writeInt(level);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    protected void addOrCacheValue(ATypeTag tag, IValueReference value) throws 
IOException {
+        if (cacheFirst) {
+            cachedValues.add(new Pair<>(tag, value));
+        } else {
+            addValue(tag, value);
+        }
+    }
+
+    public void applyCache() throws HyracksDataException {
+        try {
+            for (Pair<Integer, Integer> level : cachedLevels) {
+                addLevel(level.getFirst(), level.getSecond());
+            }
+            cachedLevels.clear();
+            for (Pair<ATypeTag, IValueReference> value : cachedValues) {
+                addValue(value.getFirst(), value.getSecond());
+            }
+            cachedValues.clear();
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
index 7d50cb1..1ab948b 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
@@ -31,8 +31,9 @@
 public final class BooleanColumnValuesWriter extends 
AbstractColumnValuesWriter {
     private final ParquetRunLengthBitPackingHybridEncoder booleanWriter;

-    public BooleanColumnValuesWriter(int columnIndex, int level, boolean 
collection, boolean filtered) {
-        super(columnIndex, level, collection, filtered);
+    public BooleanColumnValuesWriter(int columnIndex, int level, boolean 
collection, boolean filtered,
+            boolean cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         booleanWriter = new ParquetRunLengthBitPackingHybridEncoder(1);
     }

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
index 3d32a27..b9d68d8 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
@@ -26,9 +26,11 @@

 public class ColumnValuesWriterFactory implements IColumnValuesWriterFactory {
     private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+    private boolean cacheFirst;

-    public ColumnValuesWriterFactory(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef) {
+    public ColumnValuesWriterFactory(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef, boolean cacheFirst) {
         this.multiPageOpRef = multiPageOpRef;
+        this.cacheFirst = cacheFirst;
     }

     @Override
@@ -37,23 +39,27 @@
         switch (typeTag) {
             case MISSING:
             case NULL:
-                return new NullMissingColumnValuesWriter(columnIndex, 
maxLevel, writeAlways, filtered);
+                return new NullMissingColumnValuesWriter(columnIndex, 
maxLevel, writeAlways, filtered, cacheFirst);
             case BOOLEAN:
-                return new BooleanColumnValuesWriter(columnIndex, maxLevel, 
writeAlways, filtered);
+                return new BooleanColumnValuesWriter(columnIndex, maxLevel, 
writeAlways, filtered, cacheFirst);
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             case BIGINT:
-                return new LongColumnValuesWriter(multiPageOpRef, columnIndex, 
maxLevel, writeAlways, filtered,
-                        typeTag);
+                return new LongColumnValuesWriter(multiPageOpRef, columnIndex, 
maxLevel, writeAlways, filtered, typeTag,
+                        cacheFirst);
             case FLOAT:
-                return new FloatColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered);
+                return new FloatColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered,
+                        cacheFirst);
             case DOUBLE:
-                return new DoubleColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered);
+                return new DoubleColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered,
+                        cacheFirst);
             case STRING:
-                return new StringColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered);
+                return new StringColumnValuesWriter(multiPageOpRef, 
columnIndex, maxLevel, writeAlways, filtered,
+                        cacheFirst);
             case UUID:
-                return new UUIDColumnValuesWriter(multiPageOpRef, columnIndex, 
maxLevel, writeAlways, filtered);
+                return new UUIDColumnValuesWriter(multiPageOpRef, columnIndex, 
maxLevel, writeAlways, filtered,
+                        cacheFirst);
             default:
                 throw new UnsupportedOperationException(typeTag + " is not 
supported");
         }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index 9e6f906..db844fb 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -40,8 +40,8 @@
     private final ParquetPlainFixedLengthValuesWriter doubleWriter;

     public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered) {
-        super(columnIndex, level, collection, filtered);
+            boolean collection, boolean filtered, boolean cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         doubleWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
     }
 
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
index 39abcad..992a007 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
@@ -40,8 +40,8 @@
     private final ParquetPlainFixedLengthValuesWriter floatWriter;

     public FloatColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered) {
-        super(columnIndex, level, collection, filtered);
+            boolean collection, boolean filtered, boolean cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         floatWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
     }

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index 516f56d..3cb6b18 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -43,8 +43,8 @@
     private final ATypeTag typeTag;

     public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered, ATypeTag typeTag) {
-        super(columnIndex, level, collection, filtered);
+            boolean collection, boolean filtered, ATypeTag typeTag, boolean 
cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         longWriter = filtered ? new 
ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef)
                 : new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
index 2d9f5bf..e6f9780 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
@@ -33,15 +33,16 @@
     private static final BytesInput EMPTY = BytesInput.empty();
     private final RunLengthIntArray defLevelsIntArray;

-    NullMissingColumnValuesWriter(int columnIndex, int level, boolean 
collection, boolean filtered) {
-        super(columnIndex, level, collection, filtered);
+    NullMissingColumnValuesWriter(int columnIndex, int level, boolean 
collection, boolean filtered,
+            boolean cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         defLevelsIntArray = new RunLengthIntArray();
     }

     @Override
-    protected void addLevel(int level) throws HyracksDataException {
+    protected void addLevel(int level, int count) throws HyracksDataException {
         defLevelsIntArray.add(level);
-        super.addLevel(level);
+        super.addLevel(level, 0);
     }

     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index 5b1977f..074439c 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -38,14 +38,14 @@
     private final boolean skipLengthBytes;

     public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered) {
+            boolean collection, boolean filtered, boolean cacheFirst) {
         this(columnIndex, level, collection, filtered, true, filtered ? new ParquetDeltaByteArrayWriter(multiPageOpRef)
-                : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef));
+                : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef), cacheFirst);
     }

     protected StringColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered,
-            boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter) {
-        super(columnIndex, level, collection, filtered);
+            boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter, boolean cacheFirst) {
+        super(columnIndex, level, collection, filtered, cacheFirst);
         this.stringWriter = stringWriter;
         this.skipLengthBytes = skipLengthBytes;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 9d4ff9a..2801407 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -28,9 +28,10 @@
 final class UUIDColumnValuesWriter extends StringColumnValuesWriter {

     public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered) {
+            boolean collection, boolean filtered, boolean cacheFirst) {
         // UUID is always written without encoding
-        super(columnIndex, level, collection, filtered, false, new ParquetPlainFixedLengthValuesWriter(multiPageOpRef));
+        super(columnIndex, level, collection, filtered, false, new ParquetPlainFixedLengthValuesWriter(multiPageOpRef),
+                cacheFirst);
     }

     @Override
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 55097c5..55de467 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -120,7 +120,7 @@

     protected FlushColumnMetadata prepareNewFile(int fileId) throws HyracksDataException {
         Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
-        IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef);
+        IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef, true);
         FlushColumnMetadata columnMetadata = new FlushColumnMetadata(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, null,
                 Collections.emptyList(), null, writerFactory, multiPageOpRef);
         IColumnWriteMultiPageOp multiPageOp = new TestWriteMultiPageOp(dummyBufferCache, fileId);
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
index d287417..d03ac23 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
@@ -152,4 +152,9 @@
     public RunLengthIntArray getDefinitionLevelsIntArray() {
         return definitionLevels;
     }
+
+    @Override
+    public void applyCache() throws HyracksDataException {
+
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 6d26a45..f74426c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -92,4 +92,11 @@
      * Close the current writer and release all allocated temporary buffers
      */
     public abstract void close();
+
+    public abstract void applyCache(ITupleReference tuple) throws HyracksDataException;
+
+    public abstract void undo() throws HyracksDataException;
+
+    public abstract void redo() throws HyracksDataException;
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index d9aec0c..1c10b2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -84,13 +84,16 @@

     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
+        columnWriter.writeTuple(tuple);
         if (isFull(tuple)) {
+            columnWriter.undo();
             writeFullLeafPage();
             confiscateNewLeafPage();
+            columnWriter.redo();
         }
         //Save the key of the last inserted tuple
         setMinMaxKeys(tuple);
-        columnWriter.writeTuple(tuple);
+        columnWriter.applyCache(tuple);
         tupleCount++;
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19128
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I693a2953cf2fcc8ff5aa3620bd772dc9e0fa7eec
Gerrit-Change-Number: 19128
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange

Reply via email to