From Peeyush Gupta <[email protected]>:

Peeyush Gupta has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19125 )


Change subject: wip: test patch to get number of columns
......................................................................

wip: test patch to get number of columns

Change-Id: I4a318ae69538daf385060df26e9a5fd1d2d494f8
---
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
M 
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
A 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
A 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
M 
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
10 files changed, 382 insertions(+), 31 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/25/19125/1

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
index 6db8a78..ea85d8b 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
@@ -74,8 +74,8 @@
         try {
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new 
MutableObject<>();
             IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef);
-            return FlushColumnMetadata.create(datasetType, metaType, 
primaryKeys, keySourceIndicator, writerFactory,
-                    multiPageOpRef, metadata);
+            return FlushColumnMetadata.create(datasetType, metaType, 
primaryKeys.size(), keySourceIndicator,
+                    writerFactory, multiPageOpRef, metadata);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index f514638..5251508 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -70,9 +70,9 @@
  * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
  * The schema here is mutable and can change according to the flushed records
  */
-public final class FlushColumnMetadata extends AbstractColumnMetadata {
+public class FlushColumnMetadata extends AbstractColumnMetadata {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final Map<AbstractSchemaNestedNode, RunLengthIntArray> 
definitionLevels;
+    protected final Map<AbstractSchemaNestedNode, RunLengthIntArray> 
definitionLevels;
     private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
     private final IFieldNamesDictionary fieldNamesDictionary;
     private final ObjectSchemaNode root;
@@ -81,11 +81,11 @@
     private final List<IColumnValuesWriter> columnWriters;
     private final ArrayBackedValueStorage serializedMetadata;
     private final PathInfoSerializer pathInfoSerializer;
-    private final IntArrayList nullWriterIndexes;
+    protected final IntArrayList nullWriterIndexes;
     private final boolean metaContainsKeys;
     private boolean changed;
-    private int level;
-    private int repeated;
+    protected int level;
+    protected int repeated;

     public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
List<List<String>> primaryKeys,
             List<Integer> keySourceIndicator, IColumnValuesWriterFactory 
columnWriterFactory,
@@ -124,13 +124,13 @@
         serializeColumnsMetadata();
     }

-    private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
List<List<String>> primaryKeys,
+    public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
int numPrimaryKeys,
             boolean metaContainsKeys, IColumnValuesWriterFactory 
columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> columnWriters,
             IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
             Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
             ArrayBackedValueStorage serializedMetadata) {
-        super(datasetType, metaType, primaryKeys.size());
+        super(datasetType, metaType, numPrimaryKeys);
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
         this.definitionLevels = definitionLevels;
@@ -226,21 +226,21 @@
         IntegerPointable.setInteger(serializedMetadata.getByteArray(), 
pointer, offset);
     }

-    public static FlushColumnMetadata create(ARecordType datasetType, 
ARecordType metaType,
-            List<List<String>> primaryKeys, List<Integer> keySourceIndicator,
-            IColumnValuesWriterFactory columnWriterFactory, 
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
-            IValueReference serializedMetadata) throws HyracksDataException {
+    public static FlushColumnMetadata create(ARecordType datasetType, 
ARecordType metaType, int numPrimaryKeys,
+            List<Integer> keySourceIndicator, IColumnValuesWriterFactory 
columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference 
serializedMetadata)
+            throws HyracksDataException {
         boolean metaContainsKeys = metaType != null && 
keySourceIndicator.get(0) == 1;
         try {
-            return createMutableMetadata(datasetType, metaType, primaryKeys, 
metaContainsKeys, columnWriterFactory,
+            return createMutableMetadata(datasetType, metaType, 
numPrimaryKeys, metaContainsKeys, columnWriterFactory,
                     multiPageOpRef, serializedMetadata);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }

-    private static FlushColumnMetadata createMutableMetadata(ARecordType 
datasetType, ARecordType metaType,
-            List<List<String>> primaryKeys, boolean metaContainsKeys, 
IColumnValuesWriterFactory columnWriterFactory,
+    public static FlushColumnMetadata createMutableMetadata(ARecordType 
datasetType, ARecordType metaType,
+            int numPrimaryKeys, boolean metaContainsKeys, 
IColumnValuesWriterFactory columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference 
serializedMetadata) throws IOException {
         DataInput input = new DataInputStream(new 
ByteArrayInputStream(serializedMetadata.getByteArray(),
                 serializedMetadata.getStartOffset(), 
serializedMetadata.getLength()));
@@ -265,7 +265,7 @@
         ArrayBackedValueStorage schemaStorage = new 
ArrayBackedValueStorage(serializedMetadata.getLength());
         schemaStorage.append(serializedMetadata);
         logSchema(root, metaRoot, fieldNamesDictionary);
-        return new FlushColumnMetadata(datasetType, metaType, primaryKeys, 
metaContainsKeys, columnWriterFactory,
+        return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys, 
metaContainsKeys, columnWriterFactory,
                 multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, 
definitionLevels, schemaStorage);
     }

@@ -525,7 +525,7 @@
         return createdChild;
     }

-    private AbstractSchemaNode createChild(ATypeTag childTypeTag) throws 
HyracksDataException {
+    protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws 
HyracksDataException {
         switch (childTypeTag) {
             case OBJECT:
                 return addDefinitionLevelsAndGet(new ObjectSchemaNode());
@@ -571,7 +571,7 @@
         }
     }

-    private AbstractSchemaNode 
addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
+    protected AbstractSchemaNode 
addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
         definitionLevels.put(nestedNode, new RunLengthIntArray());
         return nestedNode;
     }
@@ -590,4 +590,9 @@
             LOGGER.debug("Schema for {} has changed: {}", META_RECORD_SCHEMA, 
metaRecordSchema);
         }
     }
+
+    public boolean isMetaContainsKey() {
+        return metaContainsKeys;
+    }
+
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 65f5eb4..563c66c 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -18,13 +18,18 @@
  */
 package org.apache.asterix.column.operation.lsm.flush;

+import java.io.IOException;
 import java.nio.ByteBuffer;

 import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
 import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
 import 
org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
@@ -34,10 +39,13 @@

 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
     protected final FlushColumnMetadata columnMetadata;
+    protected final NoWriteFlushColumnMetadata columnMetadataWithCurrentTuple;
+
     protected final BatchFinalizerVisitor finalizer;
     protected final ColumnBatchWriter writer;

     private final ColumnTransformer transformer;
+    private final NoWriteColumnTransformer transformerForCurrentTuple;
     private final RecordLazyVisitablePointable pointable;
     private final int maxNumberOfTuples;
     private final IColumnValuesWriter[] primaryKeyWriters;
@@ -60,6 +68,26 @@
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
             primaryKeyWriters[i] = columnMetadata.getWriter(i);
         }
+
+        Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new 
MutableObject<>();
+        IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef);
+        try {
+            columnMetadataWithCurrentTuple = 
NoWriteFlushColumnMetadata.createMutableMetadata(
+                    columnMetadata.getDatasetType(), 
columnMetadata.getMetaType(),
+                    columnMetadata.getNumberOfPrimaryKeys(), 
columnMetadata.isMetaContainsKey(), writerFactory,
+                    multiPageOpRef, columnMetadata.serializeColumnsMetadata());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        transformerForCurrentTuple =
+                new NoWriteColumnTransformer(columnMetadataWithCurrentTuple, 
columnMetadataWithCurrentTuple.getRoot());
+    }
+
+    public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) 
throws HyracksDataException {
+        int recordFieldId = columnMetadata.getRecordFieldIndex();
+        pointable.set(tuple.getFieldData(recordFieldId), 
tuple.getFieldStart(recordFieldId),
+                tuple.getFieldLength(recordFieldId));
+        transformerForCurrentTuple.transform(pointable);
     }

     @Override
@@ -68,8 +96,12 @@
     }

     @Override
