Repository: asterixdb
Updated Branches:
  refs/heads/master 19e0c22a8 -> 392bbbc00


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..5fac823 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,10 +18,8 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
@@ -66,29 +64,22 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, 
resultSweepThreshold));
+        executor.execute(new ResultStateSweeper(this, resultTTL, 
resultSweepThreshold, LOGGER));
     }
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws 
HyracksException {
-        DatasetPartitionWriter dpw = null;
+        DatasetPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, 
asyncMode, orderedResult, partition, nPartitions,
                     datasetMemoryManager, fileFactory);
 
-            ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
-            if (rsIdMap == null) {
-                rsIdMap = new ResultSetMap();
-                partitionResultStateMap.put(jobId, rsIdMap);
-            }
+            ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.computeIfAbsent(jobId,
+                    k -> new ResultSetMap());
 
-            ResultState[] resultStates = rsIdMap.get(rsId);
-            if (resultStates == null) {
-                resultStates = new ResultState[nPartitions];
-                rsIdMap.put(rsId, resultStates);
-            }
+            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, 
nPartitions);
             resultStates[partition] = dpw.getResultState();
         }
 
@@ -141,7 +132,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
                 throw new HyracksException("Unknown JobId " + jobId);
             }
 
-            ResultState[] resultStates = rsIdMap.get(resultSetId);
+            ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
             if (resultStates == null) {
                 throw new HyracksException("Unknown JobId: " + jobId + " 
ResultSetId: " + resultSetId);
             }
@@ -161,49 +152,16 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition) {
         ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
-        if (rsIdMap != null) {
-            ResultState[] resultStates = rsIdMap.get(resultSetId);
-            if (resultStates != null) {
-                ResultState state = resultStates[partition];
-                if (state != null) {
-                    state.closeAndDelete();
-                    LOGGER.fine("Removing partition: " + partition + " for 
JobId: " + jobId);
-                }
-                resultStates[partition] = null;
-                boolean stateEmpty = true;
-                for (int i = 0; i < resultStates.length; i++) {
-                    if (resultStates[i] != null) {
-                        stateEmpty = false;
-                        break;
-                    }
-                }
-                if (stateEmpty) {
-                    rsIdMap.remove(resultSetId);
-                }
-            }
-            if (rsIdMap.isEmpty()) {
-                partitionResultStateMap.remove(jobId);
-            }
+        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, 
partition)) {
+            partitionResultStateMap.remove(jobId);
         }
     }
 
     @Override
     public synchronized void abortReader(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
-
-        if (rsIdMap == null) {
-            return;
-        }
-
-        for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
-            ResultState[] resultStates = mapEntry.getValue();
-            if (resultStates != null) {
-                for (ResultState state : resultStates) {
-                    if (state != null) {
-                        state.abort();
-                    }
-                }
-            }
+        if (rsIdMap != null) {
+            rsIdMap.abortAll();
         }
     }
 
@@ -214,59 +172,33 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, IDatasetStateRecord> entry : 
partitionResultStateMap.entrySet()) {
-            deinit(entry.getKey());
+        for (JobId jobId : getJobIds()) {
+            deinit(jobId);
         }
         deallocatableRegistry.close();
     }
 
     @Override
