>From Peeyush Gupta <[email protected]>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17643 )
Change subject: [NO ISSUE][TX]: Multiple fixes to atomic statements ...................................................................... [NO ISSUE][TX]: Multiple fixes to atomic statements - user model changes: no - storage format changes: no - interface changes: no Details: - Correctly computes partition count in case of parallel execution. - Correctly computes node count in case a node group is used. - Fix for the case when a job prepared message is received after the transaction is aborted. - Job commit message is sent in commitTransaction method. This makes sure that the commit messages are sent only after the job was successful. - Fix for the case when job is aborted/cancelled after flushes are scheduled on atomic datasets but are not finished. Change-Id: Ieb029d6273f19fa4bd0e7edfb8897f894c1f5b6e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17643 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp A 
asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java 15 files changed, 384 insertions(+), 68 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 10f118c0..bb93fbc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -554,7 +554,7 @@ public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec, INodeJobTracker jobTracker, AlgebricksAbsolutePartitionConstraint clusterLocations) { - final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec); + final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec, null); return new AlgebricksAbsolutePartitionConstraint(Arrays.stream(clusterLocations.getLocations()) .filter(jobParticipatingNodes::contains).toArray(String[]::new)); } diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index beb55d8..4dbf38b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@ -69,8 +69,7 @@ @Override public void commitTransaction(JobId jobId) throws ACIDException { IGlobalTransactionContext context = getTransactionContext(jobId); - if (context.getTxnStatus() == TransactionStatus.ACTIVE - || context.getTxnStatus() == TransactionStatus.PREPARED) { + if (context.getAcksReceived() != context.getNumPartitions()) { synchronized (context) { try { context.wait(); @@ -80,6 +79,19 @@ } } } + context.setTxnStatus(TransactionStatus.PREPARED); + context.persist(ioManager); + context.resetAcksReceived(); + sendJobCommitMessages(context); + + synchronized (context) { + try { + context.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ACIDException(e); + } + } txnContextRepository.remove(jobId); } @@ -95,17 +107,21 @@ @Override public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId, Map<String, ILSMComponentId> componentIdMap) { - IGlobalTransactionContext context = getTransactionContext(jobId); + IGlobalTransactionContext context = txnContextRepository.get(jobId); + if (context == null) { + LOGGER.warn("JobPreparedMessage received for jobId " + jobId + + ", which does not exist. 
The transaction for the job is already aborted"); + return; + } if (context.getNodeResourceMap().containsKey(nodeId)) { context.getNodeResourceMap().get(nodeId).putAll(componentIdMap); } else { context.getNodeResourceMap().put(nodeId, componentIdMap); } if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) { - context.setTxnStatus(TransactionStatus.PREPARED); - context.persist(ioManager); - context.resetAcksReceived(); - sendJobCommitMessages(context); + synchronized (context) { + context.notifyAll(); + } } } @@ -127,10 +143,10 @@ if (context.incrementAndGetAcksReceived() == context.getNumNodes()) { context.delete(ioManager); context.setTxnStatus(TransactionStatus.COMMITTED); - sendEnableMergeMessages(context); synchronized (context) { context.notifyAll(); } + sendEnableMergeMessages(context); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f390237..9f292e7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -261,6 +261,7 @@ import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.TokenizerCategory; import org.apache.hyracks.util.LogRedactionUtil; import org.apache.logging.log4j.Level; @@ -3524,8 +3525,11 @@ if (spec != null && !isCompileOnly()) { atomic = dataset.isAtomic(); if (atomic) { - int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size(); - int 
numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec); + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(dataset.getDatasetId()); spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds, @@ -3613,8 +3617,11 @@ ((InsertStatement) stmt).getDatasetName()); atomic = ds.isAtomic(); if (atomic) { - int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size(); - int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec); + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( @@ -3672,8 +3679,11 @@ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); atomic = ds.isAtomic(); if (atomic) { - int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size(); - int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec); + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( @@ -4791,8 +4801,11 @@ ((InsertStatement) atomicStatement).getDatasetName()); atomic = ds.isAtomic(); if (atomic) { - int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size(); - int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec); + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp new file mode 100644 index 0000000..d1216e1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +create dataset page_views primary key (user:string); + +create dataset tmp primary key (id:uuid) autogenerated; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp new file mode 100644 index 0000000..a29e59b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +load dataset page_views using localfs +(("path"="asterix_nc1://data/page_views.adm"),("format"="adm")); + +insert into tmp( + from page_views as t + group by t.user + group as g + select value { + "groups": (select value g.t from g) + } +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp new file mode 100644 index 0000000..b7a8bdc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +WITH beth AS( + FROM tmp AS i, i.groups AS item + SELECT DISTINCT VALUE + { + "user": item.user, + "action": item.action + } +) +, +rev AS( + FROM tmp i, i.groups AS item + SELECT DISTINCT VALUE + { + "user": item.user, + "estimated_revenue": item.estimated_revenue + } +) +, +ts AS ( + FROM tmp i, i.groups AS item + SELECT DISTINCT VALUE + { + "user": item.user, + "timespent": item.timespent + } +) + +FROM beth AS a, + ts AS b, + rev AS c +WHERE a.user=b.user AND a.user=c.user AND b.user=c.user +SELECT VALUE a.user +ORDER BY a.user; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp new file mode 100644 index 0000000..8035b89 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +drop nodegroup group_test if exists; +create nodegroup group_test on + asterix_nc1 +; + +create dataset Points +primary key (x:int, y:int) +WITH {"node-group":{"name":"group_test"}};; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp new file mode 100644 index 0000000..a8b70cd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +insert into Points +{"x": 9, "y": 15}; + +insert into Points +{"x": 15, "y": 40}; + +insert into Points +{"x": 20, "y": 50}; + +insert into Points +{"x": 50, "y": 200}; + +insert into Points +{"x": 60, "y": 40}; + +insert into Points +{"x": 101, "y": 80}; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp new file mode 100644 index 0000000..777499c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +use test; + +select value p +from Points p +where p.x>10 and p.x<100 +and p.y>10 and p.y<100 +order by p.x; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm new file mode 100644 index 0000000..80d0c73 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm @@ -0,0 +1,2 @@ +"Bill" +"John" \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm new file mode 100644 index 0000000..625f0fd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm @@ -0,0 +1,3 @@ +{ "x": 15, "y": 40 } +{ "x": 20, "y": 50 } +{ "x": 60, "y": 40 } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index d1a59a3..d24873e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -4145,24 +4145,24 @@ --> </test-group> <test-group name="ddl"> -<!-- <test-case FilePath="ddl">--> -<!-- <compilation-unit name="create-dataset-1">--> -<!-- <output-dir compare="Clean-JSON">create-dataset-1</output-dir>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> -<!-- <test-case FilePath="ddl">--> -<!-- <compilation-unit name="create-dataset-2">--> -<!-- <output-dir compare="Clean-JSON">create-dataset-2</output-dir>--> -<!-- 
<source-location>false</source-location>--> -<!-- <expected-error>type mismatch: missing a required closed field my_id: string</expected-error>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> -<!-- <test-case FilePath="ddl">--> -<!-- <compilation-unit name="create-dataset-3">--> -<!-- <output-dir compare="Clean-JSON">create-dataset-3</output-dir>--> -<!-- <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> + <test-case FilePath="ddl"> + <compilation-unit name="create-dataset-1"> + <output-dir compare="Clean-JSON">create-dataset-1</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="ddl"> + <compilation-unit name="create-dataset-2"> + <output-dir compare="Clean-JSON">create-dataset-2</output-dir> + <source-location>false</source-location> + <expected-error>type mismatch: missing a required closed field my_id: string</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="ddl"> + <compilation-unit name="create-dataset-3"> + <output-dir compare="Clean-JSON">create-dataset-3</output-dir> + <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error> + </compilation-unit> + </test-case> <test-case FilePath="ddl"> <compilation-unit name="analyze-dataset-1"> <output-dir compare="Text">analyze-dataset-1</output-dir> @@ -16321,25 +16321,35 @@ </compilation-unit> </test-case> </test-group> -<!-- <test-group name="copy">--> -<!-- <test-case FilePath="copy">--> -<!-- <compilation-unit name="copy-1">--> -<!-- <output-dir compare="Text">copy-1</output-dir>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> -<!-- </test-group>--> -<!-- <test-group name="atomic-statements">--> -<!-- <test-case FilePath="atomic-statements">--> -<!-- <compilation-unit name="atomic-statements-1">--> 
-<!-- <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>--> -<!-- <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>--> -<!-- <source-location>false</source-location>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> -<!-- <test-case FilePath="atomic-statements">--> -<!-- <compilation-unit name="atomic-statements-2">--> -<!-- <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>--> -<!-- </compilation-unit>--> -<!-- </test-case>--> -<!-- </test-group>--> + <test-group name="copy"> + <test-case FilePath="copy"> + <compilation-unit name="copy-1"> + <output-dir compare="Text">copy-1</output-dir> + </compilation-unit> + </test-case> + </test-group> + <test-group name="atomic-statements"> + <test-case FilePath="atomic-statements"> + <compilation-unit name="atomic-statements-1"> + <output-dir compare="Clean-JSON">atomic-statements-1</output-dir> + <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error> + <source-location>false</source-location> + </compilation-unit> + </test-case> + <test-case FilePath="atomic-statements"> + <compilation-unit name="atomic-statements-2"> + <output-dir compare="Clean-JSON">atomic-statements-2</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="atomic-statements"> + <compilation-unit name="atomic-statements-3"> + <output-dir compare="Clean-JSON">atomic-statements-3</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="atomic-statements"> + <compilation-unit name="atomic-statements-4"> + <output-dir compare="Clean-JSON">atomic-statements-4</output-dir> + </compilation-unit> + </test-case> + </test-group> </test-suite> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java index 27a3d49..66ca95e 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.hyracks.api.application.IClusterLifecycleListener; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -44,8 +45,8 @@ * @param spec * @return The participating nodes in the job execution */ - Set<String> getJobParticipatingNodes(JobSpecification spec); + Set<String> getJobParticipatingNodes(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass); - int getNumParticipatingPartitions(JobSpecification spec); + int getNumParticipatingPartitions(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index a1980b6..3ec2d22 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -294,6 +294,8 @@ } public void clear() throws HyracksDataException { + List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes()); + LSMIndexUtil.waitFor(flushes); deleteMemoryComponent(false); Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); for (ILSMIndex lsmIndex : indexes) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java index 52fba60..56d3eea 100644 
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java @@ -34,6 +34,9 @@ import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.constraints.expressions.ConstantExpression; import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; +import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; @@ -46,7 +49,7 @@ @Override public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) { - getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); + getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); } @Override @@ -76,17 +79,42 @@ } @Override - public Set<String> getJobParticipatingNodes(JobSpecification spec) { - return spec.getUserConstraints().stream().map(Constraint::getRValue) - .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast) - .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey) - .collect(Collectors.toSet()); + public Set<String> getJobParticipatingNodes(JobSpecification spec, + Class<? 
extends IOperatorDescriptor> operatorClass) { + if (operatorClass != null) { + List<OperatorDescriptorId> operatorDescriptorIds = + spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue())) + .map(Map.Entry::getKey).collect(Collectors.toList()); + return spec.getUserConstraints().stream() + .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_LOCATION && operatorDescriptorIds + .contains(((PartitionLocationExpression) ce.getLValue()).getOperatorDescriptorId())) + .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue) + .map(Object::toString).filter(nodeJobs::containsKey).collect(Collectors.toSet()); + } else { + return spec.getUserConstraints().stream().map(Constraint::getRValue) + .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast) + .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey) + .collect(Collectors.toSet()); + } } @Override - public int getNumParticipatingPartitions(JobSpecification spec) { - return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression) - .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue) - .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get(); + public int getNumParticipatingPartitions(JobSpecification spec, + Class<? 
extends IOperatorDescriptor> operatorClass) { + if (operatorClass != null) { + List<OperatorDescriptorId> operatorDescriptorIds = + spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue())) + .map(Map.Entry::getKey).collect(Collectors.toList()); + return spec.getUserConstraints().stream() + .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT && operatorDescriptorIds + .contains(((PartitionCountExpression) ce.getLValue()).getOperatorDescriptorId())) + .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue) + .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get(); + } else { + return spec.getUserConstraints().stream() + .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT).map(Constraint::getRValue) + .map(ConstantExpression.class::cast).map(ConstantExpression::getValue).map(Object::toString) + .map(Integer::parseInt).max(Integer::compare).get(); + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17643 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ieb029d6273f19fa4bd0e7edfb8897f894c1f5b6e Gerrit-Change-Number: 17643 Gerrit-PatchSet: 7 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-MessageType: merged