-    public final int getNumberOfColumns() {
-        return columnMetadata.getNumberOfColumns();
+    public final int getNumberOfColumns(boolean includeCurrentTupleColumns) {
+        if (includeCurrentTupleColumns) {
+            return columnMetadataWithCurrentTuple.getNumberOfColumns();
+        } else {
+            return columnMetadata.getNumberOfColumns();
+        }
     }

     @Override
@@ -85,7 +117,7 @@

     @Override
     public final int getOccupiedSpace() {
-        int numberOfColumns = getNumberOfColumns();
+        int numberOfColumns = getNumberOfColumns(true);
         int filterSize = numberOfColumns * 
AbstractColumnFilterWriter.FILTER_SIZE;
         return primaryKeysEstimatedSize + filterSize;
     }
@@ -124,7 +156,7 @@

     @Override
     public final int flush(ByteBuffer pageZero) throws HyracksDataException {
-        writer.setPageZeroBuffer(pageZero, getNumberOfColumns(), 
columnMetadata.getNumberOfPrimaryKeys());
+        writer.setPageZeroBuffer(pageZero, getNumberOfColumns(false), 
columnMetadata.getNumberOfPrimaryKeys());
         transformer.resetStringLengths();
         return finalizer.finalizeBatch(writer, columnMetadata);
     }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
new file mode 100644
index 0000000..222f1bf
--- /dev/null
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import 
org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class NoWriteColumnTransformer
+        implements ILazyVisitablePointableVisitor<AbstractSchemaNode, 
AbstractSchemaNode> {
+    private final NoWriteFlushColumnMetadata columnMetadata;
+    private final ObjectSchemaNode root;
+    private AbstractSchemaNestedNode currentParent;
+
+    public NoWriteColumnTransformer(NoWriteFlushColumnMetadata columnMetadata, 
ObjectSchemaNode root) {
+        this.columnMetadata = columnMetadata;
+        this.root = root;
+    }
+
+    /**
+     * Transform a tuple in row format into columns
+     *
+     * @param pointable record pointable
+     * @return the estimated size (possibly overestimated) of the primary 
key(s) columns
+     */
+    public int transform(RecordLazyVisitablePointable pointable) throws 
HyracksDataException {
+        pointable.accept(this, root);
+        return 0;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, 
AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+
+        ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+        currentParent = objectNode;
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            IValueReference fieldName = pointable.getFieldName();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            if (childTypeTag != ATypeTag.MISSING) {
+                //Only write actual field values (including NULL) but ignore 
MISSING fields
+                AbstractSchemaNode childNode = 
objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
+                acceptActualNode(pointable.getChildVisitablePointable(), 
childNode);
+            }
+        }
+
+        if (pointable.getNumberOfChildren() == 0) {
+            // Set as empty object
+            objectNode.setEmptyObject(columnMetadata);
+        }
+
+        columnMetadata.exitNode(arg);
+        currentParent = previousParent;
+        return null;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(AbstractListLazyVisitablePointable 
pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+
+        AbstractCollectionSchemaNode collectionNode = 
(AbstractCollectionSchemaNode) arg;
+        RunLengthIntArray defLevels = 
columnMetadata.getDefinitionLevels(collectionNode);
+        //the level at which an item is missing
+        int missingLevel = columnMetadata.getLevel();
+        currentParent = collectionNode;
+
+        int numberOfChildren = pointable.getNumberOfChildren();
+        for (int i = 0; i < numberOfChildren; i++) {
+            pointable.nextChild();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            AbstractSchemaNode childNode = 
collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
+            acceptActualNode(pointable.getChildVisitablePointable(), 
childNode);
+            /*
+             * The array item may change (e.g., BIGINT --> UNION). Thus, new 
items would be considered as missing
+             */
+            defLevels.add(missingLevel);
+        }
+
+        // Add missing as a last element of the array to help indicate empty 
arrays
+        collectionNode.getOrCreateItem(ATypeTag.MISSING, columnMetadata);
+        defLevels.add(missingLevel);
+
+        columnMetadata.exitCollectionNode(collectionNode, numberOfChildren);
+        currentParent = previousParent;
+        return null;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, 
AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        columnMetadata.exitNode(arg);
+        return null;
+    }
+
+    private void acceptActualNode(AbstractLazyVisitablePointable pointable, 
AbstractSchemaNode node)
+            throws HyracksDataException {
+        if (node.getTypeTag() == ATypeTag.UNION) {
+            columnMetadata.enterNode(currentParent, node);
+            AbstractSchemaNestedNode previousParent = currentParent;
+
+            UnionSchemaNode unionNode = (UnionSchemaNode) node;
+            currentParent = unionNode;
+
+            ATypeTag childTypeTag = pointable.getTypeTag();
+
+            if (childTypeTag == ATypeTag.NULL || childTypeTag == 
ATypeTag.MISSING) {
+                /*
+                 * NULL and MISSING are tracked since the start to be written 
in the originalType (i.e., the type
+                 * before injecting a union between the parent and the 
original node).
+                 */
+                AbstractSchemaNode actualNode = unionNode.getOriginalType();
+                acceptActualNode(pointable, actualNode);
+            } else {
+                AbstractSchemaNode actualNode = 
unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+                pointable.accept(this, actualNode);
+            }
+
+            currentParent = previousParent;
+            columnMetadata.exitNode(node);
+        } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) 
{
+            columnMetadata.addNestedNull(currentParent, 
(AbstractSchemaNestedNode) node);
+        } else {
+            pointable.accept(this, node);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
new file mode 100644
index 0000000..70dcf34
--- /dev/null
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import static 
org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
+import 
org.apache.asterix.column.metadata.dictionary.AbstractFieldNamesDictionary;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
+/**
+ * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
+ * The schema here is mutable and can change according to the flushed records
+ */
+public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata {
+
+    private int numColumns;
+
+    public NoWriteFlushColumnMetadata(ARecordType datasetType, ARecordType 
metaType, int numPrimaryKeys,
+            boolean metaContainsKeys, IColumnValuesWriterFactory 
columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> writers,
+            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, 
ArrayBackedValueStorage schemaStorage) {
+        super(datasetType, metaType, numPrimaryKeys, metaContainsKeys, 
columnWriterFactory, multiPageOpRef, writers,
+                fieldNamesDictionary, root, metaRoot, definitionLevels, 
schemaStorage);
+        numColumns = 0;
+    }
+
+    public static NoWriteFlushColumnMetadata createMutableMetadata(ARecordType 
datasetType, ARecordType metaType,
+            int numPrimaryKeys, boolean metaContainsKeys, 
IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference 
serializedMetadata) throws IOException {
+        DataInput input = new DataInputStream(new 
ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), 
serializedMetadata.getLength()));
+        //Skip offsets
+        input.skipBytes(OFFSETS_SIZE);
+
+        //ColumnWriter
+        List<IColumnValuesWriter> writers = new ArrayList<>();
+        deserializeWriters(input, writers, columnWriterFactory);
+
+        //FieldNames
+        IFieldNamesDictionary fieldNamesDictionary = 
AbstractFieldNamesDictionary.deserialize(input);
+
+        //Schema
+        Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels = 
new HashMap<>();
+        ObjectSchemaNode root = (ObjectSchemaNode) 
AbstractSchemaNode.deserialize(input, definitionLevels);
+        ObjectSchemaNode metaRoot = null;
+        if (metaType != null) {
+            metaRoot = (ObjectSchemaNode) 
AbstractSchemaNode.deserialize(input, definitionLevels);
+        }
+
+        ArrayBackedValueStorage schemaStorage = new 
ArrayBackedValueStorage(serializedMetadata.getLength());
+        schemaStorage.append(serializedMetadata);
+        return new NoWriteFlushColumnMetadata(datasetType, metaType, 
numPrimaryKeys, metaContainsKeys,
+                columnWriterFactory, multiPageOpRef, writers, 
fieldNamesDictionary, root, metaRoot, definitionLevels,
+                schemaStorage);
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return numColumns;
+    }
+
+    public void close() {
+    }
+
+    @Override
+    protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws 
HyracksDataException {
+        switch (childTypeTag) {
+            case OBJECT:
+                return addDefinitionLevelsAndGet(new ObjectSchemaNode());
+            case ARRAY:
+                return addDefinitionLevelsAndGet(new ArraySchemaNode());
+            case MULTISET:
+                return addDefinitionLevelsAndGet(new MultisetSchemaNode());
+            case NULL:
+            case MISSING:
+            case BOOLEAN:
+            case FLOAT:
+            case DOUBLE:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case STRING:
+            case UUID:
+                int columnIndex = nullWriterIndexes.isEmpty() ? numColumns : 
nullWriterIndexes.removeInt(0);
+                boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+                ATypeTag normalizedTypeTag = primaryKey ? childTypeTag : 
getNormalizedTypeTag(childTypeTag);
+                if (columnIndex == numColumns) {
+                    numColumns++;
+                }
+                return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, 
primaryKey);
+            default:
+                throw new IllegalStateException("Unsupported type " + 
childTypeTag);
+
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 6ff2cd9..fc7b859 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -95,7 +95,7 @@
     }

     @Override
-    public int getNumberOfColumns() {
+    public int getNumberOfColumns(boolean includeCurrentTupleColumns) {
         return columnMetadata.getNumberOfColumns();
     }

@@ -106,7 +106,7 @@

     @Override
     public int getOccupiedSpace() {
-        int numberOfColumns = getNumberOfColumns();
+        int numberOfColumns = getNumberOfColumns(true);
         int filterSize = numberOfColumns * 
AbstractColumnFilterWriter.FILTER_SIZE;
         return primaryKeysEstimatedSize + filterSize;
     }
@@ -251,4 +251,7 @@
         node.put("componentIndex", componentIndex);
         node.put("count", count);
     }
+
+    public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) 
throws HyracksDataException {
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 55097c5..8fd9f6b 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -193,7 +193,7 @@
         pageZero.position(HEADER_SIZE);
         pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero));
         //Write page header
-        int numberOfColumn = writer.getNumberOfColumns();
+        int numberOfColumn = writer.getNumberOfColumns(false);
         pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
         pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 6d26a45..92b3892 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -49,10 +49,12 @@
      */
     public abstract void init(IColumnWriteMultiPageOp multiPageOp) throws 
HyracksDataException;

+    public abstract void updateColumnMetadataForCurrentTuple(ITupleReference 
tuple) throws HyracksDataException;
+
     /**
-     * @return The current number of columns
+     * @return The current number of columns including the current tuple
      */
-    public abstract int getNumberOfColumns();
+    public abstract int getNumberOfColumns(boolean includeCurrentTupleColumns);

     /**
      * Currently, a column offset takes 4-byte (fixed). But in the future, we 
can reformat the offsets. For example,
@@ -61,7 +63,7 @@
      * @return the size needed to store columns' offsets
      */
     public final int getColumnOffsetsSize() {
-        return Integer.BYTES * getNumberOfColumns();
+        return Integer.BYTES * getNumberOfColumns(true);
     }

     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index d9aec0c..ee61fa6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -99,7 +99,7 @@
         return tupleWriter.createTupleReference();
     }

-    private boolean isFull(ITupleReference tuple) {
+    private boolean isFull(ITupleReference tuple) throws HyracksDataException {
         if (tupleCount == 0) {
             return false;
         } else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) {
@@ -108,6 +108,7 @@
         }
         int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
         //Columns' Offsets
+        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
         requiredFreeSpace += columnWriter.getColumnOffsetsSize();
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getOccupiedSpace();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index c725084..bced7ef 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -60,7 +60,7 @@
         rowTupleWriter.writeTuple(maxKey, buf.array(), offset);

         // Write page information
-        int numberOfColumns = columnWriter.getNumberOfColumns();
+        int numberOfColumns = columnWriter.getNumberOfColumns(false);
         buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
         buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns);
         buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, 
columnWriter.getColumnOffsetsSize());

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19125
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I4a318ae69538daf385060df26e9a5fd1d2d494f8
Gerrit-Change-Number: 19125
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange

Reply via email to