-    public Set<JobId> getJobIds() {
+    public synchronized Set<JobId> getJobIds() {
         return partitionResultStateMap.keySet();
     }
 
     @Override
-    public IDatasetStateRecord getState(JobId jobId) {
+    public synchronized IDatasetStateRecord getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
     @Override
-    public void deinitState(JobId jobId) {
+    public synchronized void deinitState(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
     }
 
-    private void deinit(JobId jobId) {
+    private synchronized void deinit(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
-            for (Entry<ResultSetId, ResultState[]> mapEntry : 
rsIdMap.entrySet()) {
-                ResultState[] resultStates = mapEntry.getValue();
-                if (resultStates != null) {
-                    for (int i = 0; i < resultStates.length; i++) {
-                        ResultState state = resultStates[i];
-                        if (state != null) {
-                            state.closeAndDelete();
-                            LOGGER.fine("Removing partition: " + i + " for 
JobId: " + jobId);
-                        }
-                    }
-                }
-            }
+            rsIdMap.closeAndDeleteAll();
         }
     }
 
-    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> 
implements IDatasetStateRecord {
-        private static final long serialVersionUID = 1L;
-
-        long timestamp;
-
-        public ResultSetMap() {
-            super();
-            timestamp = System.currentTimeMillis();
-        }
-
-        @Override
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..f7aa2e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -86,10 +86,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(false);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(false);
         if (datasetMemoryManager == null) {
             resultState.write(buffer);
         } else {
@@ -102,6 +99,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
         try {
             resultState.closeAndDelete();
             resultState.abort();
+            registerResultPartitionLocation(false);
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -113,10 +111,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(true);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(true);
         resultState.close();
         try {
             manager.reportPartitionWriteCompletion(jobId, resultSetId, 
partition);
@@ -127,7 +122,11 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     void registerResultPartitionLocation(boolean empty) throws 
HyracksDataException {
         try {
-            manager.registerResultPartitionLocation(jobId, resultSetId, 
partition, nPartitions, orderedResult, empty);
+            if (!partitionRegistered) {
+                manager.registerResultPartitionLocation(jobId, resultSetId, 
partition, nPartitions, orderedResult,
+                        empty);
+                partitionRegistered = true;
+            }
         } catch (HyracksException e) {
             if (e instanceof HyracksDataException) {
                 throw (HyracksDataException) e;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
new file mode 100644
index 0000000..579f68b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.dataset;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+class ResultSetMap implements IDatasetStateRecord, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = 
Logger.getLogger(DatasetPartitionManager.class.getName());
+
+    private final long timestamp;
+    private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+
+    ResultSetMap() {
+        timestamp = System.currentTimeMillis();
+        resultStateMap = new HashMap<>();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    ResultState[] getResultStates(ResultSetId rsId) {
+        return resultStateMap.get(rsId);
+    }
+
+    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
+        return resultStateMap.computeIfAbsent(rsId, (k) -> new 
ResultState[nPartitions]);
+    }
+
+    /**
+     * removes a result partition for a result set
+     *
+     * @param jobId
+     *            the id of the job that produced the result set
+     * @param resultSetId
+     *            the id of the result set
+     * @param partition
+     *            the partition number
+     * @return true, if all partitions for the resultSetId have been removed
+     */
+    boolean removePartition(JobId jobId, ResultSetId resultSetId, int 
partition) {
+        final ResultState[] resultStates = resultStateMap.get(resultSetId);
+        if (resultStates != null) {
+            final ResultState state = resultStates[partition];
+            if (state != null) {
+                state.closeAndDelete();
+                LOGGER.fine("Removing partition: " + partition + " for JobId: 
" + jobId);
+            }
+            resultStates[partition] = null;
+            boolean stateEmpty = true;
+            for (ResultState resState : resultStates) {
+                if (resState != null) {
+                    stateEmpty = false;
+                    break;
+                }
+            }
+            if (stateEmpty) {
+                resultStateMap.remove(resultSetId);
+            }
+            return resultStateMap.isEmpty();
+        }
+        return true;
+    }
+
+    void abortAll() {
+        applyToAllStates((rsId, state, i) -> state.abort());
+    }
+
+    void closeAndDeleteAll() {
+        applyToAllStates((rsId, state, i) -> {
+            state.closeAndDelete();
+            LOGGER.fine("Removing partition: " + i + " for result set " + 
rsId);
+        });
+    }
+
+    @FunctionalInterface
+    private interface StateModifier {
+        void modify(ResultSetId rsId, ResultState entry, int partition);
+    }
+
+    private void applyToAllStates(StateModifier modifier) {
+        for (Map.Entry<ResultSetId, ResultState[]> entry : 
resultStateMap.entrySet()) {
+            final ResultSetId rsId = entry.getKey();
+            final ResultState[] resultStates = entry.getValue();
+            if (resultStates == null) {
+                continue;
+            }
+            for (int i = 0; i < resultStates.length; i++) {
+                final ResultState state = resultStates[i];
+                if (state != null) {
+                    modifier.modify(rsId, state, i);
+                }
+            }
+        }
+    }
+}

Reply via email to