Repository: asterixdb
Updated Branches:
  refs/heads/master 19e0c22a8 -> 392bbbc00

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..5fac823 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,10 +18,8 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
@@ -66,29 +64,22 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
     }
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws HyracksException {
-        DatasetPartitionWriter dpw = null;
+        DatasetPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
                     datasetMemoryManager, fileFactory);
 
-            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-            if (rsIdMap == null) {
-                rsIdMap = new ResultSetMap();
-                partitionResultStateMap.put(jobId, rsIdMap);
-            }
+            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.computeIfAbsent(jobId,
+                    k -> new ResultSetMap());
 
-            ResultState[] resultStates = rsIdMap.get(rsId);
-            if (resultStates == null) {
-                resultStates = new ResultState[nPartitions];
-                rsIdMap.put(rsId, resultStates);
-            }
+            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
             resultStates[partition] = dpw.getResultState();
         }
 
@@ -141,7 +132,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
             throw new HyracksException("Unknown JobId " + jobId);
         }
 
-        ResultState[] resultStates = rsIdMap.get(resultSetId);
+        ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
         if (resultStates == null) {
             throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
         }
@@ -161,49 +152,16 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-        if (rsIdMap != null) {
-            ResultState[] resultStates = rsIdMap.get(resultSetId);
-            if (resultStates != null) {
-                ResultState state = resultStates[partition];
-                if (state != null) {
-                    state.closeAndDelete();
-                    LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
-                }
-                resultStates[partition] = null;
-                boolean stateEmpty = true;
-                for (int i = 0; i < resultStates.length; i++) {
-                    if (resultStates[i] != null) {
-                        stateEmpty = false;
-                        break;
-                    }
-                }
-                if (stateEmpty) {
-                    rsIdMap.remove(resultSetId);
-                }
-            }
-            if (rsIdMap.isEmpty()) {
-                partitionResultStateMap.remove(jobId);
-            }
+        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) {
+            partitionResultStateMap.remove(jobId);
         }
     }
 
     @Override
     public synchronized void abortReader(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-
-        if (rsIdMap == null) {
-            return;
-        }
-
-        for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
-            ResultState[] resultStates = mapEntry.getValue();
-            if (resultStates != null) {
-                for (ResultState state : resultStates) {
-                    if (state != null) {
-                        state.abort();
-                    }
-                }
-            }
+        if (rsIdMap != null) {
+            rsIdMap.abortAll();
         }
     }
 
@@ -214,59 +172,33 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, IDatasetStateRecord> entry : partitionResultStateMap.entrySet()) {
-            deinit(entry.getKey());
+        for (JobId jobId : getJobIds()) {
+            deinit(jobId);
         }
         deallocatableRegistry.close();
     }
 
     @Override
-    public Set<JobId> getJobIds() {
+    public synchronized Set<JobId> getJobIds() {
         return partitionResultStateMap.keySet();
     }
 
     @Override
-    public IDatasetStateRecord getState(JobId jobId) {
+    public synchronized IDatasetStateRecord getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
     @Override
-    public void deinitState(JobId jobId) {
+    public synchronized void deinitState(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
     }
 
-    private void deinit(JobId jobId) {
+    private synchronized void deinit(JobId jobId) {
         ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
-            for (Entry<ResultSetId, ResultState[]> mapEntry : rsIdMap.entrySet()) {
-                ResultState[] resultStates = mapEntry.getValue();
-                if (resultStates != null) {
-                    for (int i = 0; i < resultStates.length; i++) {
-                        ResultState state = resultStates[i];
-                        if (state != null) {
-                            state.closeAndDelete();
-                            LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
-                        }
-                    }
-                }
-            }
+            rsIdMap.closeAndDeleteAll();
         }
     }
 
-    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
-        private static final long serialVersionUID = 1L;
-
-        long timestamp;
-
-        public ResultSetMap() {
-            super();
-            timestamp = System.currentTimeMillis();
-        }
-
-        @Override
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..f7aa2e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -86,10 +86,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(false);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(false);
         if (datasetMemoryManager == null) {
             resultState.write(buffer);
         } else {
@@ -102,6 +99,7 @@
         try {
             resultState.closeAndDelete();
             resultState.abort();
+            registerResultPartitionLocation(false);
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -113,10 +111,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-        if (!partitionRegistered) {
-            registerResultPartitionLocation(true);
-            partitionRegistered = true;
-        }
+        registerResultPartitionLocation(true);
         resultState.close();
         try {
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
@@ -127,7 +122,11 @@
 
     void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
         try {
-            manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, empty);
+            if (!partitionRegistered) {
+                manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
+                        empty);
+                partitionRegistered = true;
+            }
         } catch (HyracksException e) {
             if (e instanceof HyracksDataException) {
                 throw (HyracksDataException) e;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
new file mode 100644
index 0000000..579f68b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.dataset;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+class ResultSetMap implements IDatasetStateRecord, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
+    private final long timestamp;
+    private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+
+    ResultSetMap() {
+        timestamp = System.currentTimeMillis();
+        resultStateMap = new HashMap<>();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    ResultState[] getResultStates(ResultSetId rsId) {
+        return resultStateMap.get(rsId);
+    }
+
+    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
+        return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]);
+    }
+
+    /**
+     * removes a result partition for a result set
+     *
+     * @param jobId
+     *            the id of the job that produced the result set
+     * @param resultSetId
+     *            the id of the result set
+     * @param partition
+     *            the partition number
+     * @return true, if all partitions for the resultSetId have been removed
+     */
+    boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+        final ResultState[] resultStates = resultStateMap.get(resultSetId);
+        if (resultStates != null) {
+            final ResultState state = resultStates[partition];
+            if (state != null) {
+                state.closeAndDelete();
+                LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+            }
+            resultStates[partition] = null;
+            boolean stateEmpty = true;
+            for (ResultState resState : resultStates) {
+                if (resState != null) {
+                    stateEmpty = false;
+                    break;
+                }
+            }
+            if (stateEmpty) {
+                resultStateMap.remove(resultSetId);
+            }
+            return resultStateMap.isEmpty();
+        }
+        return true;
+    }
+
+    void abortAll() {
+        applyToAllStates((rsId, state, i) -> state.abort());
+    }
+
+    void closeAndDeleteAll() {
+        applyToAllStates((rsId, state, i) -> {
+            state.closeAndDelete();
+            LOGGER.fine("Removing partition: " + i + " for result set " + rsId);
+        });
+    }
+
+    @FunctionalInterface
+    private interface StateModifier {
+        void modify(ResultSetId rsId, ResultState entry, int partition);
+    }
+
+    private void applyToAllStates(StateModifier modifier) {
+        for (Map.Entry<ResultSetId, ResultState[]> entry : resultStateMap.entrySet()) {
+            final ResultSetId rsId = entry.getKey();
+            final ResultState[] resultStates = entry.getValue();
+            if (resultStates == null) {
+                continue;
+            }
+            for (int i = 0; i < resultStates.length; i++) {
+                final ResultState state = resultStates[i];
+                if (state != null) {
+                    modifier.modify(rsId, state, i);
+                }
+            }
+        }
+    }
+}