Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1548

Change subject: WIP - re-read results
......................................................................

WIP - re-read results

Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
18 files changed, 234 insertions(+), 28 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/48/1548/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 292dd2a..d8c39f4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.app.result.ResultUtil;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -67,6 +68,19 @@
 
             IHyracksDataset hds = getHyracksDataset();
             ResultReader resultReader = new ResultReader(hds, jobId, rsId);
+            DatasetJobRecord.Status status = resultReader.getStatus();
+            switch (status) {
+                case SUCCESS:
+                    break;
+                case RUNNING:
+                case IDLE:
+                case FAILED:
+                    response.setStatus(HttpResponseStatus.NOT_FOUND);
+                    return;
+                default:
+                    
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                    return;
+            }
 
             // QQQ The output format is determined by the initial
             // query and cannot be modified here, so calling back to
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 3531211..3e40c40 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -49,7 +49,7 @@
         PrettyPrinter singleLine = new SingleLinePrettyPrinter();
         ObjectNode result = om.readValue(resultStr, ObjectNode.class);
 
-        LOGGER.fine("+++++++\n" + result + "\n+++++++\n");
+        System.err.println("+++++++\n" + result + "\n+++++++\n");
 
         String type = "";
         String status = "";
@@ -106,7 +106,7 @@
                     }
                     break;
                 default:
-                    throw new AsterixException(field + "unanticipated field");
+                    throw new AsterixException("Unanticipated field \"" + 
field + "\"");
             }
         }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
new file mode 100644
index 0000000..a44b911
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#handlevariable=handle
+
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
new file mode 100644
index 0000000..88e0861
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#polltimeoutsecs=100
+
+/query/status?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
new file mode 100644
index 0000000..a88991c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
new file mode 100644
index 0000000..a88991c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/query/result?handle=$handle
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
new file mode 100644
index 0000000..e452678
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+select i, i * i as i2 from range(1, 10) i;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 431b215..3005a22 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -39,6 +39,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="async-deferred">
+      <compilation-unit name="async-repeated">
+        <output-dir compare="Text">async-repeated</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
       <compilation-unit name="async-running">
         <output-dir compare="Text">async-running</output-dir>
       </compilation-unit>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index f29ff4a..9bbe1c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.dataset;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,7 +86,7 @@
 
     @Override
     public String toString() {
-        return resultSetMetadataMap.toString();
+        return Arrays.toString(resultSetMetadataMap.entrySet().toArray());
     }
 
     public List<Exception> getExceptions() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 98c0697..0adb330 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -83,7 +83,7 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
         jobResultLocations.get(jobId).getRecord().start();
     }
 
@@ -169,19 +169,18 @@
     }
 
     @Override
