abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/660
Change subject: ASTERIXDB-1302 Fix Deadlock with Feed Connection
......................................................................
ASTERIXDB-1302 Fix Deadlock with Feed Connection
A bug caused a read lock to never be released when a feed was
connected with "wait-for-completion" set to false; a subsequent
statement needing the write lock (such as "drop dataverse") would
then deadlock. The lock is now released unconditionally in a
finally block, and a regression test was added.
Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
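
The gist of the fix, as a minimal standalone sketch (the class, method,
and lock names below are illustrative assumptions, not the actual
AsterixDB API; the real change is in QueryTranslator's connect-feed
handler, shown in the diff below):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ConnectFeedLockSketch {
        private static final ReadWriteLock METADATA_LOCK = new ReentrantReadWriteLock();

        // Before: the read lock was released only on the blocking
        // "wait-for-completion" path, so connecting with the flag set to
        // false returned with the lock still held, and a later statement
        // that needs the write lock (e.g. "drop dataverse") deadlocked.
        static void connectFeedBefore(boolean waitForCompletion) {
            METADATA_LOCK.readLock().lock();
            // ... build and run the feed connect job ...
            if (waitForCompletion) {
                METADATA_LOCK.readLock().unlock();
            }
            // waitForCompletion == false: the lock leaks here
        }

        // After: the release is unconditional in a finally block, so it
        // runs on every exit path; this mirrors the QueryTranslator change
        // that drops the readLatchAcquired flag.
        static void connectFeedAfter(boolean waitForCompletion) {
            METADATA_LOCK.readLock().lock();
            try {
                // ... build and run the feed connect job ...
            } finally {
                METADATA_LOCK.readLock().unlock();
            }
        }
    }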
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
M asterix-app/src/test/resources/runtimets/testsuite.xml
A asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
M asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
23 files changed, 711 insertions(+), 114 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/60/660/1
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..be9452b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,8 @@
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
}
- //get initial partitions from properties
+ System.err.println("Using the path: " + tempPath);
+ // get initial partitions from properties
String[] nodeStores = propertiesAccessor.getStores().get(ncName);
if (nodeStores == null) {
throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +98,7 @@
tempDirPath += File.separator;
}
for (int p = 0; p < nodeStores.length; p++) {
- //create IO devices based on stores
+ // create IO devices based on stores
String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
File ioDeviceDir = new File(iodevicePath);
ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index c0245d7..5cd490a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -37,9 +38,11 @@
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.file.JobSpecificationUtils;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -49,6 +52,9 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
/**
@@ -251,4 +257,17 @@
completeDisconnection,
EndFeedMessage.EndMessageType.DISCONNECT_FEED);
return buildSendFeedMessageRuntime(jobSpec, feedConenctionId,
feedMessage, locations);
}
+
+ public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+ FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+ spec.addRoot(frod);
+ return spec;
+ }
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 9f024e9..90eb7b9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@
ASYNC_DEFERRED
}
- public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean IS_DEBUG_MODE = false;// true
private final List<Statement> statements;
private final SessionConfig sessionConfig;
private Dataverse activeDefaultDataverse;
@@ -593,7 +593,8 @@
}
if (compactionPolicy == null) {
if (filterField != null) {
- // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
+ // If the dataset has a filter and the user didn't specify a merge
+ // policy, then we will pick the
// correlated-prefix as the default merge policy.
compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -615,12 +616,12 @@
}
- //#. initialize DatasetIdFactory if it is not initialized.
+ // #. initialize DatasetIdFactory if it is not initialized.
if (!DatasetIdFactory.isInitialized()) {
DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
}
- //#. add a new dataset with PendingAddOp
+ // #. add a new dataset with PendingAddOp
dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, ngName,
compactionPolicy, compactionPolicyProperties, datasetDetails, dd.getHints(), dsType,
DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
@@ -632,21 +633,21 @@
JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
- //#. make metadataTxn commit before calling runJob.
+ // #. make metadataTxn commit before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
- //#. runJob
+ // #. runJob
JobUtils.runJob(hcc, jobSpec, true);
- //#. begin new metadataTxn
+ // #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
- //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+ // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
@@ -658,11 +659,12 @@
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the index in NC
- // [Notice]
- // As long as we updated(and committed) metadata, we should remove any effect of the job
- // because an exception occurs during runJob.
+ // #. execute compensation operations
+ // remove the index in NC
+ // [Notice]
+ // As long as we updated(and committed) metadata, we should remove any effect of the
+ // job
+ // because an exception occurs during runJob.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -679,7 +681,7 @@
}
}
- // remove the record from the metadata.
+ // remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -880,8 +882,10 @@
}
}
- // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
- // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
+ // Checks whether a user is trying to create an inverted secondary index on a dataset
+ // with a variable-length primary key.
+ // Currently, we do not support this. Therefore, as a temporary solution, we print an
+ // error message and stop.
if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
|| stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
|| stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -959,7 +963,7 @@
}
}
- //check whether there exists another enforced index on the same field
+ // check whether there exists another enforced index on the same field
if (stmtCreateIndex.isEnforced()) {
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
@@ -973,7 +977,7 @@
}
}
- //#. add a new index with PendingAddOp
+ // #. add a new index with PendingAddOp
Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), false, IMetadataEntity.PENDING_ADD_OP);
@@ -984,7 +988,7 @@
enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
}
- //#. prepare to create the index artifact in NC.
+ // #. prepare to create the index artifact in NC.
CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
@@ -998,14 +1002,14 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
- //#. create the index artifact in NC.
+ // #. create the index artifact in NC.
JobUtils.runJob(hcc, spec, true);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- //#. load data into the index in NC.
+ // #. load data into the index in NC.
cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
index.getGramLength(), index.getIndexType());
@@ -1015,17 +1019,18 @@
JobUtils.runJob(hcc, spec, true);
- //#. begin new metadataTxn
+ // #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+ // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
indexName);
index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- // add another new files index with PendingNoOp after deleting the index with PendingAddOp
+ // add another new files index with PendingNoOp after deleting the index with
+ // PendingAddOp
if (firstExternalDatasetIndex) {
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
filesIndex.getIndexName());
@@ -1041,7 +1046,8 @@
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
- // If files index was replicated for external dataset, it should be cleaned up on NC side
+ // If files index was replicated for external dataset, it should be cleaned up on NC
+ // side
if (filesIndexReplicated) {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
@@ -1062,8 +1068,8 @@
}
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the index in NC
+ // #. execute compensation operations
+ // remove the index in NC
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1183,7 +1189,6 @@
MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
-
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
if (stmtDelete.getIfExists()) {
@@ -1194,7 +1199,7 @@
}
}
- //# disconnect all feeds from any datasets in the dataverse.
+ // # disconnect all feeds from any datasets in the dataverse.
List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
DisconnectFeedStatement disStmt = null;
@@ -1216,10 +1221,15 @@
+ connection.getDatasetName() + ". Encountered exception " + exception);
}
}
+ // prepare job to remove feed log storage
+ jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
}
}
- //#. prepare jobs which will drop corresponding datasets with indexes.
+ // #. prepare jobs which will drop corresponding datasets with indexes.
+
+ // #. prepare jobs which will drop corresponding datasets with indexes.
List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
for (int j = 0; j < datasets.size(); j++) {
String datasetName = datasets.get(j).getDatasetName();
@@ -1259,9 +1269,10 @@
}
}
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
- //#. mark PendingDropOp on the dataverse record by
- // first, deleting the dataverse record from the DATAVERSE_DATASET
- // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+ // #. mark PendingDropOp on the dataverse record by
+ // first, deleting the dataverse record from the DATAVERSE_DATASET
+ // second, inserting the dataverse record with the PendingDropOp value into the
+ // DATAVERSE_DATASET
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
@@ -1278,7 +1289,7 @@
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- //#. finally, delete the dataverse.
+ // #. finally, delete the dataverse.
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
activeDefaultDataverse = null;
@@ -1294,18 +1305,18 @@
activeDefaultDataverse = null;
}
- //#. execute compensation operations
- // remove the all indexes in NC
+ // #. execute compensation operations
+ // remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
- //do no throw exception since still the metadata needs to be compensated.
+ // do no throw exception since still the metadata needs to be compensated.
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ // remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1366,7 +1377,7 @@
}
}
- //#. prepare jobs to drop the datatset and the indexes in NC
+ // #. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
if (indexes.get(j).isSecondaryIndex()) {
@@ -1378,7 +1389,7 @@
CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
- //#. mark the existing dataset as PendingDropOp
+ // #. mark the existing dataset as PendingDropOp
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
MetadataManager.INSTANCE.addDataset(mdTxnCtx,
new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1390,12 +1401,12 @@
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
- //# disconnect the feeds
+ // # disconnect the feeds
for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
JobUtils.runJob(hcc, p.first, true);
}
- //#. run the jobs
+ // #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
@@ -1406,7 +1417,7 @@
} else {
// External dataset
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
- //#. prepare jobs to drop the datatset and the indexes in NC
+ // #. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
@@ -1421,7 +1432,7 @@
}
}
- //#. mark the existing dataset as PendingDropOp
+ // #. mark the existing dataset as PendingDropOp
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
MetadataManager.INSTANCE.addDataset(mdTxnCtx,
new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1433,7 +1444,7 @@
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
- //#. run the jobs
+ // #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
@@ -1445,7 +1456,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
- //#. finally, delete the dataset.
+ // #. finally, delete the dataset.
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
// Drop the associated nodegroup
String nodegroup = ds.getNodeGroupName();
@@ -1460,18 +1471,18 @@
}
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the all indexes in NC
+ // #. execute compensation operations
+ // remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
- //do no throw exception since still the metadata needs to be compensated.
+ // do no throw exception since still the metadata needs to be compensated.
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ // remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -1545,18 +1556,18 @@
throw new AlgebricksException("There is no index with this name " + indexName + ".");
}
}
- //#. prepare a job to drop the index in NC.
+ // #. prepare a job to drop the index in NC.
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
- //#. mark PendingDropOp on the existing index
+ // #. mark PendingDropOp on the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
- //#. commit the existing transaction before calling runJob.
+ // #. commit the existing transaction before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1565,12 +1576,12 @@
JobUtils.runJob(hcc, jobSpec, true);
}
- //#. begin a new transaction
+ // #. begin a new transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- //#. finally, delete the existing index
+ // #. finally, delete the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
} else {
// External dataset
@@ -1586,7 +1597,7 @@
} else if (ExternalIndexingOperations.isFileIndex(index)) {
throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
}
- //#. prepare a job to drop the index in NC.
+ // #. prepare a job to drop the index in NC.
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
@@ -1600,7 +1611,7 @@
externalIndex.getIndexName());
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
- //#. mark PendingDropOp on the existing files index
+ // #. mark PendingDropOp on the existing files index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, externalIndex.getIndexName());
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
@@ -1612,14 +1623,14 @@
}
}
- //#. mark PendingDropOp on the existing index
+ // #. mark PendingDropOp on the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
- //#. commit the existing transaction before calling runJob.
+ // #. commit the existing transaction before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1628,12 +1639,12 @@
JobUtils.runJob(hcc, jobSpec, true);
}
- //#. begin a new transaction
+ // #. begin a new transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- //#. finally, delete the existing index
+ // #. finally, delete the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (dropFilesIndex) {
// delete the files index too
@@ -1651,18 +1662,18 @@
}
if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the all indexes in NC
+ // #. execute compensation operations
+ // remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
} catch (Exception e2) {
- //do no throw exception since still the metadata needs to be compensated.
+ // do no throw exception since still the metadata needs to be compensated.
e.addSuppressed(e2);
}
- // remove the record from the metadata.
+ // remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
@@ -1930,14 +1941,12 @@
private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
-
CreateFeedStatement cfs = (CreateFeedStatement) stmt;
String dataverseName = getActiveDataverse(cfs.getDataverseName());
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
-
Feed feed = null;
try {
feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -2120,7 +2129,6 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- boolean readLatchAcquired = true;
boolean subscriberRegistered = false;
IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
FeedConnectionId feedConnId = null;
FeedConnectionId feedConnId = null;
@@ -2171,13 +2179,14 @@
FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
}
JobUtils.runJob(hcc, pair.first, false);
- /* TODO: Fix record tracking
+ /*
+ * TODO: Fix record tracking
* IFeedAdapterFactory adapterFactory = pair.second;
- if (adapterFactory.isRecordTrackingEnabled()) {
- FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
- adapterFactory.createIntakeProgressTracker());
- }
- */
+ * if (adapterFactory.isRecordTrackingEnabled()) {
+ * FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+ * adapterFactory.createIntakeProgressTracker());
+ * }
+ */
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
} else {
for (IFeedJoint fj : triple.third) {
@@ -2186,7 +2195,6 @@
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- readLatchAcquired = false;
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
@@ -2197,7 +2205,6 @@
if (waitForCompletion) {
MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
dataverseName + "." + feedName);
- readLatchAcquired = false;
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -2205,10 +2212,8 @@
}
throw e;
} finally {
- if (readLatchAcquired) {
- MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + feedName);
- }
+ MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + feedName);
if (subscriberRegistered) {
FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
}
@@ -2242,7 +2247,8 @@
boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
if (!isFeedJointAvailable) {
sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
- if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+ if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is
+ // unavailable.
connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2262,7 +2268,8 @@
functionsToApply.add(f);
}
}
- // register the compute feed point that represents the final output from the collection of
+ // register the compute feed point that represents the final output from the collection
+ // of
// functions that will be applied.
if (!functionsToApply.isEmpty()) {
FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
@@ -2475,7 +2482,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- //#. run the jobs
+ // #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
JobUtils.runJob(hcc, jobSpec, true);
}
@@ -2656,7 +2663,8 @@
return;
}
- // At this point, we know data has changed in the external file system, record transaction in metadata and start
+ // At this point, we know data has changed in the external file system, record
+ // transaction in metadata and start
transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
/*
* Remove old dataset record and replace it with a new one
@@ -2682,14 +2690,14 @@
bActiveTxn = false;
transactionState = ExternalDatasetTransactionState.BEGIN;
- //run the files update job
+ // run the files update job
JobUtils.runJob(hcc, spec, true);
for (Index index : indexes) {
if (!ExternalIndexingOperations.isFileIndex(index)) {
spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
addedFiles, appendedFiles, metadataProvider);
- //run the files update job
+ // run the files update job
JobUtils.runJob(hcc, spec, true);
}
}
@@ -2860,7 +2868,8 @@
}
// Finds PREGELIX_HOME in AsterixDB configuration.
if (pregelixHome == null) {
- // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+ // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+ // pregelixHome can never be null.
pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
}
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 976ca70..8d020e7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 5e76ecb..e62c315 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -59,7 +59,7 @@
LOGGER.info("initializing HDFS");
}
- HDFSCluster.getInstance().setup();
+ // HDFSCluster.getInstance().setup();
// Set the node resolver to be the identity resolver that expects node
// names
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
new file mode 100644
index 0000000..8cd89da
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+ ("sockets"="127.0.0.1:10001"),
+ ("address-type"="IP"),
+ ("type-name"="TweetMessageType"),
+ ("format"="adm")
+);
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
new file mode 100644
index 0000000..9dcd753
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Connect a feed to a dataset with wait-for-completion set to false
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
new file mode 100644
index 0000000..97b7013
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Sleep to let the feed run before it is disconnected
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
new file mode 100644
index 0000000..4106d42
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
new file mode 100644
index 0000000..8aaec06
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Drop a dataverse with disconnected feed
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a3a1fba..84c6ad3 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -36,6 +36,11 @@
</compilation-unit>
</test-case> -->
<test-case FilePath="feeds">
+ <compilation-unit name="drop-dataverse-with-disconnected-feed">
+ <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="feed-with-external-parser">
<output-dir compare="Text">feed-with-external-parser</output-dir>
</compilation-unit>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
new file mode 100644
index 0000000..6082269
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+public class FileFeedSocketAdapterClient implements ITestClient {
+ private final int port;
+ private final int wait;
+ private final String url;
+ private Socket socket;
+ private String path;
+ private int batchSize;
+ private int maxCount;
+ private OutputStream out = null;
+
+ // expected args: url, source-file-path, max-count, batch-size, wait
+ public FileFeedSocketAdapterClient(int port, String[] args) throws Exception {
+ this.port = port;
+ if (args.length != 5) {
+ throw new Exception(
+ "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>");
+ }
+ this.url = args[0];
+ this.path = args[1];
+ this.maxCount = Integer.parseInt(args[2]);
+ this.batchSize = Integer.parseInt(args[3]);
+ this.wait = Integer.parseInt(args[4]);
+ }
+
+ @Override
+ public void start() {
+ try {
+ socket = new Socket(url, port);
+ } catch (IOException e) {
+ System.err.println("Problem in creating socket against host " + url + " on the port " + port);
+ e.printStackTrace();
+ }
+
+ int recordCount = 0;
+ BufferedReader br = null;
+ try {
+ out = socket.getOutputStream();
+ br = new BufferedReader(new FileReader(path));
+ String nextRecord;
+ byte[] b;
+ byte[] newLineBytes = "\n".getBytes();
+
+ while ((nextRecord = br.readLine()) != null) {
+ b = nextRecord.replaceAll("\\s+", " ").getBytes();
+ if (wait >= 1 && recordCount % batchSize == 0) {
+ Thread.sleep(wait);
+ }
+ out.write(b);
+ out.write(newLineBytes);
+ recordCount++;
+ if (recordCount % 100000 == 0) {
+ System.err.println("send " + recordCount);
+ }
+ if (recordCount == maxCount) {
+ break;
+ }
+ }
+ System.err.println("send " + recordCount);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException e) {
+ System.err.println("Problem in closing socket against host " + url
+ " on the port " + port);
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
new file mode 100644
index 0000000..56d626d
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+public interface ITestClient {
+
+ public void start() throws Exception;
+
+ public void stop() throws Exception;
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
new file mode 100644
index 0000000..d26351b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.util.Arrays;
+
+public class TestClientProvider {
+
+ public static ITestClient createTestClient(String[] args, int port) throws Exception {
+ if (args.length < 1) {
+ throw new Exception("Unspecified test client");
+ }
+ String clientName = args[0];
+ String[] clientArgs = Arrays.copyOfRange(args, 1, args.length);
+ switch (clientName) {
+ case "file-client":
+ return new FileFeedSocketAdapterClient(port, clientArgs);
+ default:
+ throw new Exception("Unknown test client: " + clientName);
+ }
+ }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index f40cce4..ba32af2 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -26,10 +26,10 @@
import java.net.Socket;
public class FileTestServer implements ITestServer {
- private String[] paths;
- private final int port;
- private ServerSocket serverSocket;
- private Thread listenerThread;
+ protected String[] paths;
+ protected final int port;
+ protected ServerSocket serverSocket;
+ protected Thread listenerThread;
public FileTestServer(int port) {
this.port = port;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
index 18a4969..b3b1183 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
@@ -20,7 +20,7 @@
public interface ITestServer {
- public void configure(String[] args);
+ public void configure(String[] args) throws Exception;
public void start() throws Exception;
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
new file mode 100644
index 0000000..d417cf6
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class OpenSocketFileTestServer extends FileTestServer {
+
+ private boolean closed;
+
+ public OpenSocketFileTestServer(int port) {
+ super(port);
+ }
+
+ @Override
+ public void start() throws IOException {
+ serverSocket = new ServerSocket(port);
+ listenerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!serverSocket.isClosed()) {
+ try {
+ Socket socket = serverSocket.accept();
+ new Thread(new SocketThread(socket)).start();
+ } catch (IOException e) {
+ // Do nothing. This means the socket was closed for some reason.
+ // There is nothing to do here except try to close the socket and see if the
+ // server is still listening!
+ // This also could be due to the close() call
+ }
+ }
+ }
+ });
+ listenerThread.start();
+ }
+
+ private class SocketThread implements Runnable {
+ private Socket socket;
+ private OutputStream os;
+
+ public SocketThread(Socket socket) {
+ this.socket = socket;
+ }
+
+ @Override
+ public void run() {
+ try {
+ os = socket.getOutputStream();
+ byte[] chunk = new byte[1024];
+ for (String path : paths) {
+ try (FileInputStream fin = new FileInputStream(new File(path))) {
+ int read = fin.read(chunk);
+ while (read > 0) {
+ os.write(chunk, 0, read);
+ read = fin.read(chunk);
+ }
+ }
+ }
+ } catch (Throwable th) {
+ } finally {
+ synchronized (serverSocket) {
+ if (!closed) {
+ try {
+ serverSocket.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ try {
+ os.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ try {
+ socket.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws IOException, InterruptedException {
+ synchronized (serverSocket) {
+ closed = true;
+ try {
+ serverSocket.close();
+ if (listenerThread.isAlive()) {
+ listenerThread.join();
+ }
+ } finally {
+ serverSocket.notifyAll();
+ }
+ }
+ }
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
new file mode 100644
index 0000000..3312d1b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import org.apache.asterix.test.client.ITestClient;
+import org.apache.asterix.test.client.TestClientProvider;
+
+public class TestClientServer implements ITestServer {
+
+ // port of the server to connect to
+ private final int port;
+ private ITestClient client;
+
+ public TestClientServer(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public void configure(String[] args) throws Exception {
+ client = TestClientProvider.createTestClient(args, port);
+ }
+
+ @Override
+ public void start() throws Exception {
+ client.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ client.stop();
+ }
+
+}
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
index 0be6800..ab8b005 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
@@ -26,6 +26,10 @@
return new FileTestServer(port);
case "rss":
return new RSSTestServer(port);
+ case "open-socket-file":
+ return new OpenSocketFileTestServer(port);
+ case "client":
+ return new TestClientServer(port);
default:
throw new Exception("Unknown test server");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index 1e86f39..f61ecbd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -25,17 +25,25 @@
import java.util.Map;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SocketInputStream extends AInputStream {
private ServerSocket server;
private Socket socket;
private InputStream connectionStream;
+ private boolean closed;
public SocketInputStream(ServerSocket server) throws IOException {
this.server = server;
- socket = server.accept();
- connectionStream = socket.getInputStream();
+ socket = new Socket();
+ connectionStream = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
}
@Override
@@ -66,9 +74,15 @@
@Override
public int read(byte b[], int off, int len) throws IOException {
+ if (closed) {
+ return -1;
+ }
int read = connectionStream.read(b, off, len);
while (read < 0) {
- accept();
+ if (!accept()) {
+ closed = true;
+ return -1;
+ }
read = connectionStream.read(b, off, len);
}
return read;
@@ -86,21 +100,55 @@
@Override
public void close() throws IOException {
- connectionStream.close();
- socket.close();
- server.close();
+ HyracksDataException hde = null;
+ try {
+ if (connectionStream != null) {
+ connectionStream.close();
+ }
+ connectionStream = null;
+ } catch (IOException e) {
+ hde = new HyracksDataException(e);
+ }
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ socket = null;
+ } catch (IOException e) {
+ hde = ExternalDataExceptionUtils.suppress(hde, e);
+ }
+ try {
+ if (server != null) {
+ server.close();
+ }
+ server = null;
+ } catch (IOException e) {
+ hde = ExternalDataExceptionUtils.suppress(hde, e);
+ }
+ if (hde != null) {
+ throw hde;
+ }
}
- private void accept() throws IOException {
- connectionStream.close();
- socket.close();
- socket = server.accept();
- connectionStream = socket.getInputStream();
+ private boolean accept() throws IOException {
+ try {
+ connectionStream.close();
+ connectionStream = null;
+ socket.close();
+ socket = null;
+ socket = server.accept();
+ connectionStream = socket.getInputStream();
+ return true;
+ } catch (Exception e) {
+ close();
+ return false;
+ }
}
@Override
public boolean stop() throws Exception {
- return false;
+ close();
+ return true;
}
@Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@
@Override
public AInputStream getInputStream() throws Exception {
- twitterServer.start();
return twitterServer;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@
private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
int waitCycleCount = 0;
ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- while (ingestionRuntime == null && waitCycleCount < 10) {
+ while (ingestionRuntime == null && waitCycleCount < 1000) {
try {
Thread.sleep(3000);
waitCycleCount++;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@
FeedRuntimeId runtimeId = null;
FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
if (endFeedMessage.isCompleteDisconnection()) {
- // subscribableRuntimeType represents the location at which the feed connection receives data
+ // subscribableRuntimeType represents the location at which the feed connection receives
+ // data
FeedRuntimeType runtimeType = null;
switch (subscribableRuntimeType) {
case INTAKE:
@@ -257,15 +258,19 @@
runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
.getFeedRuntime(connectionId, runtimeId);
- feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ if (feedRuntime != null) {
+ feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ }
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
}
} else {
- // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+ // subscribaleRuntimeType represents the location for data hand-off in presence of
+ // subscribers
switch (subscribableRuntimeType) {
case INTAKE:
- // illegal state as data hand-off from one feed to another does not happen at intake
+ // illegal state as data hand-off from one feed to another does not happen at
+ // intake
throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
case COMPUTE:
// feed could be primary or secondary, doesn't matter
--
To view, visit https://asterix-gerrit.ics.uci.edu/660
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>