Yingyi Bu has submitted this change and it was merged. Change subject: Support IFrameWriter contract check. ......................................................................
Support IFrameWriter contract check. - add a instance-level flag for injecting operators to check IFrameWriter contract violations; - check contract violations in runtime tests. Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1618 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java 44 files changed, 504 insertions(+), 202 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 9d01d63..ba98f73 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.net.Inet4Address; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,9 +41,6 @@ import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobFlag; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -131,6 +127,7 @@ ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); ccConfig.setResultTTL(120000L); ccConfig.setResultSweepThreshold(1000L); + ccConfig.setEnforceFrameWriterProtocol(true); configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb")); return ccConfig; } @@ -213,13 +210,6 @@ deleteTransactionLogs(); removeTestStorageFiles(); } - } - - public void runJob(JobSpecification spec) throws Exception { - GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString()); - JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - GlobalConfig.ASTERIX_LOGGER.info(jobId.toString()); - hcc.waitForCompletion(jobId); } protected String getDefaultStoragePath() { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 80c05db..7ce9df6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; @@ -191,8 +192,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.UnmanagedFileSplit; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; /* @@ -214,6 +217,7 @@ protected final IRewriterFactory rewriterFactory; protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; + protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class); public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, @@ -228,6 +232,9 @@ rewriterFactory = compliationProvider.getRewriterFactory(); activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; this.executorService = executorService; + if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) { + this.jobFlags.add(JobFlag.ENFORCE_CONTRACT); + } } public SessionOutput getSessionOutput() { @@ -621,7 +628,7 @@ progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); // #. runJob - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); // #. begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -653,7 +660,7 @@ JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -900,7 +907,7 @@ "Failed to create job spec for replicating Files Index For external dataset"); } filesIndexReplicated = true; - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } @@ -937,7 +944,7 @@ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; // #. create the index artifact in NC. - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; @@ -948,7 +955,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); // #. begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -986,7 +993,7 @@ ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1005,7 +1012,7 @@ JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1189,7 +1196,7 @@ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -1226,7 +1233,7 @@ // remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -1405,7 +1412,7 @@ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } // #. begin a new transaction @@ -1466,7 +1473,7 @@ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } // #. begin a new transaction @@ -1496,7 +1503,7 @@ // remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -1659,7 +1666,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null) { - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } catch (Exception e) { if (bActiveTxn) { @@ -1725,7 +1732,7 @@ if (jobSpec == null) { return jobSpec; } - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } finally { locker.unlock(); } @@ -1753,7 +1760,7 @@ bActiveTxn = false; if (jobSpec != null && !compileOnly) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } return jobSpec; } catch (Exception e) { @@ -1935,7 +1942,7 @@ } else { JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName); } @@ -2022,6 +2029,9 @@ activeEventHandler.registerListener(listener); IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED); feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + + // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. + // We will need to design general exception handling mechanism for feeds. JobUtils.runJob(hcc, feedJob, Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); eventSubscriber.sync(); @@ -2211,7 +2221,7 @@ // #. run the jobs for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e) { if (bActiveTxn) { @@ -2300,14 +2310,14 @@ } break; case IMMEDIATE: - createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx); break; case DEFERRED: - createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId)); if (outMetadata != null) { outMetadata.getResultSets() @@ -2325,7 +2335,7 @@ ResultSetId resultSetId, MutableBoolean printed) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); try { - createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> { final ResultHandle handle = new ResultHandle(id, resultSetId); ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING); ResultUtil.printResultHandle(sessionOutput, handle); @@ -2353,16 +2363,20 @@ } } - private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler, - IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId, - IStatementExecutorContext ctx) throws Exception { + private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception { + JobUtils.runJob(hcc, jobSpec, jobFlags, true); + } + + private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, + IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, + String clientContextId, IStatementExecutorContext ctx) throws Exception { locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); if (jobSpec == null) { return; } - final JobId jobId = JobUtils.runJob(hcc, jobSpec, false); + final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); if (ctx != null && clientContextId != null) { ctx.put(clientContextId, jobId); // Adds the running job into the context. } @@ -2507,14 +2521,14 @@ transactionState = TransactionState.BEGIN; // run the files update job - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); for (Index index : indexes) { if (!ExternalIndexingOperations.isFileIndex(index)) { spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles, appendedFiles, metadataProvider); // run the files update job - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } @@ -2533,7 +2547,7 @@ bActiveTxn = false; transactionState = TransactionState.READY_TO_COMMIT; // We don't release the latch since this job is expected to be quick - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); // Start a new metadata transaction to record the final state of the transaction mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -2602,7 +2616,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; try { - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } catch (Exception e2) { // This should never happen -- fix throw illegal e.addSuppressed(e2); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index f99793a..7d4b41d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -205,7 +205,7 @@ NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false); BTreeSearchOperatorNodePushable searchOp = searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1); - emptyTupleOp.setFrameWriter(0, searchOp, + emptyTupleOp.setOutputFrameWriter(0, searchOp, primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); return emptyTupleOp; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java index 5e3da5f..b80f8f8 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java @@ -40,6 +40,9 @@ import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.junit.Assert; import org.junit.Test; @@ -59,6 +62,11 @@ ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class); when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties); when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002); + ICCServiceContext mockServiceContext = mock(ICCServiceContext.class); + when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext); + IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class); + when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig); + when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true); // Mocks AsterixClusterProperties. Cluster mockCluster = mock(Cluster.class); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java index 41f9e67..cacbfbc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java @@ -19,7 +19,10 @@ package org.apache.asterix.common.utils; +import java.util.EnumSet; + import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -32,8 +35,13 @@ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) throws Exception { + return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion); + } + + public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, + boolean waitForCompletion) throws Exception { spec.setMaxReattempts(0); - final JobId jobId = hcc.startJob(spec); + final JobId jobId = hcc.startJob(spec, jobFlags); if (waitForCompletion) { hcc.waitForCompletion(jobId); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java index 71acecf..d0b7b47 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java @@ -21,28 +21,20 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor; -import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; public class SinkPOperator extends AbstractPhysicalOperator { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java new file mode 100644 index 0000000..26002e6 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.runtime.base; + +import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime { + + private final IPushRuntime pushRuntime; + + private EnforcePushRuntime(IPushRuntime pushRuntime) { + super(pushRuntime); + this.pushRuntime = pushRuntime; + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + pushRuntime.setOutputFrameWriter(index, writer, recordDesc); + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + pushRuntime.setInputRecordDescriptor(index, recordDescriptor); + } + + public static IPushRuntime enforce(IPushRuntime pushRuntime) { + return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime + ? pushRuntime : new EnforcePushRuntime(pushRuntime); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java index 27d6900..de00697 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java @@ -22,7 +22,26 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; public interface IPushRuntime extends IFrameWriter { - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc); - public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor); + /** + * Sets the output frame writer for this writer. + * + * @param index, + * the index of the output channel. + * @param writer, + * the writer for writing output. + * @param recordDesc, + * the output record descriptor. + */ + void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc); + + /** + * Sets the input record descriptor for this writer. + * + * @param index, + * the index of the input channel. + * @param recordDescriptor, + * the corresponding input record descriptor. + */ + void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java index 42e5157..e99b61b 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java @@ -124,6 +124,7 @@ @Override public void fail() throws HyracksDataException { + failed = true; if (isOpen) { writer.fail(); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index b397f23..893aa61 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; @@ -29,6 +30,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory; @@ -52,7 +54,6 @@ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException { - final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length); final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { @@ -91,8 +92,8 @@ } @Override - public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex, - AggregateState state) throws HyracksDataException { + public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, + int tIndex, AggregateState state) throws HyracksDataException { for (int i = 0; i < pipelines.length; i++) { outputWriter.setInputIdx(i); pipelines[i].close(); @@ -144,9 +145,13 @@ IFrameWriter start = writer; IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); for (int i = runtimeFactories.length - 1; i >= 0; i--) { IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); - newRuntime.setFrameWriter(0, start, recordDescriptors[i]); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforcePushRuntime.enforce(start) : start; + newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); if (i > 0) { newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index 52245e1..8b8e320 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; @@ -28,8 +29,10 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -58,11 +61,14 @@ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, final IFrameWriter writer) throws HyracksDataException { - final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, - decorFieldIdx.length, writer); + final RunningAggregatorOutput outputWriter = + new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer); + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter; final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx); + pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx); } final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder(); @@ -136,13 +142,17 @@ private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer; IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); - newRuntime.setFrameWriter(0, start, recordDescriptors[i]); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforceFrameWriter.enforce(start) : start; + newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); if (i > 0) { newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else { @@ -206,8 +216,9 @@ int start = 0; int offset = 0; for (int i = 0; i < fieldEnds.length; i++) { - if (i > 0) + if (i > 0) { start = fieldEnds[i - 1]; + } offset = fieldEnds[i] - start; tb.addField(data, start, offset); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java index 33e7c73..5cced8d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java @@ -29,7 +29,7 @@ protected boolean failed; @Override - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { this.writer = writer; this.outputRecordDesc = recordDesc; } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java index e430461..d47199d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java @@ -26,7 +26,7 @@ protected RecordDescriptor inputRecordDesc; @Override - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { throw new IllegalStateException(); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java index b7707d4..35563e0 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java @@ -27,20 +27,29 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + // nextFrame will never be called on this runtime throw new UnsupportedOperationException(); } @Override public void close() throws HyracksDataException { + // close is a no op since this operator completes operating in open() } @Override public void fail() throws HyracksDataException { - writer.fail(); + // fail is a no op since if a failure happened, the operator would've already called fail() on downstream + } + + @Override + public void flush() throws HyracksDataException { + // flush will never be called on this runtime + throw new UnsupportedOperationException(); } @Override public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + // setInputRecordDescriptor will never be called on this runtime since it has no input throw new UnsupportedOperationException(); } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 1294614..f6ebf19 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -112,6 +112,7 @@ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { private IFrameWriter startOfPipeline; + private boolean opened = false; @Override public void open() throws HyracksDataException { @@ -124,6 +125,7 @@ pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); startOfPipeline = pa.assemblePipeline(writer, ctx); } + opened = true; startOfPipeline.open(); } @@ -134,12 +136,16 @@ @Override public void close() throws HyracksDataException { - startOfPipeline.close(); + if (opened) { + startOfPipeline.close(); + } } @Override public void fail() throws HyracksDataException { - startOfPipeline.fail(); + if (opened) { + startOfPipeline.fail(); + } } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 03e2aaf..e1081e0 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -19,11 +19,14 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; public class PipelineAssembler { @@ -44,18 +47,21 @@ this.outputArity = outputArity; } - public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws - HyracksDataException { + public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer;// this.writer; for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) { IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforceFrameWriter.enforce(start) : start; if (i == pipeline.getRuntimeFactories().length - 1) { if (outputArity == 1) { - newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor); + newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); } } else { - newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); + newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); } if (i > 0) { newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java index 3e30f73..925ff93 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java @@ -79,11 +79,6 @@ } @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override public void close() throws HyracksDataException { try { frameSorter.sort(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java index 2b7c2da..3ccceed 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java @@ -56,6 +56,11 @@ } @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override public void close() throws HyracksDataException { writer.close(); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java index e123adf..496679f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java @@ -65,11 +65,6 @@ } @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override public void flush() throws HyracksDataException { writer.flush(); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java index 38fe7d1..33b7725 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java @@ -118,7 +118,7 @@ @Override public void fail() throws HyracksDataException { if (isOpen) { - super.fail(); + writer.fail(); } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java index ebf3d3a..55146e2 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java @@ -81,6 +81,7 @@ @Override public void fail() throws HyracksDataException { + // fail() is a no op. in close we will cleanup } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java index 0f57fd7..171544d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java @@ -85,7 +85,6 @@ private IScalarEvaluator eval; private IMissingWriter missingWriter = null; private ArrayTupleBuilder missingTupleBuilder = null; - private boolean isOpen = false; @Override public void open() throws HyracksDataException { @@ -93,7 +92,6 @@ initAccessAppendFieldRef(ctx); eval = cond.createScalarEvaluator(ctx); } - isOpen = true; writer.open(); //prepare nullTupleBuilder @@ -107,20 +105,11 @@ } @Override - public void fail() throws HyracksDataException { - if (isOpen) { - super.fail(); - } - } - - @Override public void close() throws HyracksDataException { - if (isOpen) { - try { - flushIfNotFailed(); - } finally { - writer.close(); - } + try { + flushIfNotFailed(); + } finally { + writer.close(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index aa9232e..e2868ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -177,7 +177,7 @@ } public StartJobFunction(JobId jobId) { - this(null, null, null, jobId); + this(null, null, EnumSet.noneOf(JobFlag.class), jobId); } public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index ad54110..75cbf61 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -102,14 +102,14 @@ @Override public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory( - jobSpec); + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return startJob(jsacggf, jobFlags); } @Override public JobId distributeJob(JobSpecification jobSpec) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + IActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return distributeJob(jsacggf); } @@ -212,15 +212,14 @@ @Override public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory( - jobSpec); + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return startJob(deploymentId, jsacggf, jobFlags); } @Override public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, - EnumSet<JobFlag> jobFlags) - throws Exception { + EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java index c8e4cf8..df693b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.context; import java.io.Serializable; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.hyracks.api.dataflow.TaskAttemptId; @@ -26,6 +27,7 @@ import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatableRegistry; @@ -48,4 +50,6 @@ void setSharedObject(Object object); Object getSharedObject(); + + Set<JobFlag> getJobFlags(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java new file mode 100644 index 0000000..bf54e01 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.dataflow; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class EnforceFrameWriter implements IFrameWriter { + + // The downstream data consumer of this writer. + private final IFrameWriter writer; + + // A flag that indicates whether the data consumer of this writer has failed. + private boolean downstreamFailed = false; + + // A flag that indicates whether the the data producer of this writer has called fail() for this writer. + // There could be two cases: + // CASE 1: the downstream of this writer fails and the exception is propagated to the source operator, which + // cascades to the fail() of this writer; + // CASE 2: the failure happens in the upstream of this writer and the source operator cascades to the fail() + // of this writer. + private boolean failCalledByUpstream = false; + + // A flag that indicates whether the downstream of this writer is open. + private boolean downstreamOpen = false; + + protected EnforceFrameWriter(IFrameWriter writer) { + this.writer = writer; + } + + @Override + public final void open() throws HyracksDataException { + try { + if (downstreamOpen) { + throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER); + } + writer.open(); + downstreamOpen = true; + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void nextFrame(ByteBuffer buffer) throws HyracksDataException { + if (!downstreamOpen) { + throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER); + } + try { + writer.nextFrame(buffer); + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void flush() throws HyracksDataException { + if (!downstreamOpen) { + throw HyracksDataException.create(ErrorCode.FLUSH_ON_CLOSED_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER); + } + try { + writer.flush(); + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void fail() throws HyracksDataException { + writer.fail(); + if (failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER); + } + failCalledByUpstream = true; + } + + @Override + public void close() throws HyracksDataException { + writer.close(); + downstreamOpen = false; + if (downstreamFailed && !failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL); + } + } + + public static IFrameWriter enforce(IFrameWriter writer) { + return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer); + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java index f6c201e..82433e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java @@ -23,16 +23,15 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IOperatorNodePushable { - public void initialize() throws HyracksDataException; + void initialize() throws HyracksDataException; - public void deinitialize() throws HyracksDataException; + void deinitialize() throws HyracksDataException; - public int getInputArity(); + int getInputArity(); - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) - throws HyracksDataException; + void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException; - public IFrameWriter getInputFrameWriter(int index); + IFrameWriter getInputFrameWriter(int index); - public String getDisplayName(); + String getDisplayName(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index b52a6a5..8f36fcd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -92,6 +92,14 @@ public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56; public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57; public static final int TASK_ABORTED = 58; + public static final int OPEN_ON_OPEN_WRITER = 59; + public static final int OPEN_ON_FAILED_WRITER = 60; + public static final int NEXT_FRAME_ON_FAILED_WRITER = 61; + public static final int NEXT_FRAME_ON_CLOSED_WRITER = 62; + public static final int FLUSH_ON_FAILED_WRITER = 63; + public static final int FLUSH_ON_CLOSED_WRITER = 64; + public static final int FAIL_ON_FAILED_WRITER = 65; + public static final int MISSED_FAIL_CALL = 66; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java index a33c6c9..7225cd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java @@ -19,5 +19,6 @@ package org.apache.hyracks.api.job; public enum JobFlag { - PROFILE_RUNTIME + PROFILE_RUNTIME, + ENFORCE_CONTRACT } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 314bf8b..7fdf106 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -35,12 +35,14 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; /** * The runtime of a SuperActivity, which internally executes a DAG of one-to-one @@ -90,18 +92,18 @@ private void init() throws HyracksDataException { Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); List<IConnectorDescriptor> outputConnectors; - + final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); /* * Set up the source operators */ for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, - nPartitions); + IOperatorNodePushable opPushable = + entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); operatorNodePushablesBFSOrder.add(opPushable); operatorNodePushables.put(entry.getKey(), opPushable); inputArity += opPushable.getInputArity(); - outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), - Collections.emptyList()); + outputConnectors = + MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList()); for (IConnectorDescriptor conn : outputConnectors) { childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); } @@ -131,7 +133,9 @@ /* * construct the dataflow connection from a producer to a consumer */ - sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel), + IFrameWriter writer = destOp.getInputFrameWriter(inputChannel); + writer = enforce ? EnforceFrameWriter.enforce(writer) : writer; + sourceOp.setOutputFrameWriter(outputChannel, writer, recordDescProvider.getInputRecordDescriptor(destId, inputChannel)); /* diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 35a2fc5..4bf069c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -75,6 +75,13 @@ 56 = LSM disk component scan is not allowed for a secondary index 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index 58 = Task %1$s was aborted +59 = Data pipeline protocol violation: open() is called on a opened writer +60 = Data pipeline protocol violation: open() is called on a failed writer +61 = Data pipeline protocol violation: nextFrame() is called on a failed writer +62 = Data pipeline protocol violation: nextFrame() is called on a closed writer +63 = Data pipeline protocol violation: flush() is called on a failed writer +64 = Data pipeline protocol violation: flush() is called on a closed writer +65 = Data pipeline protocol violation: fail() is called twice on a writer +66 = Data pipeline protocol violation: fail() is not called by the upstream when there is a failure in the downstream -# 10000 ---- 19999: compilation errors 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index 55a7a82..95a6d9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -20,7 +20,6 @@ import java.io.PrintWriter; import java.io.StringWriter; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -116,10 +115,10 @@ } //Run a Pre-distributed job by passing the JobId - public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, + public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, PreDistributedJobDescriptor distributedJobDescriptor) throws HyracksException { - this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), + this(deploymentId, jobId, jobFlags, distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph()); Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints(); this.scheduler = new JobExecutor(ccs, this, constaints, true); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index 2dbb631..e083d2a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -68,7 +68,7 @@ run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags); } else { //ActivityClusterGraph has already been distributed - run = new JobRun(ccs, deploymentId, jobId, + run = new JobRun(ccs, deploymentId, jobId, jobFlags, ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); } jobManager.add(run); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java index f83df3a..cbc6146 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.control.common.controllers; +import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.LONG; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; @@ -64,7 +65,8 @@ CLUSTER_TOPOLOGY(STRING), JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"), JOB_QUEUE_CAPACITY(INTEGER, 4096), - JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"); + JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"), + ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false); private final IOptionType parser; private Object defaultValue; @@ -156,6 +158,9 @@ return "The maximum number of jobs to queue before rejecting new jobs"; case JOB_MANAGER_CLASS: return "Specify the implementation class name for the job manager"; + case ENFORCE_FRAME_WRITER_PROTOCOL: + return "A flag indicating if runtime should enforce frame writer protocol and detect " + + "bad behaving operators"; default: throw new IllegalStateException("NYI: " + this); } @@ -357,4 +362,12 @@ public int getJobQueueCapacity() { return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY); } + + public boolean getEnforceFrameWriterProtocol() { + return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL); + } + + public void setEnforceFrameWriterProtocol(boolean enforce) { + configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 42726a7..0fc3be7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -64,8 +64,10 @@ MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT), CLUSTER_CONNECT_RETRIES(INTEGER, 5), - IODEVICES(STRING_ARRAY, appConfig -> new String[] { - FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") }, + IODEVICES( + STRING_ARRAY, + appConfig -> new String[] { + FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") }, "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"), NET_THREAD_COUNT(INTEGER, 1), NET_BUFFER_COUNT(INTEGER, 1), @@ -95,7 +97,7 @@ } <T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue, - String defaultValueDescription) { + String defaultValueDescription) { this.parser = parser; this.defaultValue = defaultValue; this.defaultValueDescription = defaultValueDescription; @@ -246,6 +248,7 @@ return configManager; } + @Override public IApplicationConfig getAppConfig() { return appConfig; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 04d48f3..d689bc0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -52,6 +52,7 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.partitions.PartitionId; @@ -104,9 +105,13 @@ private Object sharedObject; - public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor, - NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) { + private final Set<JobFlag> jobFlags; + + public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName, + ExecutorService executor, NodeControllerService ncs, + List<List<PartitionChannel>> inputChannelsFromConnectors) { this.joblet = joblet; + this.jobFlags = jobFlags; this.taskAttemptId = taskId; this.displayName = displayName; this.executorService = executor; @@ -426,4 +431,9 @@ public Object getSharedObject() { return sharedObject; } + + @Override + public Set<JobFlag> getJobFlags() { + return jobFlags; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 02e8051..95f3e83 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -131,12 +132,10 @@ } final int partition = tid.getPartition(); List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid); - task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs, + task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs, createInputChannels(td, inputs)); IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount()); - List<IPartitionCollector> collectors = new ArrayList<>(); - if (inputs != null) { for (int i = 0; i < inputs.size(); ++i) { IConnectorDescriptor conn = inputs.get(i); @@ -145,26 +144,28 @@ LOGGER.info("input: " + i + ": " + conn.getConnectorId()); } RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); - IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn, - recordDesc, cPolicy); + IPartitionCollector collector = + createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy); collectors.add(collector); } } List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid); if (outputs != null) { + final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT); for (int i = 0; i < outputs.size(); ++i) { final IConnectorDescriptor conn = outputs.get(i); RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId()); - IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn, - partition, taId, flags); + IPartitionWriterFactory pwFactory = + createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("output: " + i + ": " + conn.getConnectorId()); } - IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td - .getPartitionCount(), td.getOutputPartitionCounts()[i]); + IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, + td.getPartitionCount(), td.getOutputPartitionCounts()[i]); + writer = enforce ? EnforceFrameWriter.enforce(writer) : writer; operator.setOutputFrameWriter(i, writer, recordDesc); } } @@ -203,11 +204,11 @@ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task, int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy) throws HyracksDataException { - IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td - .getInputPartitionCounts()[i], td.getPartitionCount()); + IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, + td.getInputPartitionCounts()[i], td.getPartitionCount()); if (cPolicy.materializeOnReceiveSide()) { - return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task - .getTaskAttemptId(), ncs.getExecutor()); + return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, + task.getTaskAttemptId(), ncs.getExecutor()); } else { return collector; } @@ -222,8 +223,9 @@ factory = new IPartitionWriterFactory() { @Override public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { - return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId, - conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor()); + return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), + new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, + ncs.getExecutor()); } }; } else { @@ -231,9 +233,9 @@ @Override public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { - return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId( - jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs - .getExecutor()); + return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), + new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, + ncs.getExecutor()); } }; } @@ -241,8 +243,8 @@ factory = new IPartitionWriterFactory() { @Override public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { - return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn - .getConnectorId(), senderIndex, receiverIndex), taId); + return new PipelinedPartition(ctx, ncs.getPartitionManager(), + new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId); } }; } @@ -272,11 +274,14 @@ if (inputAddresses[i] != null) { for (int j = 0; j < inputAddresses[i].length; j++) { NetworkAddress networkAddress = inputAddresses[i][j]; - PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td - .getTaskAttemptId().getTaskId().getPartition()); - PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs - .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress - .lookupIpAddress()), networkAddress.getPort()), pid, 5)); + PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, + td.getTaskAttemptId().getTaskId().getPartition()); + PartitionChannel channel = new PartitionChannel(pid, + new NetworkInputChannel(ncs.getNetworkManager(), + new InetSocketAddress( + InetAddress.getByAddress(networkAddress.lookupIpAddress()), + networkAddress.getPort()), + pid, 5)); channels.add(channel); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java index 7901141..3105d42 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java @@ -190,6 +190,9 @@ } aggregator.close(); aggregateState.close(); + } catch (Exception e) { + appenderWrapper.fail(); + throw e; } finally { appenderWrapper.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java index 48e6b35..9ac9296 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java @@ -18,15 +18,11 @@ */ package org.apache.hyracks.dataflow.std.misc; -import java.nio.ByteBuffer; - import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -38,22 +34,6 @@ @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { - return new AbstractUnaryInputSinkOperatorNodePushable() { - @Override - public void open() throws HyracksDataException { - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - } - - @Override - public void close() throws HyracksDataException { - } - - @Override - public void fail() throws HyracksDataException { - } - }; + return new SinkOperatorNodePushable(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java index fe64472..d987a35 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java @@ -19,15 +19,11 @@ package org.apache.hyracks.dataflow.std.misc; -import java.nio.ByteBuffer; - import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -39,22 +35,6 @@ @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { - return new AbstractUnaryInputSinkOperatorNodePushable() { - @Override - public void open() throws HyracksDataException { - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - } - - @Override - public void close() throws HyracksDataException { - } - - @Override - public void fail() throws HyracksDataException { - } - }; + return new SinkOperatorNodePushable(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java new file mode 100644 index 0000000..85e1bb8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.misc; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; + +public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable { + + @Override + public void open() throws HyracksDataException { + // Does nothing. + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + // Does nothing. + } + + @Override + public void close() throws HyracksDataException { + // Does nothing. + } + + @Override + public void fail() throws HyracksDataException { + // Does nothing. + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java index bcebb7d..3c11669 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java @@ -63,6 +63,9 @@ flushWriter.open(); try { getSorter().flush(flushWriter); + } catch (Exception e) { + flushWriter.fail(); + throw e; } finally { flushWriter.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 509607c..0352cea 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -78,6 +78,7 @@ protected final boolean appendIndexFilter; protected ArrayTupleBuilder nonFilterTupleBuild; protected final ISearchOperationCallbackFactory searchCallbackFactory; + protected boolean failed = false; public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, @@ -193,7 +194,7 @@ writeSearchResults(i); } } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -207,12 +208,15 @@ HyracksDataException closeException = null; if (index != null) { // if index == null, then the index open was not successful - try { - if (appender.getTupleCount() > 0) { - appender.write(writer, true); + if (!failed) { + try { + if (appender.getTupleCount() > 0) { + appender.write(writer, true); + } + } catch (Throwable th) { + writer.fail(); + closeException = new HyracksDataException(th); } - } catch (Throwable th) { - closeException = new HyracksDataException(th); } try { @@ -251,6 +255,7 @@ @Override public void fail() throws HyracksDataException { + failed = true; writer.fail(); } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index 2171122..b100300 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -20,8 +20,10 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.hyracks.api.context.IHyracksJobletContext; @@ -33,6 +35,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatable; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; @@ -155,4 +158,9 @@ public Object getSharedObject() { return sharedObject; } + + @Override + public Set<JobFlag> getJobFlags() { + return EnumSet.noneOf(JobFlag.class); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1618 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1 Gerrit-PatchSet: 44 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