-    public Set<JobId> getJobIds() {
+    public synchronized Set<JobId> getJobIds() {
         return jobResultLocations.keySet();
     }
 
     @Override
-    public IDatasetStateRecord getState(JobId jobId) {
+    public synchronized IDatasetStateRecord getState(JobId jobId) {
         return getDatasetJobRecord(jobId);
     }
 
     @Override
-    public void deinitState(JobId jobId) {
-        // See ASTERIXDB-1614 - DatasetDirectoryService.deinitState() fix 
intermittently fails
-        // jobResultLocations.remove(jobId);
+    public synchronized void deinitState(JobId jobId) {
+        jobResultLocations.remove(jobId);
     }
 
     @Override
@@ -277,6 +276,11 @@
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return record.toString();
+    }
 }
 
 class Waiters extends HashMap<ResultSetId, Waiter> {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 5fac823..a82f850 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -124,31 +124,30 @@
     @Override
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition,
             IFrameWriter writer) throws HyracksException {
-        ResultState resultState;
-        synchronized (this) {
-            ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
-
-            if (rsIdMap == null) {
-                throw new HyracksException("Unknown JobId " + jobId);
-            }
-
-            ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
-            if (resultStates == null) {
-                throw new HyracksException("Unknown JobId: " + jobId + " 
ResultSetId: " + resultSetId);
-            }
-
-            resultState = resultStates[partition];
-            if (resultState == null) {
-                throw new HyracksException("No DatasetPartitionWriter for 
partition " + partition);
-            }
-        }
-
+        ResultState resultState = getResultState(jobId, resultSetId, 
partition);
         DatasetPartitionReader dpr = new DatasetPartitionReader(this, 
datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
         LOGGER.fine("Initialized partition reader: JobId: " + jobId + 
":ResultSetId: " + resultSetId + ":partition: "
                 + partition);
     }
 
+    protected synchronized ResultState getResultState(JobId jobId, ResultSetId 
resultSetId, int partition)
+            throws HyracksException {
+        ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
+        if (rsIdMap == null) {
+            throw new HyracksException("Unknown JobId " + jobId);
+        }
+        ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
+        if (resultStates == null) {
+            throw new HyracksException("Unknown JobId: " + jobId + " 
ResultSetId: " + resultSetId);
+        }
+        ResultState resultState = resultStates[partition];
+        if (resultState == null) {
+            throw new HyracksException("No DatasetPartitionWriter for 
partition " + partition);
+        }
+        return resultState;
+    }
+
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition) {
         ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index c501b5b..bc31bd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.dataflow.state.IStateObject;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,6 +38,8 @@
 
 public class ResultState implements IStateObject {
     private static final String FILE_PREFIX = "result_";
+
+    private static final Logger LOGGER = 
Logger.getLogger(ResultState.class.getName());
 
     private final ResultSetPartitionId resultSetPartitionId;
 
@@ -77,20 +80,24 @@
 
         fileRef = null;
         writeFileHandle = null;
+        System.err.println("XXX created " + toString());
     }
 
     public synchronized void open() {
+        System.err.println("XXX open " + toString());
         size = 0;
         persistentSize = 0;
     }
 
     public synchronized void close() {
+        System.err.println("XXX close " + toString());
         eos.set(true);
         closeWriteFileHandle();
         notifyAll();
     }
 
     public synchronized void closeAndDelete() {
+        System.err.println("XXX closeAndDelete " + toString());
         // Deleting a job is equivalent to aborting the job for all practical 
purposes, so the same action, needs
         // to be taken when there are more requests to these result states.
         failed.set(true);
@@ -107,10 +114,12 @@
             } catch (IOException e) {
                 // Since file handle could not be closed, just ignore.
             }
+            writeFileHandle = null;
         }
     }
 
     public synchronized void write(ByteBuffer buffer) throws 
HyracksDataException {
+        System.err.println("XXX write1 " + toString());
         if (fileRef == null) {
             String fName = FILE_PREFIX + 
String.valueOf(resultSetPartitionId.getPartition());
             fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
@@ -124,6 +133,7 @@
 
     public synchronized void write(DatasetMemoryManager datasetMemoryManager, 
ByteBuffer buffer)
             throws HyracksDataException {
+        System.err.println("XXX write2 " + toString());
         int srcOffset = 0;
         Page destPage = null;
 
@@ -146,16 +156,20 @@
     }
 
     public synchronized void readOpen() {
+        System.err.println("XXX readOpen " + toString());
         // It is a noOp for now, leaving here to keep the API stable for 
future usage.
     }
 
     public synchronized void readClose() throws HyracksDataException {
+        System.err.println("XXX readClose " + toString());
         if (readFileHandle != null) {
             ioManager.close(readFileHandle);
+            readFileHandle = null;
         }
     }
 
     public synchronized long read(long offset, ByteBuffer buffer) throws 
HyracksDataException {
+        System.err.println("XXX read1 " + toString());
         long readSize = 0;
 
         while (offset >= size && !eos.get() && !failed.get()) {
@@ -179,6 +193,7 @@
 
     public long read(DatasetMemoryManager datasetMemoryManager, long offset, 
ByteBuffer buffer)
             throws HyracksDataException {
+        System.err.println("XXX read2 " + toString());
         long readSize = 0;
         synchronized (this) {
             while (offset >= size && !eos.get() && !failed.get()) {
@@ -220,11 +235,13 @@
     }
 
     public synchronized void abort() {
+        System.err.println("XXX abort " + toString());
         failed.set(true);
         notifyAll();
     }
 
     public synchronized Page returnPage() throws HyracksDataException {
+        System.err.println("XXX returnPage " + toString());
         Page page = removePage();
 
         // If we do not have any pages to be given back close the write 
channel since we don't write any more, return null.
@@ -324,4 +341,16 @@
         readFileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_ONLY,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ ");
+        
sb.append('"').append("rspid").append("\":\"").append(resultSetPartitionId).append("\",
 ");
+        
sb.append('"').append("async").append("\":").append(asyncMode).append(", ");
+        sb.append('"').append("eos").append("\":").append(eos).append(", ");
+        sb.append('"').append("failed").append("\":").append(failed).append(", 
");
+        
sb.append('"').append("fileRef").append("\":\"").append(String.valueOf(fileRef)).append("\"
 }");
+        return sb.toString();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index b422ef4..a8d699e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -42,6 +43,9 @@
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class ResultWriterOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
+    private static final Logger LOGGER = 
Logger.getLogger(ResultWriterOperatorDescriptor.class.getName());
+
+
     private static final long serialVersionUID = 1L;
 
     private final ResultSetId rsId;
@@ -85,6 +89,7 @@
 
             @Override
             public void open() throws HyracksDataException {
+                System.err.println("XXX open " + toString());
                 try {
                     datasetPartitionWriter = 
dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
                             nPartitions);
@@ -97,6 +102,7 @@
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                System.err.println("XXX nextFrame " + toString());
                 frameTupleAccessor.reset(buffer);
                 for (int tIndex = 0; tIndex < 
frameTupleAccessor.getTupleCount(); tIndex++) {
                     resultSerializer.appendTuple(frameTupleAccessor, tIndex);
@@ -111,12 +117,14 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                System.err.println("XXX fail " + toString());
                 failed = true;
                 datasetPartitionWriter.fail();
             }
 
             @Override
             public void close() throws HyracksDataException {
+                System.err.println("XXX close " + toString());
                 try {
                     if (!failed && frameOutputStream.getTupleCount() > 0) {
                         frameOutputStream.flush(datasetPartitionWriter);
@@ -128,6 +136,16 @@
                     datasetPartitionWriter.close();
                 }
             }
+
+            @Override
+            public String toString() {
+                StringBuilder sb = new StringBuilder();
+                sb.append("{ ");
+                sb.append("\"rsId\": \"").append(rsId).append("\", ");
+                sb.append("\"ordered\": ").append(ordered).append(", ");
+                sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+                return sb.toString();
+            }
         };
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1548
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <ti...@apache.org>

Reply via email to