>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review.
(https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17996)
Change subject: [ASTERIXDB-3324][STO] Ensure PKs uniqueness on load
......................................................................
[ASTERIXDB-3324][STO] Ensure PKs uniqueness on load
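Details:
On load, throw an exception if duplicate primary keys (PKs)
are encountered. Duplicates are detected by comparing each tuple's
primary keys with those of the previous tuple, which relies on the
load input being sorted by primary key.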
Change-Id: I140589d92b142fb4661b4112a28424fa168684ef
---
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
M asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
M asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
M asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
5 files changed, 49 insertions(+), 7 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/96/17996/1
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
index cfd5143..2780f92 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
@@ -26,6 +26,7 @@
import
org.apache.asterix.column.operation.lsm.load.LoadColumnTupleReaderWriterFactory;
import
org.apache.asterix.column.operation.lsm.merge.MergeColumnTupleReaderWriterFactory;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
@@ -71,8 +72,10 @@
}
@Override
- public AbstractColumnTupleReaderWriterFactory
getLoadColumnTupleReaderWriterFactory() {
- return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount,
tolerance, maxLeafNodeSize);
+ public AbstractColumnTupleReaderWriterFactory
getLoadColumnTupleReaderWriterFactory(
+ IBinaryComparatorFactory[] cmpFactories) {
+ return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount,
tolerance, maxLeafNodeSize,
+ cmpFactories);
}
@Override
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index dec2ec3..7fc6fbd 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -20,20 +20,24 @@
import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
import
org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.common.MultiComparator;
public class LoadColumnTupleReaderWriterFactory extends
FlushColumnTupleReaderWriterFactory {
private static final long serialVersionUID = -7583574057314353873L;
+ private final IBinaryComparatorFactory[] cmpFactories;
public LoadColumnTupleReaderWriterFactory(int pageSize, int
maxNumberOfTuples, double tolerance,
- int maxLeafNodeSize) {
+ int maxLeafNodeSize, IBinaryComparatorFactory[] cmpFactories) {
super(pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+ this.cmpFactories = cmpFactories;
}
@Override
public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata
columnMetadata) {
return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata,
pageSize, maxNumberOfTuples, tolerance,
- maxLeafNodeSize);
+ maxLeafNodeSize, MultiComparator.create(cmpFactories));
}
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index e47b210..ca14000 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -20,17 +20,35 @@
import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
+ private final PointableTupleReference prevTupleKeys;
+ private final MultiComparator comparator;
+
public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int
pageSize, int maxNumberOfTuples,
- double tolerance, int maxLeafNodeSize) {
+ double tolerance, int maxLeafNodeSize, MultiComparator comparator)
{
super(columnMetadata, pageSize, maxNumberOfTuples, tolerance,
maxLeafNodeSize);
+ prevTupleKeys =
+
PointableTupleReference.create(columnMetadata.getNumberOfPrimaryKeys(),
ArrayBackedValueStorage::new);
+ this.comparator = comparator;
}
@Override
public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ ensureKeysUniqueness(tuple);
writeRecord(tuple);
}
+
+ private void ensureKeysUniqueness(ITupleReference tuple) throws
HyracksDataException {
+ if (prevTupleKeys.getFieldLength(0) > 0 &&
comparator.compare(prevTupleKeys, tuple) == 0) {
+ throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
+ }
+ prevTupleKeys.set(tuple);
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
index a2dfbcf..d1b9946 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.io.IJsonSerializable;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
@@ -31,8 +32,11 @@
/**
* Get column tuple reader/writer for the {@link LSMIOOperationType#LOAD}
+ *
+ * @param cmpFactories Primary keys comparators' factories
*/
- AbstractColumnTupleReaderWriterFactory
getLoadColumnTupleReaderWriterFactory();
+ AbstractColumnTupleReaderWriterFactory
getLoadColumnTupleReaderWriterFactory(
+ IBinaryComparatorFactory[] cmpFactories);
/**
* Get column tuple reader/writer for the {@link LSMIOOperationType#FLUSH}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 6ef0dc6..490fa2e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -82,7 +82,7 @@
ITreeIndexFrameFactory mergeLeafFrameFactory = new
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
columnManagerFactory.createMergeColumnTupleReaderWriterFactory());
ITreeIndexFrameFactory bulkLoadLeafFrameFactory = new
ColumnBTreeLeafFrameFactory(bulkLoadTupleWriterFactory,
- columnManagerFactory.getLoadColumnTupleReaderWriterFactory());
+
columnManagerFactory.getLoadColumnTupleReaderWriterFactory(cmpFactories));
ITreeIndexFrameFactory insertLeafFrameFactory = new
BTreeNSMLeafFrameFactory(insertTupleWriterFactory);
ITreeIndexFrameFactory deleteLeafFrameFactory = new
BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
ITreeIndexFrameFactory interiorFrameFactory = new
BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17996
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I140589d92b142fb4661b4112a28424fa168684ef
Gerrit-Change-Number: 17996
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange