From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19125 )
Change subject: wip: test patch to get number of columns
......................................................................
wip: test patch to get number of columns
Change-Id: I4a318ae69538daf385060df26e9a5fd1d2d494f8
---
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
M
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
A
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
A
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
10 files changed, 382 insertions(+), 31 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/19125/1
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
index 6db8a78..ea85d8b 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
@@ -74,8 +74,8 @@
try {
Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new
MutableObject<>();
IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef);
- return FlushColumnMetadata.create(datasetType, metaType,
primaryKeys, keySourceIndicator, writerFactory,
- multiPageOpRef, metadata);
+ return FlushColumnMetadata.create(datasetType, metaType,
primaryKeys.size(), keySourceIndicator,
+ writerFactory, multiPageOpRef, metadata);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index f514638..5251508 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -70,9 +70,9 @@
* Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
* The schema here is mutable and can change according to the flushed records
*/
-public final class FlushColumnMetadata extends AbstractColumnMetadata {
+public class FlushColumnMetadata extends AbstractColumnMetadata {
private static final Logger LOGGER = LogManager.getLogger();
- private final Map<AbstractSchemaNestedNode, RunLengthIntArray>
definitionLevels;
+ protected final Map<AbstractSchemaNestedNode, RunLengthIntArray>
definitionLevels;
private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
private final IFieldNamesDictionary fieldNamesDictionary;
private final ObjectSchemaNode root;
@@ -81,11 +81,11 @@
private final List<IColumnValuesWriter> columnWriters;
private final ArrayBackedValueStorage serializedMetadata;
private final PathInfoSerializer pathInfoSerializer;
- private final IntArrayList nullWriterIndexes;
+ protected final IntArrayList nullWriterIndexes;
private final boolean metaContainsKeys;
private boolean changed;
- private int level;
- private int repeated;
+ protected int level;
+ protected int repeated;
public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType,
List<List<String>> primaryKeys,
List<Integer> keySourceIndicator, IColumnValuesWriterFactory
columnWriterFactory,
@@ -124,13 +124,13 @@
serializeColumnsMetadata();
}
- private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType,
List<List<String>> primaryKeys,
+ public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType,
int numPrimaryKeys,
boolean metaContainsKeys, IColumnValuesWriterFactory
columnWriterFactory,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
List<IColumnValuesWriter> columnWriters,
IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root,
ObjectSchemaNode metaRoot,
Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
ArrayBackedValueStorage serializedMetadata) {
- super(datasetType, metaType, primaryKeys.size());
+ super(datasetType, metaType, numPrimaryKeys);
this.multiPageOpRef = multiPageOpRef;
this.columnWriterFactory = columnWriterFactory;
this.definitionLevels = definitionLevels;
@@ -226,21 +226,21 @@
IntegerPointable.setInteger(serializedMetadata.getByteArray(),
pointer, offset);
}
- public static FlushColumnMetadata create(ARecordType datasetType,
ARecordType metaType,
- List<List<String>> primaryKeys, List<Integer> keySourceIndicator,
- IColumnValuesWriterFactory columnWriterFactory,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
- IValueReference serializedMetadata) throws HyracksDataException {
+ public static FlushColumnMetadata create(ARecordType datasetType,
ARecordType metaType, int numPrimaryKeys,
+ List<Integer> keySourceIndicator, IColumnValuesWriterFactory
columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference
serializedMetadata)
+ throws HyracksDataException {
boolean metaContainsKeys = metaType != null &&
keySourceIndicator.get(0) == 1;
try {
- return createMutableMetadata(datasetType, metaType, primaryKeys,
metaContainsKeys, columnWriterFactory,
+ return createMutableMetadata(datasetType, metaType,
numPrimaryKeys, metaContainsKeys, columnWriterFactory,
multiPageOpRef, serializedMetadata);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
- private static FlushColumnMetadata createMutableMetadata(ARecordType
datasetType, ARecordType metaType,
- List<List<String>> primaryKeys, boolean metaContainsKeys,
IColumnValuesWriterFactory columnWriterFactory,
+ public static FlushColumnMetadata createMutableMetadata(ARecordType
datasetType, ARecordType metaType,
+ int numPrimaryKeys, boolean metaContainsKeys,
IColumnValuesWriterFactory columnWriterFactory,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference
serializedMetadata) throws IOException {
DataInput input = new DataInputStream(new
ByteArrayInputStream(serializedMetadata.getByteArray(),
serializedMetadata.getStartOffset(),
serializedMetadata.getLength()));
@@ -265,7 +265,7 @@
ArrayBackedValueStorage schemaStorage = new
ArrayBackedValueStorage(serializedMetadata.getLength());
schemaStorage.append(serializedMetadata);
logSchema(root, metaRoot, fieldNamesDictionary);
- return new FlushColumnMetadata(datasetType, metaType, primaryKeys,
metaContainsKeys, columnWriterFactory,
+ return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys,
metaContainsKeys, columnWriterFactory,
multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot,
definitionLevels, schemaStorage);
}
@@ -525,7 +525,7 @@
return createdChild;
}
- private AbstractSchemaNode createChild(ATypeTag childTypeTag) throws
HyracksDataException {
+ protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws
HyracksDataException {
switch (childTypeTag) {
case OBJECT:
return addDefinitionLevelsAndGet(new ObjectSchemaNode());
@@ -571,7 +571,7 @@
}
}
- private AbstractSchemaNode
addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
+ protected AbstractSchemaNode
addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
definitionLevels.put(nestedNode, new RunLengthIntArray());
return nestedNode;
}
@@ -590,4 +590,9 @@
LOGGER.debug("Schema for {} has changed: {}", META_RECORD_SCHEMA,
metaRecordSchema);
}
}
+
+ public boolean isMetaContainsKey() {
+ return metaContainsKeys;
+ }
+
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 65f5eb4..563c66c 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -18,13 +18,18 @@
*/
package org.apache.asterix.column.operation.lsm.flush;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
import
org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
@@ -34,10 +39,13 @@
public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
protected final FlushColumnMetadata columnMetadata;
+ protected final NoWriteFlushColumnMetadata columnMetadataWithCurrentTuple;
+
protected final BatchFinalizerVisitor finalizer;
protected final ColumnBatchWriter writer;
private final ColumnTransformer transformer;
+ private final NoWriteColumnTransformer transformerForCurrentTuple;
private final RecordLazyVisitablePointable pointable;
private final int maxNumberOfTuples;
private final IColumnValuesWriter[] primaryKeyWriters;
@@ -60,6 +68,26 @@
for (int i = 0; i < numberOfPrimaryKeys; i++) {
primaryKeyWriters[i] = columnMetadata.getWriter(i);
}
+
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new
MutableObject<>();
+ IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef);
+ try {
+ columnMetadataWithCurrentTuple =
NoWriteFlushColumnMetadata.createMutableMetadata(
+ columnMetadata.getDatasetType(),
columnMetadata.getMetaType(),
+ columnMetadata.getNumberOfPrimaryKeys(),
columnMetadata.isMetaContainsKey(), writerFactory,
+ multiPageOpRef, columnMetadata.serializeColumnsMetadata());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ transformerForCurrentTuple =
+ new NoWriteColumnTransformer(columnMetadataWithCurrentTuple,
columnMetadataWithCurrentTuple.getRoot());
+ }
+
+ public void updateColumnMetadataForCurrentTuple(ITupleReference tuple)
throws HyracksDataException {
+ int recordFieldId = columnMetadata.getRecordFieldIndex();
+ pointable.set(tuple.getFieldData(recordFieldId),
tuple.getFieldStart(recordFieldId),
+ tuple.getFieldLength(recordFieldId));
+ transformerForCurrentTuple.transform(pointable);
}
@Override
@@ -68,8 +96,12 @@
}
@Override
- public final int getNumberOfColumns() {
- return columnMetadata.getNumberOfColumns();
+ public final int getNumberOfColumns(boolean includeCurrentTupleColumns) {
+ if (includeCurrentTupleColumns) {
+ return columnMetadataWithCurrentTuple.getNumberOfColumns();
+ } else {
+ return columnMetadata.getNumberOfColumns();
+ }
}
@Override
@@ -85,7 +117,7 @@
@Override
public final int getOccupiedSpace() {
- int numberOfColumns = getNumberOfColumns();
+ int numberOfColumns = getNumberOfColumns(true);
int filterSize = numberOfColumns *
AbstractColumnFilterWriter.FILTER_SIZE;
return primaryKeysEstimatedSize + filterSize;
}
@@ -124,7 +156,7 @@
@Override
public final int flush(ByteBuffer pageZero) throws HyracksDataException {
- writer.setPageZeroBuffer(pageZero, getNumberOfColumns(),
columnMetadata.getNumberOfPrimaryKeys());
+ writer.setPageZeroBuffer(pageZero, getNumberOfColumns(false),
columnMetadata.getNumberOfPrimaryKeys());
transformer.resetStringLengths();
return finalizer.finalizeBatch(writer, columnMetadata);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
new file mode 100644
index 0000000..222f1bf
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import
org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class NoWriteColumnTransformer
+ implements ILazyVisitablePointableVisitor<AbstractSchemaNode,
AbstractSchemaNode> {
+ private final NoWriteFlushColumnMetadata columnMetadata;
+ private final ObjectSchemaNode root;
+ private AbstractSchemaNestedNode currentParent;
+
+ public NoWriteColumnTransformer(NoWriteFlushColumnMetadata columnMetadata,
ObjectSchemaNode root) {
+ this.columnMetadata = columnMetadata;
+ this.root = root;
+ }
+
+ /**
+ * Transform a tuple in row format into columns
+ *
+ * @param pointable record pointable
+ * @return the estimated size (possibly overestimated) of the primary
key(s) columns
+ */
+ public int transform(RecordLazyVisitablePointable pointable) throws
HyracksDataException {
+ pointable.accept(this, root);
+ return 0;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable,
AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+ currentParent = objectNode;
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ IValueReference fieldName = pointable.getFieldName();
+ ATypeTag childTypeTag = pointable.getChildTypeTag();
+ if (childTypeTag != ATypeTag.MISSING) {
+ //Only write actual field values (including NULL) but ignore
MISSING fields
+ AbstractSchemaNode childNode =
objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
+ acceptActualNode(pointable.getChildVisitablePointable(),
childNode);
+ }
+ }
+
+ if (pointable.getNumberOfChildren() == 0) {
+ // Set as empty object
+ objectNode.setEmptyObject(columnMetadata);
+ }
+
+ columnMetadata.exitNode(arg);
+ currentParent = previousParent;
+ return null;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(AbstractListLazyVisitablePointable
pointable, AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ AbstractCollectionSchemaNode collectionNode =
(AbstractCollectionSchemaNode) arg;
+ RunLengthIntArray defLevels =
columnMetadata.getDefinitionLevels(collectionNode);
+ //the level at which an item is missing
+ int missingLevel = columnMetadata.getLevel();
+ currentParent = collectionNode;
+
+ int numberOfChildren = pointable.getNumberOfChildren();
+ for (int i = 0; i < numberOfChildren; i++) {
+ pointable.nextChild();
+ ATypeTag childTypeTag = pointable.getChildTypeTag();
+ AbstractSchemaNode childNode =
collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
+ acceptActualNode(pointable.getChildVisitablePointable(),
childNode);
+ /*
+ * The array item may change (e.g., BIGINT --> UNION). Thus, new
items would be considered as missing
+ */
+ defLevels.add(missingLevel);
+ }
+
+ // Add missing as a last element of the array to help indicate empty
arrays
+ collectionNode.getOrCreateItem(ATypeTag.MISSING, columnMetadata);
+ defLevels.add(missingLevel);
+
+ columnMetadata.exitCollectionNode(collectionNode, numberOfChildren);
+ currentParent = previousParent;
+ return null;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable,
AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ columnMetadata.exitNode(arg);
+ return null;
+ }
+
+ private void acceptActualNode(AbstractLazyVisitablePointable pointable,
AbstractSchemaNode node)
+ throws HyracksDataException {
+ if (node.getTypeTag() == ATypeTag.UNION) {
+ columnMetadata.enterNode(currentParent, node);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ UnionSchemaNode unionNode = (UnionSchemaNode) node;
+ currentParent = unionNode;
+
+ ATypeTag childTypeTag = pointable.getTypeTag();
+
+ if (childTypeTag == ATypeTag.NULL || childTypeTag ==
ATypeTag.MISSING) {
+ /*
+ * NULL and MISSING are tracked since the start to be written
in the originalType (i.e., the type
+ * before injecting a union between the parent and the
original node).
+ */
+ AbstractSchemaNode actualNode = unionNode.getOriginalType();
+ acceptActualNode(pointable, actualNode);
+ } else {
+ AbstractSchemaNode actualNode =
unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+ pointable.accept(this, actualNode);
+ }
+
+ currentParent = previousParent;
+ columnMetadata.exitNode(node);
+ } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested())
{
+ columnMetadata.addNestedNull(currentParent,
(AbstractSchemaNestedNode) node);
+ } else {
+ pointable.accept(this, node);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
new file mode 100644
index 0000000..70dcf34
--- /dev/null
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import static
org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
+import
org.apache.asterix.column.metadata.dictionary.AbstractFieldNamesDictionary;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
+/**
+ * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
+ * The schema here is mutable and can change according to the flushed records
+ */
+public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata {
+
+ private int numColumns;
+
+ public NoWriteFlushColumnMetadata(ARecordType datasetType, ARecordType
metaType, int numPrimaryKeys,
+ boolean metaContainsKeys, IColumnValuesWriterFactory
columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
List<IColumnValuesWriter> writers,
+ IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root,
ObjectSchemaNode metaRoot,
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
ArrayBackedValueStorage schemaStorage) {
+ super(datasetType, metaType, numPrimaryKeys, metaContainsKeys,
columnWriterFactory, multiPageOpRef, writers,
+ fieldNamesDictionary, root, metaRoot, definitionLevels,
schemaStorage);
+ numColumns = 0;
+ }
+
+ public static NoWriteFlushColumnMetadata createMutableMetadata(ARecordType
datasetType, ARecordType metaType,
+ int numPrimaryKeys, boolean metaContainsKeys,
IColumnValuesWriterFactory columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference
serializedMetadata) throws IOException {
+ DataInput input = new DataInputStream(new
ByteArrayInputStream(serializedMetadata.getByteArray(),
+ serializedMetadata.getStartOffset(),
serializedMetadata.getLength()));
+ //Skip offsets
+ input.skipBytes(OFFSETS_SIZE);
+
+ //ColumnWriter
+ List<IColumnValuesWriter> writers = new ArrayList<>();
+ deserializeWriters(input, writers, columnWriterFactory);
+
+ //FieldNames
+ IFieldNamesDictionary fieldNamesDictionary =
AbstractFieldNamesDictionary.deserialize(input);
+
+ //Schema
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels =
new HashMap<>();
+ ObjectSchemaNode root = (ObjectSchemaNode)
AbstractSchemaNode.deserialize(input, definitionLevels);
+ ObjectSchemaNode metaRoot = null;
+ if (metaType != null) {
+ metaRoot = (ObjectSchemaNode)
AbstractSchemaNode.deserialize(input, definitionLevels);
+ }
+
+ ArrayBackedValueStorage schemaStorage = new
ArrayBackedValueStorage(serializedMetadata.getLength());
+ schemaStorage.append(serializedMetadata);
+ return new NoWriteFlushColumnMetadata(datasetType, metaType,
numPrimaryKeys, metaContainsKeys,
+ columnWriterFactory, multiPageOpRef, writers,
fieldNamesDictionary, root, metaRoot, definitionLevels,
+ schemaStorage);
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return numColumns;
+ }
+
+ public void close() {
+ }
+
+ @Override
+ protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws
HyracksDataException {
+ switch (childTypeTag) {
+ case OBJECT:
+ return addDefinitionLevelsAndGet(new ObjectSchemaNode());
+ case ARRAY:
+ return addDefinitionLevelsAndGet(new ArraySchemaNode());
+ case MULTISET:
+ return addDefinitionLevelsAndGet(new MultisetSchemaNode());
+ case NULL:
+ case MISSING:
+ case BOOLEAN:
+ case FLOAT:
+ case DOUBLE:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case STRING:
+ case UUID:
+ int columnIndex = nullWriterIndexes.isEmpty() ? numColumns :
nullWriterIndexes.removeInt(0);
+ boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+ ATypeTag normalizedTypeTag = primaryKey ? childTypeTag :
getNormalizedTypeTag(childTypeTag);
+ if (columnIndex == numColumns) {
+ numColumns++;
+ }
+ return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag,
primaryKey);
+ default:
+ throw new IllegalStateException("Unsupported type " +
childTypeTag);
+
+ }
+ }
+}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 6ff2cd9..fc7b859 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -95,7 +95,7 @@
}
@Override
- public int getNumberOfColumns() {
+ public int getNumberOfColumns(boolean includeCurrentTupleColumns) {
return columnMetadata.getNumberOfColumns();
}
@@ -106,7 +106,7 @@
@Override
public int getOccupiedSpace() {
- int numberOfColumns = getNumberOfColumns();
+ int numberOfColumns = getNumberOfColumns(true);
int filterSize = numberOfColumns *
AbstractColumnFilterWriter.FILTER_SIZE;
return primaryKeysEstimatedSize + filterSize;
}
@@ -251,4 +251,7 @@
node.put("componentIndex", componentIndex);
node.put("count", count);
}
+
+ public void updateColumnMetadataForCurrentTuple(ITupleReference tuple)
throws HyracksDataException {
+ }
}
diff --git
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 55097c5..8fd9f6b 100644
---
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -193,7 +193,7 @@
pageZero.position(HEADER_SIZE);
pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero));
//Write page header
- int numberOfColumn = writer.getNumberOfColumns();
+ int numberOfColumn = writer.getNumberOfColumns(false);
pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 6d26a45..92b3892 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -49,10 +49,12 @@
*/
public abstract void init(IColumnWriteMultiPageOp multiPageOp) throws
HyracksDataException;
+ public abstract void updateColumnMetadataForCurrentTuple(ITupleReference
tuple) throws HyracksDataException;
+
/**
- * @return The current number of columns
+ * @return The current number of columns including the current tuple
*/
- public abstract int getNumberOfColumns();
+ public abstract int getNumberOfColumns(boolean includeCurrentTupleColumns);
/**
* Currently, a column offset takes 4-byte (fixed). But in the future, we
can reformat the offsets. For example,
@@ -61,7 +63,7 @@
* @return the size needed to store columns' offsets
*/
public final int getColumnOffsetsSize() {
- return Integer.BYTES * getNumberOfColumns();
+ return Integer.BYTES * getNumberOfColumns(true);
}
/**
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index d9aec0c..ee61fa6 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -99,7 +99,7 @@
return tupleWriter.createTupleReference();
}
- private boolean isFull(ITupleReference tuple) {
+ private boolean isFull(ITupleReference tuple) throws HyracksDataException {
if (tupleCount == 0) {
return false;
} else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) {
@@ -108,6 +108,7 @@
}
int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
//Columns' Offsets
+ columnWriter.updateColumnMetadataForCurrentTuple(tuple);
requiredFreeSpace += columnWriter.getColumnOffsetsSize();
//Occupied space from previous writes
requiredFreeSpace += columnWriter.getOccupiedSpace();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index c725084..bced7ef 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -60,7 +60,7 @@
rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
// Write page information
- int numberOfColumns = columnWriter.getNumberOfColumns();
+ int numberOfColumns = columnWriter.getNumberOfColumns(false);
buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns);
buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET,
columnWriter.getColumnOffsetsSize());
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19125
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I4a318ae69538daf385060df26e9a5fd1d2d494f8
Gerrit-Change-Number: 19125
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange