Till Westmann has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1548
Change subject: WIP - re-read results ...................................................................... WIP - re-read results Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java 18 files changed, 234 insertions(+), 28 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/48/1548/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 292dd2a..d8c39f4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -27,6 +27,7 @@ import org.apache.asterix.app.result.ResultUtil; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; +import org.apache.hyracks.api.dataset.DatasetJobRecord; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.ErrorCode; @@ -67,6 +68,19 @@ IHyracksDataset hds = getHyracksDataset(); ResultReader resultReader = new ResultReader(hds, jobId, rsId); + DatasetJobRecord.Status status = resultReader.getStatus(); + switch (status) { + case SUCCESS: + break; + case RUNNING: + case IDLE: + case FAILED: + response.setStatus(HttpResponseStatus.NOT_FOUND); + return; + default: + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + return; + } // QQQ The output format is determined by the initial // query and cannot be modified here, so calling back to diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java index 3531211..3e40c40 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java @@ -49,7 +49,7 @@ PrettyPrinter singleLine = new SingleLinePrettyPrinter(); ObjectNode result = om.readValue(resultStr, ObjectNode.class); - LOGGER.fine("+++++++\n" + result + "\n+++++++\n"); + System.err.println("+++++++\n" + result + "\n+++++++\n"); String type = ""; String status = ""; @@ -106,7 +106,7 @@ } break; default: - throw new AsterixException(field + "unanticipated field"); + throw new AsterixException("Unanticipated field \"" + field + "\""); } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp new file mode 100644 index 0000000..a44b911 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#handlevariable=handle + +select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http new file mode 100644 index 0000000..88e0861 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#polltimeoutsecs=100 + +/query/status?handle=$handle diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http new file mode 100644 index 0000000..a88991c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/query/result?handle=$handle diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http new file mode 100644 index 0000000..a88991c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/query/result?handle=$handle diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp new file mode 100644 index 0000000..e452678 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json new file mode 100644 index 0000000..6213a6b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json @@ -0,0 +1 @@ +{"status":"SUCCESS"} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json new file mode 100644 index 0000000..09e86cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json @@ -0,0 +1,10 @@ +{ "i": 1, "i2": 1 } +{ "i": 2, "i2": 4 } +{ "i": 3, "i2": 9 } +{ "i": 4, "i2": 16 } +{ "i": 5, "i2": 25 } +{ "i": 6, "i2": 36 } +{ "i": 7, "i2": 49 } +{ "i": 8, "i2": 64 } +{ "i": 9, "i2": 81 } +{ "i": 10, "i2": 100 } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json new file mode 100644 index 0000000..09e86cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json @@ -0,0 +1,10 @@ +{ "i": 1, "i2": 1 } +{ "i": 2, "i2": 4 } +{ "i": 3, "i2": 9 } +{ "i": 4, "i2": 16 } +{ "i": 5, "i2": 25 } +{ "i": 6, "i2": 36 } +{ "i": 7, "i2": 49 } +{ "i": 8, "i2": 64 } +{ "i": 9, "i2": 81 } +{ "i": 10, "i2": 100 } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json new file mode 100644 index 0000000..09e86cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json @@ -0,0 +1,10 @@ +{ "i": 1, "i2": 1 } +{ "i": 2, "i2": 4 } +{ "i": 3, "i2": 9 } +{ "i": 4, "i2": 16 } +{ "i": 5, "i2": 25 } +{ "i": 6, "i2": 36 } +{ "i": 7, "i2": 49 } +{ "i": 8, "i2": 64 } +{ "i": 9, "i2": 81 } +{ "i": 10, "i2": 100 } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 431b215..3005a22 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -39,6 +39,11 @@ </compilation-unit> </test-case> <test-case FilePath="async-deferred"> + <compilation-unit name="async-repeated"> + <output-dir compare="Text">async-repeated</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> <compilation-unit name="async-running"> <output-dir compare="Text">async-running</output-dir> </compilation-unit> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java index f29ff4a..9bbe1c8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.api.dataset; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,7 +86,7 @@ @Override public String toString() { - return resultSetMetadataMap.toString(); + return Arrays.toString(resultSetMetadataMap.entrySet().toArray()); } public List<Exception> getExceptions() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 98c0697..0adb330 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -83,7 +83,7 @@ } @Override - public void notifyJobStart(JobId jobId) throws HyracksException { + public synchronized void notifyJobStart(JobId jobId) throws HyracksException { jobResultLocations.get(jobId).getRecord().start(); } @@ -169,19 +169,18 @@ } @Override - public Set<JobId> getJobIds() { + public synchronized Set<JobId> getJobIds() { return jobResultLocations.keySet(); } @Override - public IDatasetStateRecord getState(JobId jobId) { + public synchronized IDatasetStateRecord getState(JobId jobId) { return getDatasetJobRecord(jobId); } @Override - public void deinitState(JobId jobId) { - // See ASTERIXDB-1614 - DatasetDirectoryService.deinitState() fix intermittently fails - // jobResultLocations.remove(jobId); + public synchronized void deinitState(JobId jobId) { + jobResultLocations.remove(jobId); } @Override @@ -277,6 +276,11 @@ } } } + + @Override + public String toString() { + return record.toString(); + } } class Waiters extends HashMap<ResultSetId, Waiter> { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index 5fac823..a82f850 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -124,31 +124,30 @@ @Override public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter writer) throws HyracksException { - ResultState resultState; - synchronized (this) { - ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); - - if (rsIdMap == null) { - throw new HyracksException("Unknown JobId " + jobId); - } - - ResultState[] resultStates = rsIdMap.getResultStates(resultSetId); - if (resultStates == null) { - throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId); - } - - resultState = resultStates[partition]; - if (resultState == null) { - throw new HyracksException("No DatasetPartitionWriter for partition " + partition); - } - } - + ResultState resultState = getResultState(jobId, resultSetId, partition); DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState); dpr.writeTo(writer); LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: " + partition); } + protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition) + throws HyracksException { + ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); + if (rsIdMap == null) { + throw new HyracksException("Unknown JobId " + jobId); + } + ResultState[] resultStates = rsIdMap.getResultStates(resultSetId); + if (resultStates == null) { + throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId); + } + ResultState resultState = resultStates[partition]; + if (resultState == null) { + throw new HyracksException("No DatasetPartitionWriter for partition " + partition); + } + return resultState; + } + @Override public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) { ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java index c501b5b..bc31bd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; import org.apache.hyracks.api.dataflow.state.IStateObject; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,6 +38,8 @@ public class ResultState implements IStateObject { private static final String FILE_PREFIX = "result_"; + + private static final Logger LOGGER = Logger.getLogger(ResultState.class.getName()); private final ResultSetPartitionId resultSetPartitionId; @@ -77,20 +80,24 @@ fileRef = null; writeFileHandle = null; + System.err.println("XXX created " + toString()); } public synchronized void open() { + System.err.println("XXX open " + toString()); size = 0; persistentSize = 0; } public synchronized void close() { + System.err.println("XXX close " + toString()); eos.set(true); closeWriteFileHandle(); notifyAll(); } public synchronized void closeAndDelete() { + System.err.println("XXX closeAndDelete " + toString()); // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs // to be taken when there are more requests to these result states. failed.set(true); @@ -107,10 +114,12 @@ } catch (IOException e) { // Since file handle could not be closed, just ignore. } + writeFileHandle = null; } } public synchronized void write(ByteBuffer buffer) throws HyracksDataException { + System.err.println("XXX write1 " + toString()); if (fileRef == null) { String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); @@ -124,6 +133,7 @@ public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer) throws HyracksDataException { + System.err.println("XXX write2 " + toString()); int srcOffset = 0; Page destPage = null; @@ -146,16 +156,20 @@ } public synchronized void readOpen() { + System.err.println("XXX readOpen " + toString()); // It is a noOp for now, leaving here to keep the API stable for future usage. } public synchronized void readClose() throws HyracksDataException { + System.err.println("XXX readClose " + toString()); if (readFileHandle != null) { ioManager.close(readFileHandle); + readFileHandle = null; } } public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException { + System.err.println("XXX read1 " + toString()); long readSize = 0; while (offset >= size && !eos.get() && !failed.get()) { @@ -179,6 +193,7 @@ public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer) throws HyracksDataException { + System.err.println("XXX read2 " + toString()); long readSize = 0; synchronized (this) { while (offset >= size && !eos.get() && !failed.get()) { @@ -220,11 +235,13 @@ } public synchronized void abort() { + System.err.println("XXX abort " + toString()); failed.set(true); notifyAll(); } public synchronized Page returnPage() throws HyracksDataException { + System.err.println("XXX returnPage " + toString()); Page page = removePage(); // If we do not have any pages to be given back close the write channel since we don't write any more, return null. @@ -324,4 +341,16 @@ readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ "); + sb.append('"').append("rspid").append("\":\"").append(resultSetPartitionId).append("\", "); + sb.append('"').append("async").append("\":").append(asyncMode).append(", "); + sb.append('"').append("eos").append("\":").append(eos).append(", "); + sb.append('"').append("failed").append("\":").append(failed).append(", "); + sb.append('"').append("fileRef").append("\":\"").append(String.valueOf(fileRef)).append("\" }"); + return sb.toString(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index b422ef4..a8d699e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.ByteBuffer; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; @@ -42,6 +43,9 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + private static final Logger LOGGER = Logger.getLogger(ResultWriterOperatorDescriptor.class.getName()); + + private static final long serialVersionUID = 1L; private final ResultSetId rsId; @@ -85,6 +89,7 @@ @Override public void open() throws HyracksDataException { + System.err.println("XXX open " + toString()); try { datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition, nPartitions); @@ -97,6 +102,7 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + System.err.println("XXX nextFrame " + toString()); frameTupleAccessor.reset(buffer); for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) { resultSerializer.appendTuple(frameTupleAccessor, tIndex); @@ -111,12 +117,14 @@ @Override public void fail() throws HyracksDataException { + System.err.println("XXX fail " + toString()); failed = true; datasetPartitionWriter.fail(); } @Override public void close() throws HyracksDataException { + System.err.println("XXX close " + toString()); try { if (!failed && frameOutputStream.getTupleCount() > 0) { frameOutputStream.flush(datasetPartitionWriter); @@ -128,6 +136,16 @@ datasetPartitionWriter.close(); } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ "); + sb.append("\"rsId\": \"").append(rsId).append("\", "); + sb.append("\"ordered\": ").append(ordered).append(", "); + sb.append("\"asyncMode\": ").append(asyncMode).append(" }"); + return sb.toString(); + } }; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1548 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org>