Abdullah Alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1523
Change subject: Cleanup and bug fixes in Feeds pipeline
......................................................................
Cleanup and bug fixes in Feeds pipeline
Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
R hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
38 files changed, 498 insertions(+), 495 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/23/1523/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d785cce..da93fb8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.messaging;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,7 +31,7 @@
public class CCMessageBroker implements ICCMessageBroker {
- private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
private final ClusterControllerService ccs;
public CCMessageBroker(ClusterControllerService ccs) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index cbb4868..1a8ccae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -72,7 +73,7 @@
}
@Override
- public synchronized IIndex get(String resourcePath) throws HyracksDataException {
+ public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
int datasetID = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -80,7 +81,7 @@
}
@Override
- public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
validateDatasetLifecycleManagerState();
DatasetResource datasetResource = datasets.get(datasetID);
if (datasetResource == null) {
@@ -556,7 +557,7 @@
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
- + " memory since memory budget would be exceeded.");
+ + " memory since memory budget would be exceeded.");
}
}
used += additionalSize;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 403d3cb..41e587d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -81,7 +81,7 @@
return datasetVirtualBufferCaches;
}
- public IIndex getIndex(long resourceID) {
+ public ILSMIndex getIndex(long resourceID) {
IndexInfo iInfo = getIndexInfo(resourceID);
return (iInfo == null) ? null : iInfo.getIndex();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index b9d187d..001240b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -37,7 +37,7 @@
public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
if (hde == null) {
- return new HyracksDataException(th);
+ return (th instanceof HyracksDataException) ? (HyracksDataException) th : new HyracksDataException(th);
} else {
hde.addSuppressed(th);
return hde;
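
(Reviewer note: the fix above stops suppressIntoHyracksDataException from re-wrapping a Throwable that is already a HyracksDataException. A minimal sketch of the suppress-then-rethrow cleanup pattern this utility supports; the caller, doWork() and close() are hypothetical placeholders, not part of this change:

    HyracksDataException hde = null;
    try {
        doWork(); // primary operation, may throw HyracksDataException
    } catch (HyracksDataException e) {
        hde = e;
    } finally {
        try {
            close(); // cleanup may fail as well
        } catch (Throwable th) {
            // the first failure stays primary; later ones are attached as suppressed
            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
        }
    }
    if (hde != null) {
        throw hde;
    }
)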
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index f903b65..6fa45f0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -33,7 +33,7 @@
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
+ public static final ArrayValueReference LSN_KEY = new ArrayValueReference("LSN".getBytes());
public static final long INVALID = -1L;
// First LSN per mutable component
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index b977c4d..bbe2c4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -119,4 +119,8 @@
pointable.setLong(lsn);
index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable);
}
+
+ public ILSMIndex getIndex() {
+ return index;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d03f9df..830a4ec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -61,7 +61,7 @@
private boolean isFeed;
private FileSplit[] feedLogFileSplits;
private ARecordType metaType;
- private FeedLogManager feedLogManager = null;
+ private transient FeedLogManager feedLogManager;
@Override
public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -75,8 +75,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
return dataSourceFactory.getPartitionConstraint();
}
@@ -86,8 +85,8 @@
@Override
public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAppRuntimeContext runtimeCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
try {
restoreExternalObjects(runtimeCtx.getLibraryManager());
} catch (Exception e) {
@@ -184,6 +183,7 @@
this.metaType = metaType;
}
+ @Override
public IExternalDataSourceFactory getDataSourceFactory() {
return dataSourceFactory;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3ea3bb1..5b8a101 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -40,7 +40,7 @@
*
* @return the display name
*/
- public String getAlias();
+ String getAlias();
/**
* Gets a list of partition constraints. A partition constraint can be a
@@ -54,10 +54,8 @@
* running on the node with the given IP address.
*
* @throws AlgebricksException
- * @throws HyracksDataException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException;
+ AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
/**
* Creates an instance of IDatasourceAdapter.
@@ -67,7 +65,7 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+ IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
/**
* @param libraryManager
@@ -75,14 +73,16 @@
* @throws AlgebricksException
* @throws HyracksDataException
*/
- public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+ void configure(ILibraryManager libraryManager, Map<String, String> configuration)
throws HyracksDataException, AlgebricksException;
- public void setOutputType(ARecordType outputType);
+ void setOutputType(ARecordType outputType);
- public void setMetaType(ARecordType metaType);
+ void setMetaType(ARecordType metaType);
- public ARecordType getOutputType();
+ ARecordType getOutputType();
- public ARecordType getMetaType();
+ ARecordType getMetaType();
+
+ IExternalDataSourceFactory getDataSourceFactory();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e2274b9..656f17b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -54,7 +54,7 @@
* @throws AsterixException
*/
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws AlgebricksException, HyracksDataException;
+ throws AlgebricksException;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index d85fe65..57e79c3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import java.io.IOException;
-
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithPKDataParser;
@@ -34,9 +32,9 @@
public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4c88b0f..22fa8be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import java.io.IOException;
-
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithMetadataParser;
@@ -32,14 +30,14 @@
public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
}
@Override
protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record)
throws HyracksDataException {
- ((IRecordWithMetadataParser<T>) dataParser).appendLastParsedPrimaryKeyToTuple(tb);
+ dataParser.appendLastParsedPrimaryKeyToTuple(tb);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2e687ba..73b9e52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,71 +19,55 @@
package org.apache.asterix.external.dataflow;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.IFeedMarker;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.log4j.Logger;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
- protected final IRecordDataParser<T> dataParser;
- protected final IRecordReader<? extends T> recordReader;
+ private final IRecordDataParser<T> dataParser;
+ private final IRecordReader<? extends T> recordReader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected static final long INTERVAL = 1000;
- protected final Object mutex = new Object();
- protected final boolean sendMarker;
protected boolean failed = false;
- private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
- private Future<?> dataflowMarkerResult;
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
- IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+ IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
- this.sendMarker = sendMarker;
recordReader.setFeedLogManager(feedLogManager);
recordReader.setController(this);
}
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
- startDataflowMarker();
HyracksDataException hde = null;
try {
failed = false;
tupleForwarder.initialize(ctx, writer);
while (recordReader.hasNext()) {
- // synchronized on mutex before we call next() so we don't a marker before its record
- synchronized (mutex) {
- IRawRecord<? extends T> record = recordReader.next();
- if (record == null) {
- flush();
- mutex.wait(INTERVAL);
- continue;
+ IRawRecord<? extends T> record = recordReader.next();
+ if (record == null) {
+ flush();
+ synchronized (this) {
+ wait(INTERVAL);
}
- tb.reset();
- parseAndForward(record);
+ continue;
}
+ tb.reset();
+ parseAndForward(record);
}
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside termination of a job/feed
@@ -95,7 +79,6 @@
LOGGER.warn("Failure while operating a feed source", e);
throw new HyracksDataException(e);
}
- stopDataflowMarker();
try {
tupleForwarder.close();
} catch (Throwable th) {
@@ -108,9 +91,6 @@
hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
} finally {
closeSignal();
- if (sendMarker && dataflowMarkerResult != null) {
- dataflowMarkerResult.cancel(true);
- }
}
if (hde != null) {
throw hde;
@@ -118,41 +98,24 @@
}
private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
- synchronized (dataParser) {
- try {
- dataParser.parse(record, tb.getDataOutput());
- } catch (Exception e) {
- LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
- feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
- // continue the outer loop
- return;
- }
- tb.addFieldEndOffset();
- addMetaPart(tb, record);
- addPrimaryKeys(tb, record);
- tupleForwarder.addTuple(tb);
+ try {
+ dataParser.parse(record, tb.getDataOutput());
+ } catch (Exception e) {
+ LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+ feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+ // continue the outer loop
+ return;
}
+ tb.addFieldEndOffset();
+ addMetaPart(tb, record);
+ addPrimaryKeys(tb, record);
+ tupleForwarder.addTuple(tb);
}
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
}
protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
- }
-
- private void startDataflowMarker() {
- ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
- if (sendMarker && dataflowMarker == null) {
- dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
- TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
- dataflowMarkerResult = executorService.submit(dataflowMarker);
- }
- }
-
- private void stopDataflowMarker() {
- if (dataflowMarker != null) {
- dataflowMarker.stop();
- }
}
private void closeSignal() {
@@ -172,7 +135,6 @@
@Override
public boolean stop() throws HyracksDataException {
- stopDataflowMarker();
HyracksDataException hde = null;
if (recordReader.stop()) {
if (failed) {
@@ -208,52 +170,11 @@
return recordReader.handleException(th);
}
- private class DataflowMarker implements Runnable {
- private final IFeedMarker marker;
- private final VSizeFrame mark;
- private volatile boolean stopped = false;
+ public IRecordReader<? extends T> getReader() {
+ return recordReader;
+ }
- public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
- this.marker = marker;
- this.mark = mark;
- }
-
- public synchronized void stop() {
- stopped = true;
- notify();
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- synchronized (this) {
- if (!stopped) {
- // TODO (amoudi): find a better reactive way to do this
- // sleep for two seconds
- wait(TimeUnit.SECONDS.toMillis(2));
- } else {
- break;
- }
- }
- synchronized (mutex) {
- if (marker.mark(mark)) {
- // broadcast
- tupleForwarder.flush();
- // clear
- mark.getBuffer().clear();
- mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
- mark.getBuffer().flip();
- }
- }
- }
- } catch (InterruptedException e) {
- LOGGER.warn("Marker stopped", e);
- Thread.currentThread().interrupt();
- return;
- } catch (Exception e) {
- LOGGER.warn("Marker stopped", e);
- }
- }
+ public IRecordDataParser<T> getParser() {
+ return dataParser;
}
}
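
(Reviewer summary: with the DataflowMarker thread and its mutex removed, the intake loop reduces to a plain poll, flush, and wait pattern. Distilled from the new code above, with names as in the patch; INTERVAL is 1000 ms:

    while (recordReader.hasNext()) {
        IRawRecord<? extends T> record = recordReader.next();
        if (record == null) {
            // the reader has no data right now: push buffered tuples and back off
            flush();
            synchronized (this) {
                wait(INTERVAL);
            }
            continue;
        }
        tb.reset();
        parseAndForward(record); // parse failures are logged and skipped, not fatal
    }
)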
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d31e074..4177ea6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -28,12 +28,9 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
public class FeedTupleForwarder implements ITupleForwarder {
@@ -58,11 +55,6 @@
this.frame = new VSizeFrame(ctx);
this.writer = writer;
this.appender = new FrameTupleAppender(frame);
- // Set null feed message
- VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
- // a null message
- message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
- message.getBuffer().flip();
initialized = true;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 45ae52b..c7f6d9c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -28,15 +28,18 @@
public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowController<T> {
+ protected final IRecordWithMetadataParser<T> dataParser;
+
public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
- IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ this.dataParser = dataParser;
}
@Override
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws HyracksDataException {
- ((IRecordWithMetadataParser<T>) dataParser).parseMeta(tb.getDataOutput());
+ dataParser.parseMeta(tb.getDataOutput());
tb.addFieldEndOffset();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 4649559..9b23e38 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -52,8 +52,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
return streamFactory.getPartitionConstraint();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 99fff19..546946a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -27,11 +27,13 @@
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
/**
@@ -63,7 +65,12 @@
Thread.currentThread().setName("Intake Thread");
FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ IFrame message = new VSizeFrame(ctx);
+ TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ // Set null feed message
+ // a null message
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
adapterRuntimeManager.start();
synchronized (adapterRuntimeManager) {
while (!adapterRuntimeManager.isDone()) {
@@ -82,7 +89,7 @@
*/
throw new HyracksDataException(ie);
} finally {
- writer.close();
+ writer.close();
}
}
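
(Reviewer note: the null-feed-message setup removed from FeedTupleForwarder.initialize() earlier in this patch now happens exactly once per intake task, right after the shared message frame is created, as added in the hunk above. In condensed form, as applied by this patch:

    IFrame message = new VSizeFrame(ctx);
    TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
    // pre-load a null feed message so downstream appenders always find one
    message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
    message.getBuffer().flip();
)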
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index a369fe3..94bcdce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -61,7 +61,7 @@
public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
- throws HyracksDataException {
+ throws HyracksDataException {
try {
switch (dataSourceFactory.getDataSourceType()) {
case RECORDS:
@@ -69,7 +69,6 @@
IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
- boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
if (indexingOp) {
return new IndexingDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -83,19 +82,18 @@
if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
- numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
- sendMarker);
+ numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
} else {
return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
- (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
+ (IRecordWithMetadataParser) dataParser, recordReader);
}
} else if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
- (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
+ (IRecordWithPKDataParser) dataParser, recordReader);
} else {
return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
- recordReader, sendMarker);
+ recordReader);
}
} else {
return new RecordDataFlowController(ctx,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@
public static final String KEY_FILESYSTEM = "fs";
// specifies the address of the HDFS name node
public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
- // specifies whether a feed sends progress markers or not
- public static final String KEY_SEND_MARKER = "send-marker";
// specifies the class implementation of the accessed instance of HDFS
public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..88d00d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -162,7 +162,7 @@
private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+ Map<ATypeTag, IValueParserFactory> m = new HashMap<>();
m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -338,9 +338,5 @@
intIndicators[i] = Integer.parseInt(stringIndicators[i]);
}
return intIndicators;
- }
-
- public static boolean isSendMarker(Map<String, String> configuration) {
- return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
}
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..f796a73 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -81,8 +81,8 @@
IPropertiesProvider propertiesProvider = (IPropertiesProvider) ((NodeControllerService) ctx
.getJobletContext().getApplicationContext().getControllerService())
- .getApplicationContext()
- .getApplicationObject();
+ .getApplicationContext()
+ .getApplicationObject();
ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
.get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
@@ -145,4 +145,9 @@
return null;
}
+ @Override
+ public IExternalDataSourceFactory getDataSourceFactory() {
+ return null;
+ }
+
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..8f80416 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -404,7 +404,7 @@
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner =
new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
@@ -447,7 +447,7 @@
break;
case EXTERNAL:
String libraryName = primaryFeed.getAdapterName().trim()
- .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+ .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
break;
@@ -754,36 +754,36 @@
String indexName = primaryIndex.getIndexName();
ARecordType metaType = dataset.hasMetaPart()
? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
- : null;
- String itemTypeName = dataset.getItemTypeName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
- ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
- itemType, metaType, context.getBinaryComparatorFactoryProvider());
+ : null;
+ String itemTypeName = dataset.getItemTypeName();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+ ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+ itemType, metaType, context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
- IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+ getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
+ temp);
+ IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
- // TODO
- // figure out the right behavior of the bulkload and then give the
- // right callback
- // (ex. what's the expected behavior when there is an error during
- // bulkload?)
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
- new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
- numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
- metaType, compactionInfo.first, compactionInfo.second),
- metadataPageManagerFactory);
- return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+ // TODO
+ // figure out the right behavior of the bulkload and then give the
+ // right callback
+ // (ex. what's the expected behavior when there is an error during
+ // bulkload?)
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
+ numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+ metaType, compactionInfo.first, compactionInfo.second),
+ metadataPageManagerFactory);
+ return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
@@ -826,7 +826,7 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
context, spec, false, null, null);
@@ -986,7 +986,7 @@
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create)
- throws AlgebricksException {
+ throws AlgebricksException {
return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
targetIdxName, create);
}
@@ -1004,9 +1004,9 @@
if (dataset.hasMetaPart()) {
metaType =
(ARecordType) MetadataManager.INSTANCE
- .getDatatype(metadataProvider.getMetadataTxnContext(),
- dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
- .getDatatype();
+ .getDatatype(metadataProvider.getMetadataTxnContext(),
+ dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
+ .getDatatype();
}
ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
LookupAdapterFactory<?> adapterFactory =
@@ -1117,64 +1117,64 @@
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
- : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
- IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+ IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
- LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
- metaItemType, compactionInfo.first, compactionInfo.second);
- LSMTreeUpsertOperatorDescriptor op;
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+ metaItemType, compactionInfo.first, compactionInfo.second);
+ LSMTreeUpsertOperatorDescriptor op;
- ITypeTraits[] outputTypeTraits =
- new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
- + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ITypeTraits[] outputTypeTraits =
+ new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- // add the previous record first
- int f = 0;
- outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
- f++;
- // add the previous meta second
- if (dataset.hasMetaPart()) {
- outputSerDes[f] =
- FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
- outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
- f++;
- }
- // add the previous filter third
- int fieldIdx = -1;
- if (numFilterFields > 0) {
- String filterField = DatasetUtil.getFilterField(dataset).get(0);
- for (i = 0; i < itemType.getFieldNames().length; i++) {
- if (itemType.getFieldNames()[i].equals(filterField)) {
- break;
+ // add the previous record first
+ int f = 0;
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ f++;
+ // add the previous meta second
+ if (dataset.hasMetaPart()) {
+ outputSerDes[f] =
+ FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
}
- }
- fieldIdx = i;
- outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
- .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
- outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
- .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
- f++;
- }
- for (int j = 0; j < recordDesc.getFieldCount(); j++) {
- outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
- outputSerDes[j + f] = recordDesc.getFields()[j];
- }
+ // add the previous filter third
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField = DatasetUtil.getFilterField(dataset).get(0);
+ for (i = 0; i < itemType.getFieldNames().length; i++) {
+ if (itemType.getFieldNames()[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
+ }
+ for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
+ outputSerDes[j + f] = recordDesc.getFields()[j];
+ }
- RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
- op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
- context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
- metadataPageManagerFactory);
- op.setType(itemType);
- op.setFilterIndex(fieldIdx);
- return new Pair<>(op, splitsAndConstraint.second);
+ RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+ op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
+ context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
+ metadataPageManagerFactory);
+ op.setType(itemType);
+ op.setFilterIndex(fieldIdx);
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
@@ -1183,7 +1183,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
- throws AlgebricksException {
+ throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
@@ -1233,7 +1233,7 @@
switch (dsType) {
case INTERNAL:
keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
- ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+ ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
: recType.getSubFieldType(pidxKeyFieldNames.get(j));
break;
case EXTERNAL:
@@ -1269,7 +1269,7 @@
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
// Move key fields to front.
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
- + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
+ + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
int[] bloomFilterKeyFields = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : keys) {
@@ -1317,31 +1317,31 @@
? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(
- ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
- dataset.hasMetaPart());
+ : new PrimaryIndexModificationOperationCallbackFactory(
+ ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
+ primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+ dataset.hasMetaPart());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
- metaItemType, compactionInfo.first, compactionInfo.second);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh,
- metadataPageManagerFactory);
- } else {
- op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- metadataPageManagerFactory);
- }
- return new Pair<>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+ metaItemType, compactionInfo.first, compactionInfo.second);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh,
+ metadataPageManagerFactory);
+ } else {
+ op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true,
+ indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ metadataPageManagerFactory);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
@@ -1465,86 +1465,86 @@
ARecordType metaType = dataset.hasMetaPart()
? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
- : null;
+ : null;
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
+ // Index parameters.
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
- ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
- int[] filterFields;
- int[] btreeFields;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numKeys;
- btreeFields = new int[numKeys];
- for (int k = 0; k < btreeFields.length; k++) {
- btreeFields[k] = k;
- }
- }
+ ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+ int[] filterFields;
+ int[] btreeFields;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numKeys;
+ btreeFields = new int[numKeys];
+ for (int k = 0; k < btreeFields.length; k++) {
+ btreeFields[k] = k;
+ }
+ }
- List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new
IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType =
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), itemType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] =
-
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
- typeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- List<List<String>> partitioningKeys =
DatasetUtil.getPartitioningKeys(dataset);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = itemType.getSubFieldType(partitioningKey);
- comparatorFactories[i] =
-
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
- typeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ List<List<String>> secondaryKeyNames =
secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes =
secondaryIndex.getKeyFieldTypes();
+ ITypeTraits[] typeTraits = new
ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories =
new IBinaryComparatorFactory[numKeys];
+ for (i = 0; i < secondaryKeys.size(); ++i) {
+ Pair<IAType, Boolean> keyPairType =
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+ secondaryKeyNames.get(i), itemType);
+ IAType keyType = keyPairType.first;
+ comparatorFactories[i] =
+
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
+ typeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ List<List<String>> partitioningKeys =
DatasetUtil.getPartitioningKeys(dataset);
+ for (List<String> partitioningKey :
partitioningKeys) {
+ IAType keyType =
itemType.getSubFieldType(partitioningKey);
+ comparatorFactories[i] =
+
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
+ typeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+ getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE, dataset.hasMetaPart());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
- metaType, compactionInfo.first, compactionInfo.second);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh,
- metadataPageManagerFactory);
- } else if (indexOp == IndexOperation.UPSERT) {
- op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
- } else {
- op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory,
- false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- metadataPageManagerFactory);
- }
- return new Pair<>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
+ metaType, compactionInfo.first, compactionInfo.second);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh,
+ metadataPageManagerFactory);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
+ indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ prevFieldPermutation, metadataPageManagerFactory);
+ } else {
+ op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory,
+ false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ metadataPageManagerFactory);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1673,36 +1673,36 @@
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_RTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE, dataset.hasMetaPart());
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_RTREE, dataset.hasMetaPart());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
- secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- primaryComparatorFactories, btreeFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
- indexDataflowHelperFactory, metadataPageManagerFactory);
- } else if (indexOp == IndexOperation.UPSERT) {
- op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
- indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
- } else {
- op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
- }
- return new Pair<>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
+ secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ primaryComparatorFactories, btreeFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
+ indexDataflowHelperFactory, metadataPageManagerFactory);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
+ indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ prevFieldPermutation, metadataPageManagerFactory);
+ } else {
+ op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
@@ -1714,7 +1714,7 @@
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
- throws AlgebricksException {
+ throws AlgebricksException {
// Check the index is length-partitioned or not.
boolean isPartitioned;
if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -1883,36 +1883,36 @@
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_INVERTED_INDEX)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,
- secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
- numElementsHint, false, appContext.getStorageManager(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory,
- metadataPageManagerFactory);
- } else if (indexOp == IndexOperation.UPSERT) {
- op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
- splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
- tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
- fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
- prevFieldPermutation, metadataPageManagerFactory);
- } else {
- op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManager(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
- metadataPageManagerFactory);
- }
- return new Pair<>(op, splitsAndConstraint.second);
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,
+ secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
+ numElementsHint, false, appContext.getStorageManager(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory,
+ metadataPageManagerFactory);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
+ splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+ prevFieldPermutation, metadataPageManagerFactory);
+ } else {
+ op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManager(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+ metadataPageManagerFactory);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -2105,7 +2105,7 @@
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
@@ -2125,4 +2125,16 @@
public IStorageComponentProvider getStorageComponentProvider() {
return storaegComponentProvider;
}
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
+ throws AlgebricksException {
+ return getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
+ ds.getDatasetDetails().isTemp());
+ }
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
+ Index index) throws AlgebricksException {
+ return getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), index.getIndexName(),
+ ds.getDatasetDetails().isTemp());
+ }
}
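
For illustration, the two new overloads let callers hand over the metadata entities instead of unpacking names and the temp flag at each call site. A minimal caller sketch, assuming a MetadataProvider instance metadataProvider, a Dataset ds, and an Index index in scope (illustrative names, not part of this change; types as used elsewhere in this file):

    // primary index: the dataset name doubles as the index name
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primary =
            metadataProvider.getSplitProviderAndConstraints(ds);
    // secondary index: pass the index explicitly
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondary =
            metadataProvider.getSplitProviderAndConstraints(ds, index);
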
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..c72129f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,7 +29,6 @@
import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -37,6 +36,7 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.JobUtils;
@@ -433,9 +433,9 @@
switch (index.getIndexType()) {
case BTREE:
return getDatasetType() == DatasetType.EXTERNAL
- && !index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this))
- ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
- : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+ && !index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this))
+ ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
+ : LSMBTreeIOOperationCallbackFactory.INSTANCE;
case RTREE:
return LSMRTreeIOOperationCallbackFactory.INSTANCE;
case LENGTH_PARTITIONED_NGRAM_INVIX:
@@ -489,8 +489,8 @@
return (op == IndexOperation.UPSERT)
? new LockThenSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE)
- : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
- storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
+ : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
+ storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
}
return new SecondaryIndexSearchOperationCallbackFactory();
}
@@ -522,17 +522,17 @@
? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), hasMetaPart())
- : op == IndexOperation.DELETE || op == IndexOperation.INSERT
+ : op == IndexOperation.DELETE || op == IndexOperation.INSERT
? new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), hasMetaPart())
- : NoOpOperationCallbackFactory.INSTANCE;
+ : NoOpOperationCallbackFactory.INSTANCE;
} else {
return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), hasMetaPart())
- : NoOpOperationCallbackFactory.INSTANCE;
+ : NoOpOperationCallbackFactory.INSTANCE;
}
}
@@ -577,4 +577,27 @@
metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
datasetPartitions, isSink);
}
+
+ /**
+ * Get the index dataflow helper factory for the dataset's primary index
+ *
+ * @param mdProvider
+ * an instance of metadata provider that is used to fetch metadata information
+ * @throws AlgebricksException
+ */
+ public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+ throws AlgebricksException {
+ if (getDatasetType() != DatasetType.INTERNAL) {
+ throw new AlgebricksException("Only Internal datasets have dataflow helper factory");
+ }
+ Index index = MetadataManager.INSTANCE.getIndex(mdProvider.getMetadataTxnContext(), getDataverseName(),
+ getDatasetName(), getDatasetName());
+ ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+ ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
+ return getIndexDataflowHelperFactory(mdProvider, index, recordType, metaType, compactionInfo.first,
+ compactionInfo.second);
+ }
+
}
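
The new convenience method bundles lookups that callers previously performed by hand. A before/after sketch, assuming dataset, mdProvider, primaryIndex, recordType, and metaType are in scope (illustrative names only):

    // before: assemble the arguments, then call the full-arity overload
    Pair<ILSMMergePolicyFactory, Map<String, String>> info =
            DatasetUtil.getMergePolicyFactory(dataset, mdProvider.getMetadataTxnContext());
    IIndexDataflowHelperFactory viaFullArity = dataset.getIndexDataflowHelperFactory(mdProvider,
            primaryIndex, recordType, metaType, info.first, info.second);
    // after: one call resolves the primary index of an internal dataset
    IIndexDataflowHelperFactory viaShortcut = dataset.getIndexDataflowHelperFactory(mdProvider);
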
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@
public class ARecordPointable extends AbstractPointable {
private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ public static final ARecordPointableFactory FACTORY = new ARecordPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class ARecordPointableFactory implements IPointableFactory {
+
private static final long serialVersionUID = 1L;
+ private ARecordPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public ARecordPointable createPointable() {
return new ARecordPointable();
}
@@ -98,7 +103,8 @@
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+
+ }
public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
@Override
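
Turning the anonymous factory into the named ARecordPointableFactory lets createPointable() declare the covariant return type ARecordPointable, so callers no longer cast. A minimal sketch:

    // before: the anonymous IPointableFactory returned IPointable, forcing a cast
    ARecordPointable before = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
    // after: the covariant return type makes the cast unnecessary
    ARecordPointable after = ARecordPointable.FACTORY.createPointable();
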
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index d38c5b7..510664b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@
public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- private final static long SEED = 0L;
+ protected static final long SEED = 0L;
protected final ITransactionManager transactionManager;
protected final ILogManager logMgr;
@@ -85,8 +85,7 @@
try {
transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
- ILogMarkerCallback callback =
- TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+ ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
logRecord = new LogRecord(callback);
if (isSink) {
return;
@@ -126,7 +125,7 @@
}
}
}
- VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+ IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
if (message != null
&& MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
try {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@
private static final long serialVersionUID = 1L;
- private final JobId jobId;
- private final int datasetId;
- private final int[] primaryKeyFields;
- private final boolean isTemporaryDatasetWriteJob;
- private final boolean isWriteTransaction;
- private final int upsertVarIdx;
- private int[] datasetPartitions;
- private final boolean isSink;
+ protected final JobId jobId;
+ protected final int datasetId;
+ protected final int[] primaryKeyFields;
+ protected final boolean isTemporaryDatasetWriteJob;
+ protected final boolean isWriteTransaction;
+ protected final int upsertVarIdx;
+ protected int[] datasetPartitions;
+ protected final boolean isSink;
public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
index 9b2fe36..b555471 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
@@ -27,7 +27,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
public class UpsertCommitRuntime extends CommitRuntime {
- private final int upsertIdx;
+ protected final int upsertIdx;
public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@
buffer = ctx.allocateFrame(frameSize);
}
+ @Override
public ByteBuffer getBuffer() {
return buffer;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 346f934..19d2c6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -459,4 +459,8 @@
public ThreadDumpRun removeThreadDumpRun(String requestKey) {
return threadDumpRunMap.remove(requestKey);
}
+
+ public ICCApplicationEntryPoint getApplication() {
+ return aep;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..ec9d083 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,12 @@
protected int length;
+ public byte[] copy() {
+ byte[] copy = new byte[length];
+ System.arraycopy(bytes, start, copy, 0, length);
+ return copy;
+ }
+
@Override
public void set(byte[] bytes, int start, int length) {
this.bytes = bytes;
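
The new copy() helper materializes the pointed-to range into a freshly allocated array, which matters when the pointable references a frame buffer that will be recycled. A minimal sketch, where frameBytes, offset, and len are illustrative placeholders for valid data:

    VoidPointable p = new VoidPointable();
    p.set(frameBytes, offset, len); // points into the shared buffer; no copy yet
    byte[] owned = p.copy();        // safe to retain after the buffer is reused
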
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
*/
package org.apache.hyracks.data.std.api;
+/**
+ * Points to a range over a byte array
+ */
public interface IPointable extends IValueReference {
- public void set(byte[] bytes, int start, int length);
+ /**
+ * Points to the range from position = start with length = length over the byte array bytes
+ *
+ * @param bytes
+ * the byte array
+ * @param start
+ * the start offset
+ * @param length
+ * the length of the range
+ */
+ void set(byte[] bytes, int start, int length);
- public void set(IValueReference pointer);
+ /**
+ * Points to the same range pointed to by the passed pointer
+ *
+ * @param pointer
+ * the pointer to the targeted range
+ */
+ void set(IValueReference pointer);
}
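
A small usage sketch of the documented contract, using the existing VoidPointable implementation (buffer contents are illustrative):

    byte[] buf = "hello world".getBytes();
    IPointable p = new VoidPointable();
    p.set(buf, 6, 5); // p points at the "world" range without copying
    IPointable q = new VoidPointable();
    q.set(p);         // q points at the same range as p
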
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IPointableFactory;
public final class VoidPointable extends AbstractPointable {
+ public static final VoidPointableFactory FACTORY = new VoidPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -38,11 +38,14 @@
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class VoidPointableFactory implements IPointableFactory {
private static final long serialVersionUID = 1L;
+ private VoidPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public VoidPointable createPointable() {
return new VoidPointable();
}
@@ -50,5 +53,5 @@
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..704df61 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -59,7 +59,7 @@
protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
- + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
+ + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
}
protected void reset(ByteBuffer buffer, boolean clear) {
@@ -108,4 +108,11 @@
return false;
}
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ if (tupleCount > 0) {
+ write(writer, true);
+ }
+ writer.flush();
+ }
}
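
The new flush(IFrameWriter) override gives callers a single call that pushes any buffered tuples downstream before flushing the writer itself, presumably so the feed pipeline can propagate partial frames promptly. A one-line caller sketch, assuming an appender and a downstream writer in scope:

    appender.flush(writer); // writes the partial frame if tupleCount > 0, then calls writer.flush()
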
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 8f005d8..9d98bb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -22,8 +22,8 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -37,23 +37,23 @@
*/
public class MessagingFrameTupleAppender extends FrameTupleAppender {
- private final IHyracksTaskContext ctx;
- private static final int NULL_MESSAGE_SIZE = 1;
+ protected final IHyracksTaskContext ctx;
+ protected static final int NULL_MESSAGE_SIZE = 1;
public static final byte NULL_FEED_MESSAGE = 0x01;
public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
public static final byte MARKER_MESSAGE = 0x03;
- private boolean initialized = false;
- private VSizeFrame message;
+ protected boolean initialized = false;
+ protected IFrame message;
public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
this.ctx = ctx;
}
- public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+ public static void printMessage(IFrame message, PrintStream out) throws HyracksDataException {
out.println(getMessageString(message));
}
- public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+ public static String getMessageString(IFrame message) throws HyracksDataException {
StringBuilder aString = new StringBuilder();
aString.append("Message Type: ");
switch (getMessageType(message)) {
@@ -76,7 +76,7 @@
return aString.toString();
}
- public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+ public static byte getMessageType(IFrame message) throws HyracksDataException {
switch (message.getBuffer().array()[0]) {
case NULL_FEED_MESSAGE:
return NULL_FEED_MESSAGE;
@@ -112,8 +112,7 @@
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
if (!initialized) {
- message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
- initialized = true;
+ init();
}
// If message fits, we append it, otherwise, we append a null message, then send a message only
// frame with the message
@@ -125,7 +124,7 @@
} else {
ByteBuffer buffer = message.getBuffer();
int messageSize = buffer.limit() - buffer.position();
- if (hasEnoughSpace(1, messageSize)) {
+ if (hasEnoughSpace(0, messageSize)) {
appendMessage(buffer);
forward(outWriter);
} else {
@@ -133,7 +132,7 @@
appendNullMessage();
forward(outWriter);
}
- if (!hasEnoughSpace(1, messageSize)) {
+ if (!hasEnoughSpace(0, messageSize)) {
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
reset(frame.getBuffer(), true);
}
@@ -143,14 +142,19 @@
}
}
- private void forward(IFrameWriter outWriter) throws HyracksDataException {
+ protected void init() {
+ message = TaskUtil.<IFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+ initialized = true;
+ }
+
+ protected void forward(IFrameWriter outWriter) throws HyracksDataException {
getBuffer().clear();
outWriter.nextFrame(getBuffer());
frame.reset();
reset(getBuffer(), true);
}
- private void appendMessage(ByteBuffer message) {
+ protected void appendMessage(ByteBuffer message) {
int messageLength = message.limit() - message.position();
System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, messageLength);
tupleDataEndOffset += messageLength;
@@ -160,7 +164,7 @@
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
- private void appendNullMessage() {
+ protected void appendNullMessage() {
array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
tupleDataEndOffset++;
IntSerDeUtils.putInt(getBuffer().array(),
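
Two notes on this file: the space check now passes a field count of 0, apparently because the appended message is written without a per-field offset slot, and the formerly private helpers are now protected so subclasses can hook into message lookup and forwarding. A subclass sketch under that assumption (class name and bookkeeping are hypothetical):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;

    public class CountingMessagingAppender extends MessagingFrameTupleAppender {
        private long framesForwarded; // illustrative bookkeeping

        public CountingMessagingAppender(IHyracksTaskContext ctx) {
            super(ctx);
        }

        @Override
        protected void forward(IFrameWriter outWriter) throws HyracksDataException {
            framesForwarded++; // count each outgoing frame
            super.forward(outWriter);
        }
    }
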
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 6d87d89..2eb1f94 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -33,14 +33,14 @@
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
public class PartitionDataWriter implements IFrameWriter {
- private final int consumerPartitionCount;
- private final IFrameWriter[] pWriters;
- private final boolean[] isOpen;
- private final FrameTupleAppender[] appenders;
- private final FrameTupleAccessor tupleAccessor;
- private final ITuplePartitionComputer tpc;
- private final IHyracksTaskContext ctx;
- private boolean[] allocatedFrames;
+ protected final int consumerPartitionCount;
+ protected final IFrameWriter[] pWriters;
+ protected final boolean[] isOpen;
+ protected final FrameTupleAppender[] appenders;
+ protected final FrameTupleAccessor tupleAccessor;
+ protected final ITuplePartitionComputer tpc;
+ protected final IHyracksTaskContext ctx;
+ protected boolean[] allocatedFrames;
public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
@@ -49,6 +49,13 @@
isOpen = new boolean[consumerPartitionCount];
allocatedFrames = new boolean[consumerPartitionCount];
appenders = new FrameTupleAppender[consumerPartitionCount];
+ tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+ this.tpc = tpc;
+ this.ctx = ctx;
+ initializeAppenders(pwFactory);
+ }
+
+ protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
for (int i = 0; i < consumerPartitionCount; ++i) {
try {
pWriters[i] = pwFactory.createFrameWriter(i);
@@ -57,9 +64,6 @@
throw new HyracksDataException(e);
}
}
- tupleAccessor = new FrameTupleAccessor(recordDescriptor);
- this.tpc = tpc;
- this.ctx = ctx;
}
protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
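
Widening the fields to protected and factoring writer creation into initializeAppenders(...) makes PartitionDataWriter extensible: a subclass can now observe or customize how per-partition writers are created. A sketch under that assumption (class name hypothetical):

    import org.apache.hyracks.api.comm.IPartitionWriterFactory;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.std.connectors.PartitionDataWriter;

    public class TracingPartitionDataWriter extends PartitionDataWriter {
        public TracingPartitionDataWriter(IHyracksTaskContext ctx, int nConsumers,
                IPartitionWriterFactory pwFactory, RecordDescriptor rDesc, ITuplePartitionComputer tpc)
                throws HyracksDataException {
            super(ctx, nConsumers, pwFactory, rDesc, tpc);
        }

        @Override
        protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
            super.initializeAppenders(pwFactory); // keep the default eager creation
            // per-writer setup or logging could go here
        }
    }
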
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
similarity index 84%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
rename to hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
index 627994c..fc0ce9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
@@ -20,14 +20,10 @@
import org.apache.hyracks.data.std.api.IValueReference;
-public class MutableArrayValueReference implements IValueReference {
+public class ArrayValueReference implements IValueReference {
private byte[] array;
- public MutableArrayValueReference() {
- //mutable array. user doesn't need to specify the array in advance
- }
-
- public MutableArrayValueReference(byte[] array) {
+ public ArrayValueReference(byte[] array) {
this.array = array;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
index fbb930d..3454ecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
@@ -22,7 +22,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
import org.apache.hyracks.storage.common.buffercache.VirtualPage;
import org.junit.Assert;
import org.junit.Test;
@@ -33,14 +33,14 @@
public void test() throws HyracksDataException {
LIFOMetaDataFrame frame = new LIFOMetaDataFrame();
VirtualPage page = new VirtualPage(ByteBuffer.allocate(512), 512);
- MutableArrayValueReference testKey = new MutableArrayValueReference("TestLSNKey".getBytes());
+ ArrayValueReference testKey = new ArrayValueReference("TestLSNKey".getBytes());
frame.setPage(page);
frame.init();
LongPointable longPointable = (LongPointable)
LongPointable.FACTORY.createPointable();
frame.get(testKey, longPointable);
Assert.assertNull(longPointable.getByteArray());
byte[] longBytes = new byte[Long.BYTES];
- MutableArrayValueReference value = new MutableArrayValueReference(longBytes);
+ ArrayValueReference value = new ArrayValueReference(longBytes);
int space = frame.getSpace() - (value.getLength() + Integer.BYTES * 2 + testKey.getLength());
for (long l = 1L; l < 52L; l++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
index 7f8e990..100084d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -34,7 +34,7 @@
public class LSMComponentFilterManager implements ILSMComponentFilterManager {
- public static final MutableArrayValueReference FILTER_KEY = new MutableArrayValueReference("Filter".getBytes());
+ public static final ArrayValueReference FILTER_KEY = new ArrayValueReference("Filter".getBytes());
private final ILSMComponentFilterFrameFactory filterFrameFactory;
public LSMComponentFilterManager(ILSMComponentFilterFrameFactory filterFrameFactory) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index b55e8ad..04d50df 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,8 +32,8 @@
public class ComponentMetadataUtil {
- public static final MutableArrayValueReference MARKER_LSN_KEY =
- new MutableArrayValueReference("Marker".getBytes());
+ public static final ArrayValueReference MARKER_LSN_KEY =
+ new ArrayValueReference("Marker".getBytes());
public static final long NOT_FOUND = -1L;
private ComponentMetadataUtil() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1523
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>