>From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19128 )
Change subject: wip: test patch for columnar flush issue (no extra schema
inference)
......................................................................
wip: test patch for columnar flush issue (no extra schema inference)
Change-Id: I693a2953cf2fcc8ff5aa3620bd772dc9e0fa7eec
---
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
M
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
M
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
19 files changed, 225 insertions(+), 58 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/28/19128/1
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
index 6db8a78..7e031f8 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
@@ -65,7 +65,7 @@
@Override
public IColumnMetadata activate() throws HyracksDataException {
Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new
MutableObject<>();
- IColumnValuesWriterFactory factory = new
ColumnValuesWriterFactory(multiPageOpRef);
+ IColumnValuesWriterFactory factory = new
ColumnValuesWriterFactory(multiPageOpRef, true);
return new FlushColumnMetadata(datasetType, metaType, primaryKeys,
keySourceIndicator, factory, multiPageOpRef);
}
@@ -73,7 +73,7 @@
public IColumnMetadata activate(IValueReference metadata) throws
HyracksDataException {
try {
Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new
MutableObject<>();
- IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef);
+ IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef, true);
return FlushColumnMetadata.create(datasetType, metaType,
primaryKeys, keySourceIndicator, writerFactory,
multiPageOpRef, metadata);
} catch (IOException e) {
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index f514638..5a7281d 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -124,13 +124,13 @@
serializeColumnsMetadata();
}
- private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType,
List<List<String>> primaryKeys,
+ private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType,
int numPrimaryKeys,
boolean metaContainsKeys, IColumnValuesWriterFactory
columnWriterFactory,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
List<IColumnValuesWriter> columnWriters,
IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root,
ObjectSchemaNode metaRoot,
Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
ArrayBackedValueStorage serializedMetadata) {
- super(datasetType, metaType, primaryKeys.size());
+ super(datasetType, metaType, numPrimaryKeys);
this.multiPageOpRef = multiPageOpRef;
this.columnWriterFactory = columnWriterFactory;
this.definitionLevels = definitionLevels;
@@ -239,6 +239,32 @@
}
}
+ public static FlushColumnMetadata createMutableMetadata(ARecordType
datasetType, ARecordType metaType,
+ int numPrimaryKeys, boolean metaContainsKeys,
IColumnValuesWriterFactory columnWriterFactory,
+ List<IColumnValuesWriter> writers,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+ IValueReference serializedMetadata) throws IOException {
+ DataInput input = new DataInputStream(new
ByteArrayInputStream(serializedMetadata.getByteArray(),
+ serializedMetadata.getStartOffset(),
serializedMetadata.getLength()));
+ //Skip offsets
+ input.skipBytes(OFFSETS_SIZE);
+
+ //FieldNames
+ IFieldNamesDictionary fieldNamesDictionary =
AbstractFieldNamesDictionary.deserialize(input);
+
+ //Schema
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels =
new HashMap<>();
+ ObjectSchemaNode root = (ObjectSchemaNode)
AbstractSchemaNode.deserialize(input, definitionLevels);
+ ObjectSchemaNode metaRoot = null;
+ if (metaType != null) {
+ metaRoot = (ObjectSchemaNode)
AbstractSchemaNode.deserialize(input, definitionLevels);
+ }
+
+ ArrayBackedValueStorage schemaStorage = new
ArrayBackedValueStorage(serializedMetadata.getLength());
+ schemaStorage.append(serializedMetadata);
+ return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys,
metaContainsKeys, columnWriterFactory,
+ multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot,
definitionLevels, schemaStorage);
+ }
+
private static FlushColumnMetadata createMutableMetadata(ARecordType
datasetType, ARecordType metaType,
List<List<String>> primaryKeys, boolean metaContainsKeys,
IColumnValuesWriterFactory columnWriterFactory,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference
serializedMetadata) throws IOException {
@@ -265,7 +291,7 @@
ArrayBackedValueStorage schemaStorage = new
ArrayBackedValueStorage(serializedMetadata.getLength());
schemaStorage.append(serializedMetadata);
logSchema(root, metaRoot, fieldNamesDictionary);
- return new FlushColumnMetadata(datasetType, metaType, primaryKeys,
metaContainsKeys, columnWriterFactory,
+ return new FlushColumnMetadata(datasetType, metaType,
primaryKeys.size(), metaContainsKeys, columnWriterFactory,
multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot,
definitionLevels, schemaStorage);
}
@@ -590,4 +616,20 @@
LOGGER.debug("Schema for {} has changed: {}", META_RECORD_SCHEMA,
metaRecordSchema);
}
}
+
+ public ArrayBackedValueStorage getSerializedMetadata() {
+ return serializedMetadata;
+ }
+
+ public boolean isMetaContainsKeys() {
+ return metaContainsKeys;
+ }
+
+ public List<IColumnValuesWriter> getColumnWriters() {
+ return columnWriters;
+ }
+
+ public IColumnValuesWriterFactory getColumnWriterFactory() {
+ return columnWriterFactory;
+ }
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 65f5eb4..35f6093 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -33,7 +33,9 @@
import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
- protected final FlushColumnMetadata columnMetadata;
+ protected FlushColumnMetadata columnMetadata;
+ protected FlushColumnMetadata prevColumnMetadata;
+
protected final BatchFinalizerVisitor finalizer;
protected final ColumnBatchWriter writer;
@@ -111,6 +113,7 @@
@Override
public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ columnMetadata.serializeColumnsMetadata();
//This from an in-memory component, hence the cast
LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple;
if (btreeTuple.isAntimatter()) {
@@ -139,4 +142,30 @@
protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws
HyracksDataException {
//NoOp
}
+
+ @Override
+ public void applyCache(ITupleReference tuple) throws HyracksDataException {
+ for (int i = 0; i < columnMetadata.getNumberOfColumns(); i++) {
+ columnMetadata.getWriter(i).applyCache();
+ }
+ }
+
+ @Override
+ public void undo() throws HyracksDataException {
+ try {
+ prevColumnMetadata = columnMetadata;
+ columnMetadata =
FlushColumnMetadata.createMutableMetadata(columnMetadata.getDatasetType(),
+ columnMetadata.getMetaType(),
columnMetadata.getNumberOfPrimaryKeys(),
+ columnMetadata.isMetaContainsKeys(),
columnMetadata.getColumnWriterFactory(),
+ columnMetadata.getColumnWriters(),
columnMetadata.getMultiPageOpRef(),
+ columnMetadata.getSerializedMetadata());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void redo() throws HyracksDataException {
+ columnMetadata = prevColumnMetadata;
+ }
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 6ff2cd9..8852af3 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -40,7 +40,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
- private final MergeColumnWriteMetadata columnMetadata;
+ private MergeColumnWriteMetadata columnMetadata;
private final int maxLeafNodeSize;
private final MergeColumnTupleReference[] componentsTuples;
private final RunLengthIntArray writtenComponents;
@@ -112,7 +112,7 @@
}
@Override
- public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ public void applyCache(ITupleReference tuple) throws HyracksDataException {
MergeColumnTupleReference columnTuple = (MergeColumnTupleReference)
tuple;
int componentIndex = columnTuple.getComponentIndex();
int skipCount = columnTuple.getAndResetSkipCount();
@@ -251,4 +251,16 @@
node.put("componentIndex", componentIndex);
node.put("count", count);
}
+
+ @Override
+ public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ }
+
+ @Override
+ public void undo() throws HyracksDataException {
+ }
+
+ @Override
+ public void redo() throws HyracksDataException {
+ }
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
index b0d1a01..2f8fc19 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -92,6 +92,13 @@
}
public static MergeColumnWriteMetadata create(ARecordType datasetType,
ARecordType metaType,
+ int numberOfPrimaryKeys, List<IColumnValuesWriter> writers,
Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+ IValueReference serializedMetadata, List<IColumnTupleIterator>
componentsTuples) throws IOException {
+ return new MergeColumnWriteMetadata(datasetType, metaType,
numberOfPrimaryKeys, multiPageOpRef, writers,
+ serializedMetadata, componentsTuples);
+ }
+
+ public static MergeColumnWriteMetadata create(ARecordType datasetType,
ARecordType metaType,
int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp>
multiPageOpRef,
IValueReference serializedMetadata, List<IColumnTupleIterator>
componentsTuples) throws IOException {
byte[] bytes = serializedMetadata.getByteArray();
@@ -101,7 +108,7 @@
int writersOffset = offset + IntegerPointable.getInteger(bytes, offset
+ WRITERS_POINTER);
DataInput input = new DataInputStream(new ByteArrayInputStream(bytes,
writersOffset, length));
- IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef);
+ IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef, false);
List<IColumnValuesWriter> writers = new ArrayList<>();
FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
@@ -112,4 +119,12 @@
public List<IColumnTupleIterator> getComponentsTuples() {
return componentsTuples;
}
+
+ public IValueReference getSerializedMetadata() {
+ return serializedMetadata;
+ }
+
+ public List<IColumnValuesWriter> getColumnWriters() {
+ return columnWriters;
+ }
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
index 2e2aa9e..add2442 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -141,4 +141,11 @@
* @param output destination to which the writer should be serialized to
*/
void serialize(DataOutput output) throws IOException;
+
+ /**
+ * Write out any definition levels and values this writer buffered instead of writing them immediately
+ * (see the cacheFirst flag of ColumnValuesWriterFactory), then discard the buffer. Writers that do not
+ * buffer may implement this as a no-op.
+ */
+ void applyCache() throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
index accf4a0..e7e7f1e 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
@@ -22,6 +22,8 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import
org.apache.asterix.column.bytes.encoder.ParquetRunLengthBitPackingHybridEncoder;
import org.apache.asterix.column.util.ColumnValuesUtil;
@@ -32,6 +34,8 @@
import
org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.column.values.writer.filters.NoOpColumnFilterWriter;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.logging.log4j.LogManager;
@@ -52,8 +56,11 @@
private final int nullBitMask;
private int count;
private boolean writeValues;
+ protected final boolean cacheFirst;
+ protected final List<Pair<ATypeTag, IValueReference>> cachedValues;
+ protected final List<Pair<Integer, Integer>> cachedLevels;
- AbstractColumnValuesWriter(int columnIndex, int level, boolean collection,
boolean filtered) {
+ AbstractColumnValuesWriter(int columnIndex, int level, boolean collection,
boolean filtered, boolean cacheFirst) {
this.columnIndex = columnIndex;
this.level = level;
this.collection = collection;
@@ -61,6 +68,9 @@
int width = ColumnValuesUtil.getBitWidth(level);
definitionLevels = new ParquetRunLengthBitPackingHybridEncoder(width);
this.filterWriter = filtered ? createFilter() :
NoOpColumnFilterWriter.INSTANCE;
+ this.cacheFirst = cacheFirst;
+ cachedValues = new ArrayList<>();
+ cachedLevels = new ArrayList<>();
}
@Override
@@ -91,9 +101,9 @@
@Override
public final void writeValue(ATypeTag tag, IValueReference value) throws
HyracksDataException {
- addLevel(level);
+ addOrCacheLevel(level, 1);
try {
- addValue(tag, value);
+ addOrCacheValue(tag, value);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -101,20 +111,12 @@
@Override
public final void writeLevel(int level) throws HyracksDataException {
- addLevel(level);
+ addOrCacheLevel(level, 1);
}
@Override
public void writeLevels(int level, int count) throws HyracksDataException {
- writeValues = writeValues || this.level == level;
- this.count += count;
- try {
- for (int i = 0; i < count; i++) {
- definitionLevels.writeInt(level);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ addOrCacheLevel(level, count);
}
@Override
@@ -124,7 +126,7 @@
@Override
public final void writeNull(int level) throws HyracksDataException {
- addLevel(level | nullBitMask);
+ addOrCacheLevel(level | nullBitMask, 1);
}
@Override
@@ -138,9 +140,9 @@
@Override
public void writeAntiMatter(ATypeTag tag, IValueReference value) throws
HyracksDataException {
- addLevel(0);
+ addOrCacheLevel(0, 1);
try {
- addValue(tag, value);
+ addOrCacheValue(tag, value);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -226,11 +228,48 @@
return writerFactory.createValueWriter(typeTag, columnIndex, level,
collection, filtered);
}
- protected void addLevel(int level) throws HyracksDataException {
+ protected void addOrCacheLevel(int level, int count) throws
HyracksDataException {
+ if (cacheFirst) {
+ cachedLevels.add(new Pair<>(level, count));
+ } else {
+ addLevel(level, count);
+ }
+ }
+
+ protected void addLevel(int level, int count) throws HyracksDataException {
+ writeValues = writeValues || this.level == level;
+ this.count += count;
try {
- writeValues = writeValues || this.level == level;
- definitionLevels.writeInt(level);
- count++;
+ for (int i = 0; i < count; i++) {
+ definitionLevels.writeInt(level);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ protected void addOrCacheValue(ATypeTag tag, IValueReference value) throws
IOException {
+ if (cacheFirst) {
+ // Store a private copy: the caller may reuse or reset the same IValueReference
+ // for later values before applyCache() replays the buffered entries
+ ArrayBackedValueStorage copy = new ArrayBackedValueStorage(value.getLength());
+ copy.append(value);
+ cachedValues.add(new Pair<>(tag, copy));
+ } else {
+ addValue(tag, value);
+ }
+ }
+
+ public void applyCache() throws HyracksDataException {
+ try {
+ for (Pair<Integer, Integer> level : cachedLevels) {
+ addLevel(level.getFirst(), level.getSecond());
+ }
+ cachedLevels.clear();
+ for (Pair<ATypeTag, IValueReference> value : cachedValues) {
+ addValue(value.getFirst(), value.getSecond());
+ }
+ cachedValues.clear();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
index 7d50cb1..1ab948b 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
@@ -31,8 +31,9 @@
public final class BooleanColumnValuesWriter extends
AbstractColumnValuesWriter {
private final ParquetRunLengthBitPackingHybridEncoder booleanWriter;
- public BooleanColumnValuesWriter(int columnIndex, int level, boolean
collection, boolean filtered) {
- super(columnIndex, level, collection, filtered);
+ public BooleanColumnValuesWriter(int columnIndex, int level, boolean
collection, boolean filtered,
+ boolean cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
booleanWriter = new ParquetRunLengthBitPackingHybridEncoder(1);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
index 3d32a27..b9d68d8 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
@@ -26,9 +26,11 @@
public class ColumnValuesWriterFactory implements IColumnValuesWriterFactory {
private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+ private final boolean cacheFirst;
- public ColumnValuesWriterFactory(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef) {
+ public ColumnValuesWriterFactory(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, boolean cacheFirst) {
this.multiPageOpRef = multiPageOpRef;
+ this.cacheFirst = cacheFirst;
}
@Override
@@ -37,23 +39,27 @@
switch (typeTag) {
case MISSING:
case NULL:
- return new NullMissingColumnValuesWriter(columnIndex,
maxLevel, writeAlways, filtered);
+ return new NullMissingColumnValuesWriter(columnIndex,
maxLevel, writeAlways, filtered, cacheFirst);
case BOOLEAN:
- return new BooleanColumnValuesWriter(columnIndex, maxLevel,
writeAlways, filtered);
+ return new BooleanColumnValuesWriter(columnIndex, maxLevel,
writeAlways, filtered, cacheFirst);
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
- return new LongColumnValuesWriter(multiPageOpRef, columnIndex,
maxLevel, writeAlways, filtered,
- typeTag);
+ return new LongColumnValuesWriter(multiPageOpRef, columnIndex,
maxLevel, writeAlways, filtered, typeTag,
+ cacheFirst);
case FLOAT:
- return new FloatColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered);
+ return new FloatColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered,
+ cacheFirst);
case DOUBLE:
- return new DoubleColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered);
+ return new DoubleColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered,
+ cacheFirst);
case STRING:
- return new StringColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered);
+ return new StringColumnValuesWriter(multiPageOpRef,
columnIndex, maxLevel, writeAlways, filtered,
+ cacheFirst);
case UUID:
- return new UUIDColumnValuesWriter(multiPageOpRef, columnIndex,
maxLevel, writeAlways, filtered);
+ return new UUIDColumnValuesWriter(multiPageOpRef, columnIndex,
maxLevel, writeAlways, filtered,
+ cacheFirst);
default:
throw new UnsupportedOperationException(typeTag + " is not
supported");
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index 9e6f906..db844fb 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -40,8 +40,8 @@
private final ParquetPlainFixedLengthValuesWriter doubleWriter;
public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered) {
- super(columnIndex, level, collection, filtered);
+ boolean collection, boolean filtered, boolean cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
doubleWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
index 39abcad..992a007 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
@@ -40,8 +40,8 @@
private final ParquetPlainFixedLengthValuesWriter floatWriter;
public FloatColumnValuesWriter(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered) {
- super(columnIndex, level, collection, filtered);
+ boolean collection, boolean filtered, boolean cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
floatWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index 516f56d..3cb6b18 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -43,8 +43,8 @@
private final ATypeTag typeTag;
public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered, ATypeTag typeTag) {
- super(columnIndex, level, collection, filtered);
+ boolean collection, boolean filtered, ATypeTag typeTag, boolean
cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
longWriter = filtered ? new
ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef)
: new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
index 2d9f5bf..e6f9780 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
@@ -33,15 +33,19 @@
private static final BytesInput EMPTY = BytesInput.empty();
private final RunLengthIntArray defLevelsIntArray;
- NullMissingColumnValuesWriter(int columnIndex, int level, boolean
collection, boolean filtered) {
- super(columnIndex, level, collection, filtered);
+ NullMissingColumnValuesWriter(int columnIndex, int level, boolean
collection, boolean filtered,
+ boolean cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
defLevelsIntArray = new RunLengthIntArray();
}
@Override
- protected void addLevel(int level) throws HyracksDataException {
- defLevelsIntArray.add(level);
- super.addLevel(level);
+ protected void addLevel(int level, int count) throws HyracksDataException {
+ // Mirror all 'count' levels locally and forward the real count so the definition levels and value count are written
+ for (int i = 0; i < count; i++) {
+ defLevelsIntArray.add(level);
+ }
+ super.addLevel(level, count);
}
@Override
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index 5b1977f..074439c 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -38,14 +38,14 @@
private final boolean skipLengthBytes;
public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered) {
+ boolean collection, boolean filtered, boolean cacheFirst) {
this(columnIndex, level, collection, filtered, true, filtered ? new
ParquetDeltaByteArrayWriter(multiPageOpRef)
- : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef));
+ : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef),
cacheFirst);
}
protected StringColumnValuesWriter(int columnIndex, int level, boolean
collection, boolean filtered,
- boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter)
{
- super(columnIndex, level, collection, filtered);
+ boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter,
boolean cacheFirst) {
+ super(columnIndex, level, collection, filtered, cacheFirst);
this.stringWriter = stringWriter;
this.skipLengthBytes = skipLengthBytes;
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 9d4ff9a..2801407 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -28,9 +28,10 @@
final class UUIDColumnValuesWriter extends StringColumnValuesWriter {
public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered) {
+ boolean collection, boolean filtered, boolean cacheFirst) {
// UUID is always written without encoding
- super(columnIndex, level, collection, filtered, false, new
ParquetPlainFixedLengthValuesWriter(multiPageOpRef));
+ super(columnIndex, level, collection, filtered, false, new
ParquetPlainFixedLengthValuesWriter(multiPageOpRef),
+ cacheFirst);
}
@Override
diff --git
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 55097c5..55de467 100644
---
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -120,7 +120,7 @@
protected FlushColumnMetadata prepareNewFile(int fileId) throws
HyracksDataException {
Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new
MutableObject<>();
- IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef);
+ IColumnValuesWriterFactory writerFactory = new
ColumnValuesWriterFactory(multiPageOpRef, true);
FlushColumnMetadata columnMetadata = new
FlushColumnMetadata(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, null,
Collections.emptyList(), null, writerFactory, multiPageOpRef);
IColumnWriteMultiPageOp multiPageOp = new
TestWriteMultiPageOp(dummyBufferCache, fileId);
diff --git
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
index d287417..d03ac23 100644
---
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
+++
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
@@ -152,4 +152,9 @@
public RunLengthIntArray getDefinitionLevelsIntArray() {
return definitionLevels;
}
+
+ @Override
+ public void applyCache() throws HyracksDataException {
+
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 6d26a45..f74426c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -92,4 +92,11 @@
* Close the current writer and release all allocated temporary buffers
*/
public abstract void close();
+
+ public abstract void applyCache(ITupleReference tuple) throws
HyracksDataException;
+
+ public abstract void undo() throws HyracksDataException;
+
+ public abstract void redo() throws HyracksDataException;
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index d9aec0c..1c10b2d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -84,13 +84,16 @@
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
+ columnWriter.writeTuple(tuple);
if (isFull(tuple)) {
+ columnWriter.undo();
writeFullLeafPage();
confiscateNewLeafPage();
+ columnWriter.redo();
}
//Save the key of the last inserted tuple
setMinMaxKeys(tuple);
- columnWriter.writeTuple(tuple);
+ columnWriter.applyCache(tuple);
tupleCount++;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19128
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I693a2953cf2fcc8ff5aa3620bd772dc9e0fa7eec
Gerrit-Change-Number: 19128
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange