Repository: asterixdb Updated Branches: refs/heads/master 6b1b52c1f -> 1d2f00d88
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java new file mode 100644 index 0000000..82c7d50 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java @@ -0,0 +1,184 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.control.nc.result;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.result.AbstractResultManager;
import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Keeps the {@link ResultState}s of the result partitions written on this node, keyed by job and
 * result set, and hands out the writers and readers that operate on them.
 *
 * <p>All accesses to the job map happen while holding this object's monitor.
 */
public class ResultPartitionManager extends AbstractResultManager implements IResultPartitionManager {
    private static final Logger LOGGER = LogManager.getLogger();

    private final NodeControllerService ncs;

    private final Executor executor;

    /** Result states per job; only touched while holding this object's monitor. */
    private final Map<JobId, ResultSetMap> partitionResultStateMap;

    private final DefaultDeallocatableRegistry deallocatableRegistry;

    private final IWorkspaceFileFactory fileFactory;

    /** {@code null} when the available memory is smaller than one result page. */
    private final ResultMemoryManager resultMemoryManager;

    /**
     * Creates the manager and submits a {@link ResultStateSweeper} for it to {@code executor}.
     *
     * @param ncs the owning node controller service
     * @param executor runs the sweeper and, later, the partition readers
     * @param availableMemory memory budget handed to the {@link ResultMemoryManager}; when it is
     *            smaller than one page no memory manager is created
     * @param resultTTL passed to {@link AbstractResultManager}
     * @param resultSweepThreshold passed to the {@link ResultStateSweeper}
     */
    public ResultPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
            long resultSweepThreshold) {
        super(resultTTL);
        this.ncs = ncs;
        this.executor = executor;
        deallocatableRegistry = new DefaultDeallocatableRegistry();
        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
        if (availableMemory >= ResultMemoryManager.getPageSize()) {
            resultMemoryManager = new ResultMemoryManager(availableMemory);
        } else {
            resultMemoryManager = null;
        }
        partitionResultStateMap = new HashMap<>();
        // must stay the last statement: the sweeper receives 'this' and may run immediately
        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
    }

    /**
     * Creates a writer for one result partition and records its {@link ResultState} under the
     * job and result set so that readers can find it.
     */
    @Override
    public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
            boolean asyncMode, int partition, int nPartitions, long maxReads) {
        ResultPartitionWriter dpw;
        JobId jobId = ctx.getJobletContext().getJobId();
        synchronized (this) {
            dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
                    resultMemoryManager, fileFactory, maxReads);
            ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
            resultStates[partition] = dpw.getResultState();
        }
        LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", jobId, partition);
        return dpw;
    }

    /**
     * Registers the location of a result partition with the cluster controller of the job.
     *
     * @throws HyracksException wrapping any failure of the call to the cluster controller
     */
    @Override
    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
            boolean orderedResult, boolean emptyResult) throws HyracksException {
        try {
            // Be sure to send the *public* network address to the CC
            ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
                    emptyResult, partition, nPartitions, ncs.getResultNetworkManager().getPublicNetworkAddress());
        } catch (Exception e) {
            throw HyracksException.create(e);
        }
    }

    /**
     * Reports to the cluster controller of the job that a result partition has been fully written.
     *
     * @throws HyracksException wrapping any failure of the call to the cluster controller
     */
    @Override
    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
        try {
            LOGGER.debug("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId,
                    partition);
            ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
        } catch (Exception e) {
            throw HyracksException.create(e);
        }
    }

    /**
     * Looks up the state of the requested partition and starts a {@link ResultPartitionReader}
     * that sends it to {@code writer}.
     *
     * @throws HyracksException if the job, result set or partition is not known
     */
    @Override
    public void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
            IFrameWriter writer) throws HyracksException {
        ResultState resultState = getResultState(jobId, resultSetId, partition);
        ResultPartitionReader dpr = new ResultPartitionReader(this, resultMemoryManager, executor, resultState);
        dpr.writeTo(writer);
        LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId,
                partition);
    }

    /**
     * Returns the state registered for the given partition.
     *
     * @throws HyracksException if the job or result set is unknown, the partition number is outside
     *             the registered range, or no writer has been created for the partition
     */
    private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
            throws HyracksException {
        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
        if (rsIdMap == null) {
            throw new HyracksException("Unknown JobId " + jobId);
        }
        ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
        if (resultStates == null) {
            throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
        }
        // report a bad partition number like the other unknown ids, not as an unchecked array error
        if (partition < 0 || partition >= resultStates.length) {
            throw new HyracksException("Invalid partition " + partition + " for JobId: " + jobId + " ResultSetId: "
                    + resultSetId);
        }
        ResultState resultState = resultStates[partition];
        if (resultState == null) {
            throw new HyracksException("No ResultPartitionWriter for partition " + partition);
        }
        return resultState;
    }

    /**
     * Removes one partition; drops the job entry once its {@link ResultSetMap} reports that
     * nothing is left.
     */
    @Override
    public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) {
            partitionResultStateMap.remove(jobId);
        }
    }

    /** Aborts every result state of the job, if the job is known. */
    @Override
    public synchronized void abortReader(JobId jobId) {
        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
        if (rsIdMap != null) {
            rsIdMap.abortAll();
        }
    }

    /** Closes and deletes the result states of all jobs, then closes the deallocatable registry. */
    @Override
    public synchronized void close() {
        for (JobId jobId : getJobIds()) {
            deinit(jobId);
        }
        deallocatableRegistry.close();
    }

    /**
     * Returns the ids of the jobs that currently have result state on this node.
     *
     * @return a snapshot taken under the lock; it does not track later changes, so it can be
     *         iterated without holding this object's monitor
     */
    @Override
    public synchronized Set<JobId> getJobIds() {
        // copy: the live keySet() view would let callers iterate the HashMap outside the lock
        return new HashSet<>(partitionResultStateMap.keySet());
    }

    /** Returns the result set map of the job, or {@code null} if the job is unknown. */
    @Override
    public synchronized ResultSetMap getState(JobId jobId) {
        return partitionResultStateMap.get(jobId);
    }

    /** Closes and deletes all result states of the job and forgets the job. */
    @Override
    public synchronized void sweep(JobId jobId) {
        deinit(jobId);
        partitionResultStateMap.remove(jobId);
    }

    /** Closes and deletes all result states of the job but keeps the job entry. */
    private synchronized void deinit(JobId jobId) {
        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
        if (rsIdMap != null) {
            rsIdMap.closeAndDeleteAll();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java new file mode 100644 index 0000000..121d5a1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.hyracks.control.nc.result;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Streams the content of one {@link ResultState} to a network channel, frame by frame, on a
 * thread of the supplied executor.
 */
public class ResultPartitionReader {
    private static final Logger LOGGER = LogManager.getLogger();

    private final ResultPartitionManager resultPartitionManager;
    /** May be {@code null}; reads then go through {@link ResultState#read(long, ByteBuffer)}. */
    private final ResultMemoryManager resultMemoryManager;
    private final Executor executor;
    private final ResultState resultState;

    public ResultPartitionReader(ResultPartitionManager resultPartitionManager, ResultMemoryManager resultMemoryManager,
            Executor executor, ResultState resultState) {
        this.resultPartitionManager = resultPartitionManager;
        this.resultMemoryManager = resultMemoryManager;
        this.executor = executor;
        this.resultState = resultState;
    }

    /**
     * Schedules the transfer of the partition to {@code writer}.
     *
     * @param writer the destination; it is cast to {@link NetworkOutputChannel}
     */
    public void writeTo(final IFrameWriter writer) {
        executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer));
    }

    /** The task that performs the actual transfer. */
    private class ResultPartitionSender implements Runnable {

        private final NetworkOutputChannel channel;

        ResultPartitionSender(final NetworkOutputChannel channel) {
            this.channel = channel;
        }

        /**
         * Opens the channel, copies frames until a read returns no data, and always cleans up.
         * Any failure is logged and the channel is aborted; nothing is rethrown.
         */
        @Override
        public void run() {
            channel.setFrameSize(resultState.getFrameSize());
            channel.open();
            try {
                resultState.readOpen();
                long offset = 0;
                final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
                while (true) {
                    buffer.clear();
                    final long size = read(offset, buffer);
                    if (size <= 0) {
                        break;
                    } else if (size < buffer.limit()) {
                        // only whole frames are sent
                        throw new IllegalStateException(
                                "Premature end of file - readSize: " + size + " buffer limit: " + buffer.limit());
                    }
                    offset += size;
                    buffer.flip();
                    channel.nextFrame(buffer);
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("result reading successful(" + resultState.getResultSetPartitionId() + ")");
                }
            } catch (Exception e) {
                LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e);
                channel.abort();
            } finally {
                close();
            }
        }

        /** Reads through the memory manager when one is configured, otherwise straight from the state. */
        private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
            return resultMemoryManager != null ? resultState.read(resultMemoryManager, offset, buffer)
                    : resultState.read(offset, buffer);
        }

        /**
         * Closes the channel and releases this reader's hold on the result state. The release runs
         * in a {@code finally} so that a failing channel close cannot leave the state's file handle
         * referenced or an exhausted partition registered.
         */
        private void close() {
            try {
                try {
                    channel.close();
                } finally {
                    resultState.readClose();
                    if (resultState.isExhausted()) {
                        final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId();
                        resultPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
                                partitionId.getPartition());
                    }
                }
            } catch (HyracksDataException e) {
                LOGGER.error("unexpected failure in partition reader clean up", e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java new file mode 100644 index 0000000..f25bc58 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.hyracks.control.nc.result;

import java.nio.ByteBuffer;

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Frame writer that stores the frames of one result partition in a {@link ResultState} and keeps
 * the {@link IResultPartitionManager} informed about registration and completion.
 */
public class ResultPartitionWriter implements IFrameWriter {
    private static final Logger LOGGER = LogManager.getLogger();

    private final IResultPartitionManager manager;

    private final JobId jobId;

    private final ResultSetId resultSetId;

    private final boolean orderedResult;

    private final int partition;

    private final int nPartitions;

    /** May be {@code null}; frames are then written with {@link ResultState#write(ByteBuffer)}. */
    private final ResultMemoryManager resultMemoryManager;

    private final ResultSetPartitionId resultSetPartitionId;

    private final ResultState resultState;

    /** Set once the partition location has been registered with the manager. */
    private boolean partitionRegistered;

    /** Set by {@link #fail()}; suppresses the registration attempt in {@link #close()}. */
    private boolean failed = false;

    public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId,
            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
            ResultMemoryManager resultMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
        this.manager = manager;
        this.jobId = jobId;
        this.resultSetId = rsId;
        this.orderedResult = orderedResult;
        this.partition = partition;
        this.nPartitions = nPartitions;
        this.resultMemoryManager = resultMemoryManager;
        this.resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
        this.resultState = new ResultState(this.resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
                ctx.getInitialFrameSize(), maxReads);
    }

    /** Returns the state this writer fills. */
    public ResultState getResultState() {
        return resultState;
    }

    /** Clears the registration flag and opens the underlying state. */
    @Override
    public void open() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("open(" + partition + ")");
        }
        partitionRegistered = false;
        resultState.open();
    }

    /** Registers the partition as non-empty on first use, then stores the frame. */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        registerResultPartitionLocation(false);
        if (resultMemoryManager != null) {
            resultState.write(resultMemoryManager, buffer);
        } else {
            resultState.write(buffer);
        }
    }

    /** Marks the writer as failed, deletes what has been written and aborts the state. */
    @Override
    public void fail() throws HyracksDataException {
        failed = true;
        resultState.closeAndDelete();
        resultState.abort();
    }

    /**
     * Closes the state and, if the partition was registered, reports write completion. A writer
     * that never received a frame and did not fail registers the partition as empty first.
     */
    @Override
    public void close() throws HyracksDataException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("close(" + partition + ")");
        }
        try {
            if (!failed) {
                registerResultPartitionLocation(true);
            }
        } finally {
            // the state is closed even when the registration attempt throws
            resultState.close();
        }
        if (!partitionRegistered) {
            return;
        }
        try {
            manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
        } catch (HyracksException e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Registers the partition location with the manager unless that already happened.
     *
     * @param empty whether the partition is registered as an empty result
     */
    void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
        if (partitionRegistered) {
            return;
        }
        try {
            manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
                    empty);
        } catch (HyracksException e) {
            throw HyracksDataException.create(e);
        }
        partitionRegistered = true;
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java new file mode 100644 index 0000000..41b7f07 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.hyracks.control.nc.result;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultStateRecord;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * The result states of one job: for every result set id an array with one slot per partition.
 * The creation time is recorded and exposed through {@link #getTimestamp()}.
 *
 * <p>This class does no locking of its own.
 */
class ResultSetMap implements IResultStateRecord, Serializable {
    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LogManager.getLogger();

    /** {@link System#nanoTime()} at construction. */
    private final long timestamp;
    private final HashMap<ResultSetId, ResultState[]> resultStateMap;

    ResultSetMap() {
        timestamp = System.nanoTime();
        resultStateMap = new HashMap<>();
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    /** Returns the partition array of the result set, or {@code null} if it is not registered. */
    ResultState[] getResultStates(ResultSetId rsId) {
        return resultStateMap.get(rsId);
    }

    /**
     * Returns the partition array of the result set, creating an array of {@code nPartitions}
     * empty slots if none is registered yet.
     */
    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
        return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]);
    }

    /**
     * removes a result partition for a result set
     *
     * @param jobId
     *            the id of the job that produced the result set
     * @param resultSetId
     *            the id of the result set
     * @param partition
     *            the partition number
     * @return true, if no result set of the job has any partition left, i.e. this map is empty
     */
    boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
        final ResultState[] resultStates = resultStateMap.get(resultSetId);
        if (resultStates != null) {
            final ResultState state = resultStates[partition];
            if (state != null) {
                state.closeAndDelete();
                LOGGER.debug("Removing partition: " + partition + " for JobId: " + jobId);
            }
            resultStates[partition] = null;
            boolean stateEmpty = true;
            for (ResultState resState : resultStates) {
                if (resState != null) {
                    stateEmpty = false;
                    break;
                }
            }
            if (stateEmpty) {
                resultStateMap.remove(resultSetId);
            }
        }
        // An unknown result set id must not claim that the job is empty: the caller drops the
        // whole job entry on 'true', which would orphan the states of the other result sets.
        return resultStateMap.isEmpty();
    }

    /** Aborts every registered result state. */
    void abortAll() {
        applyToAllStates((rsId, state, i) -> state.abort());
    }

    /** Closes and deletes every registered result state; the entries themselves stay in the map. */
    void closeAndDeleteAll() {
        applyToAllStates((rsId, state, i) -> {
            state.closeAndDelete();
            LOGGER.debug("Removing partition: " + i + " for result set " + rsId);
        });
    }

    /** Action applied to one non-null partition state. */
    @FunctionalInterface
    private interface StateModifier {
        void modify(ResultSetId rsId, ResultState entry, int partition);
    }

    /** Invokes {@code modifier} for every non-null state of every result set. */
    private void applyToAllStates(StateModifier modifier) {
        for (Map.Entry<ResultSetId, ResultState[]> entry : resultStateMap.entrySet()) {
            final ResultSetId rsId = entry.getKey();
            final ResultState[] resultStates = entry.getValue();
            if (resultStates == null) {
                continue;
            }
            for (int i = 0; i < resultStates.length; i++) {
                final ResultState state = resultStates[i];
                if (state != null) {
                    modifier.modify(rsId, state, i);
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java new file mode 100644 index 0000000..25d3f00 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.hyracks.control.nc.result;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hyracks.api.dataflow.state.IStateObject;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.ResultSetPartitionId;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * The stored content of one result partition. Data is kept in in-memory pages obtained from a
 * {@link ResultMemoryManager} and/or in a workspace file, and can be read a limited number of
 * times ({@code maxReads}).
 *
 * <p>Writers and readers synchronize on this object; readers wait on its monitor until more data
 * arrives, end-of-stream is signalled, or the state fails.
 */
public class ResultState implements IStateObject {
    private static final String FILE_PREFIX = "result_";

    private final ResultSetPartitionId resultSetPartitionId;

    private final boolean asyncMode;

    private final int frameSize;

    private final IIOManager ioManager;

    private final IWorkspaceFileFactory fileFactory;

    /** Set when the writer has closed the state. */
    private final AtomicBoolean eos;

    /** Set by {@link #abort()} and {@link #closeAndDelete()}. */
    private final AtomicBoolean failed;

    private final List<Page> localPageList;

    private FileReference fileRef;

    private IFileHandle fileHandle;

    /** Number of holders (writer and readers) of {@link #fileHandle}. */
    private volatile int referenceCount = 0;

    /** Total number of bytes written so far. */
    private long size;

    /** Number of bytes written to the file by {@link #returnPage()}. */
    private long persistentSize;
    private long remainingReads;

    /**
     * @throws IllegalArgumentException if {@code maxReads} is not positive
     */
    ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
            IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
        if (maxReads <= 0) {
            throw new IllegalArgumentException("maxReads must be > 0");
        }
        this.resultSetPartitionId = resultSetPartitionId;
        this.asyncMode = asyncMode;
        this.ioManager = ioManager;
        this.fileFactory = fileFactory;
        this.frameSize = frameSize;
        remainingReads = maxReads;
        eos = new AtomicBoolean(false);
        failed = new AtomicBoolean(false);
        localPageList = new ArrayList<>();

        fileRef = null;
        fileHandle = null;
    }

    /** Resets the size counters and the reference count. */
    public synchronized void open() {
        size = 0;
        persistentSize = 0;
        referenceCount = 0;
    }

    /** Signals end-of-stream, releases the writer's file handle reference and wakes waiting readers. */
    public synchronized void close() {
        eos.set(true);
        closeWriteFileHandle();
        notifyAll();
    }

    /** Marks the state as failed, releases the writer's file handle reference and deletes the file. */
    public synchronized void closeAndDelete() {
        // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
        // to be taken when there are more requests to these result states.
        failed.set(true);
        closeWriteFileHandle();
        if (fileRef != null) {
            fileRef.delete();
            fileRef = null;
        }
    }

    private void closeWriteFileHandle() {
        if (fileHandle != null) {
            doCloseFileHandle();
        }
    }

    /** Drops one reference to the file handle and closes it when the last reference is gone. */
    private void doCloseFileHandle() {
        if (--referenceCount == 0) {
            // close the file if there is no more reference
            try {
                ioManager.close(fileHandle);
            } catch (IOException e) {
                // Since file handle could not be closed, just ignore.
            }
            fileHandle = null;
        }
    }

    /** Appends the buffer to the file (creating the file on first use) and wakes waiting readers. */
    public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
        if (fileRef == null) {
            initWriteFileHandle();
        }

        size += ioManager.syncWrite(fileHandle, size, buffer);
        notifyAll();
    }

    /**
     * Copies the buffer into in-memory pages, requesting new pages from the memory manager as the
     * current one fills up, and wakes waiting readers.
     */
    public synchronized void write(ResultMemoryManager resultMemoryManager, ByteBuffer buffer)
            throws HyracksDataException {
        int srcOffset = 0;
        Page destPage = null;

        if (!localPageList.isEmpty()) {
            destPage = localPageList.get(localPageList.size() - 1);
        }

        while (srcOffset < buffer.limit()) {
            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
                destPage = resultMemoryManager.requestPage(resultSetPartitionId, this);
                localPageList.add(destPage);
            }
            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
            srcOffset += srcLength;
            size += srcLength;
        }

        notifyAll();
    }

    /**
     * Consumes one of the permitted reads.
     *
     * @throws IllegalStateException if no reads remain
     */
    public synchronized void readOpen() {
        if (isExhausted()) {
            throw new IllegalStateException("Result reads exhausted");
        }
        remainingReads--;
    }

    /** Releases a reader's file handle reference, if a handle is open. */
    public synchronized void readClose() throws HyracksDataException {
        if (fileHandle != null) {
            doCloseFileHandle();
        }
    }

    /**
     * Reads from the file at {@code offset}, waiting until data at that offset exists.
     *
     * @return the number of bytes read, or 0 at end-of-stream or after a failure
     */
    public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
        long readSize = 0;

        while (offset >= size && !eos.get() && !failed.get()) {
            try {
                wait();
            } catch (InterruptedException e) {
                throw HyracksDataException.create(e);
            }
        }
        if ((offset >= size && eos.get()) || failed.get()) {
            return readSize;
        }

        if (fileHandle == null) {
            initReadFileHandle();
        }
        readSize = ioManager.syncRead(fileHandle, offset, buffer);

        return readSize;
    }

    /**
     * Reads at {@code offset} from the file part and/or the in-memory pages, waiting until data at
     * that offset exists.
     *
     * @return the number of bytes read, or 0 at end-of-stream or after a failure
     */
    public synchronized long read(ResultMemoryManager resultMemoryManager, long offset, ByteBuffer buffer)
            throws HyracksDataException {
        long readSize = 0;
        while (offset >= size && !eos.get() && !failed.get()) {
            try {
                wait();
            } catch (InterruptedException e) {
                throw HyracksDataException.create(e);
            }
        }

        if ((offset >= size && eos.get()) || failed.get()) {
            return readSize;
        }

        if (offset < persistentSize) {
            if (fileHandle == null) {
                initReadFileHandle();
            }
            readSize = ioManager.syncRead(fileHandle, offset, buffer);
            if (readSize < 0) {
                throw new HyracksDataException("Premature end of file");
            }
        }

        if (readSize < buffer.capacity()) {
            long localPageOffset = offset - persistentSize;
            int localPageIndex = (int) (localPageOffset / ResultMemoryManager.getPageSize());
            int pageOffset = (int) (localPageOffset % ResultMemoryManager.getPageSize());
            Page page = getPage(localPageIndex);
            if (page == null) {
                return readSize;
            }
            readSize += buffer.remaining();
            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
        }
        resultMemoryManager.pageReferenced(resultSetPartitionId);
        return readSize;
    }

    /** Marks the state as failed and wakes waiting readers. */
    public synchronized void abort() {
        failed.set(true);
        notifyAll();
    }

    /**
     * Removes the last in-memory page, writes it to the file and returns it.
     *
     * @return the removed page, or {@code null} if there is no page left
     */
    public synchronized Page returnPage() throws HyracksDataException {
        Page page = removePage();

        // If we do not have any pages to be given back close the write channel since we don't write any more, return
        // null.
        if (page == null) {
            // no file may ever have been opened for this state
            if (fileHandle != null) {
                ioManager.close(fileHandle);
            }
            return null;
        }

        page.getBuffer().flip();

        if (fileRef == null) {
            initWriteFileHandle();
        }

        long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer());
        persistentSize += delta;
        return page;
    }

    public synchronized void setEOS(boolean eos) {
        this.eos.set(eos);
    }

    public ResultSetPartitionId getResultSetPartitionId() {
        return resultSetPartitionId;
    }

    public int getFrameSize() {
        return frameSize;
    }

    public IIOManager getIOManager() {
        return ioManager;
    }

    public boolean getAsyncMode() {
        return asyncMode;
    }

    @Override
    public JobId getJobId() {
        return resultSetPartitionId.getJobId();
    }

    @Override
    public Object getId() {
        return resultSetPartitionId;
    }

    @Override
    public long getMemoryOccupancy() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void toBytes(DataOutput out) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void fromBytes(DataInput in) throws IOException {
        throw new UnsupportedOperationException();
    }

    /** Returns the in-memory page at {@code index}, or {@code null} if there is no such page. */
    private Page getPage(int index) {
        // the caller treats null as "nothing to read"; an out-of-range index must not throw
        if (index < 0 || index >= localPageList.size()) {
            return null;
        }
        return localPageList.get(index);
    }

    /** Removes and returns the last in-memory page, or {@code null} if there is none. */
    private Page removePage() {
        Page page = null;
        if (!localPageList.isEmpty()) {
            page = localPageList.remove(localPageList.size() - 1);
        }
        return page;
    }

    /** Creates the workspace file and opens it for writing, unless a handle is already open. */
    private void initWriteFileHandle() throws HyracksDataException {
        if (fileHandle == null) {
            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
            if (referenceCount != 0) {
                throw new IllegalStateException("Illegal reference count " + referenceCount);
            }
            referenceCount = 1;
            notifyAll(); // NOSONAR: always called from a synchronized block
        }
    }

    /**
     * Waits for the writer to create the file, then takes a reference to the file handle,
     * reopening the file read-only if the handle has been closed. Returns without a handle if the
     * state has failed.
     */
    private void initReadFileHandle() throws HyracksDataException {
        while (fileRef == null && !failed.get()) {
            // wait for writer to create the file
            try {
                wait();
            } catch (InterruptedException e) {
                throw HyracksDataException.create(e);
            }
        }
        if (failed.get()) {
            return;
        }
        if (fileHandle == null) {
            // fileHandle has been closed by the writer, create it again
            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
        }
        referenceCount++;
    }

    /** Returns a compact JSON rendering of the main fields. */
    @Override
    public String toString() {
        try {
            ObjectMapper om = new ObjectMapper();
            ObjectNode on = om.createObjectNode();
            on.put("rspid", resultSetPartitionId.toString());
            on.put("async", asyncMode);
            on.put("remainingReads", remainingReads);
            on.put("eos", eos.get());
            on.put("failed", failed.get());
            on.put("fileRef", String.valueOf(fileRef));
            return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
        } catch (JsonProcessingException e) { // NOSONAR
            return e.getMessage();
        }
    }

    /** Returns whether all permitted reads have been used. */
    public synchronized boolean isExhausted() {
        return remainingReads == 0;
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index c6696fd..b11dada 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -23,9 +23,9 @@ import java.util.Collection; import java.util.Deque; import org.apache.hyracks.api.control.CcId; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; @@ -48,9 +48,9 @@ public class AbortAllJobsWork extends SynchronizableWork { @Override protected void doRun() throws Exception { LOGGER.info("Aborting all tasks for controller {}", ccId); - IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); - if (dpm == null) { - LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId()); + IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager(); + if (resultPartitionManager == null) { + LOGGER.log(Level.WARN, "ResultPartitionManager is null on " + ncs.getId()); } Deque<Task> abortedTasks = new ArrayDeque<>(); Collection<Joblet> joblets = ncs.getJobletMap().values(); @@ -61,9 +61,9 @@ public class AbortAllJobsWork extends SynchronizableWork { abortedTasks.add(task); }); final JobId jobId = joblet.getJobId(); - if (dpm != null) { - dpm.abortReader(jobId); - dpm.sweep(jobId); + if (resultPartitionManager != null) { + resultPartitionManager.abortReader(jobId); + resultPartitionManager.sweep(jobId); } ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE)); }); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java index 80f3e98..f47e1ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java @@ -22,8 +22,8 @@ import java.util.List; import java.util.Map; import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; @@ -52,9 +52,9 @@ public class AbortTasksWork extends AbstractWork { if (LOGGER.isInfoEnabled()) { LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks); } - IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); - if (dpm != null) { - ncs.getDatasetPartitionManager().abortReader(jobId); + IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager(); + if (resultPartitionManager != null) { + ncs.getResultPartitionManager().abortReader(jobId); } Joblet ji = ncs.getJobletMap().get(jobId); if (ji != null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index ac80afa..7dbb3c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -21,8 +21,8 @@ package org.apache.hyracks.control.nc.work; import java.util.List; import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; @@ -52,9 +52,9 @@ public class NotifyTaskFailureWork extends AbstractWork { LOGGER.log(Level.WARN, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed", exceptions.get(0)); try { - IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); - if (dpm != null) { - dpm.abortReader(jobId); + IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager(); + if (resultPartitionManager != null) { + resultPartitionManager.abortReader(jobId); } ncs.getClusterController(jobId.getCcId()).notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index b0cc40c..9fb9815 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -31,11 +31,11 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.IResultSerializer; import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.result.IResultPartitionManager; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.dataflow.common.comm.io.FrameOutputStream; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; @@ -67,7 +67,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) throws HyracksDataException { - final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager(); + final IResultPartitionManager resultPartitionManager = ctx.getResultPartitionManager(); final IFrame frame = new VSizeFrame(ctx); @@ -82,15 +82,15 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat final 
FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc); return new AbstractUnaryInputSinkOperatorNodePushable() { - private IFrameWriter datasetPartitionWriter; + private IFrameWriter resultPartitionWriter; private boolean failed = false; @Override public void open() throws HyracksDataException { try { - datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition, - nPartitions, maxReads); - datasetPartitionWriter.open(); + resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, ordered, + asyncMode, partition, nPartitions, maxReads); + resultPartitionWriter.open(); resultSerializer.init(); } catch (HyracksException e) { throw HyracksDataException.create(e); @@ -103,7 +103,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) { resultSerializer.appendTuple(frameTupleAccessor, tIndex); if (!frameOutputStream.appendTuple()) { - frameOutputStream.flush(datasetPartitionWriter); + frameOutputStream.flush(resultPartitionWriter); resultSerializer.appendTuple(frameTupleAccessor, tIndex); frameOutputStream.appendTuple(); @@ -114,23 +114,23 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat @Override public void fail() throws HyracksDataException { failed = true; - if (datasetPartitionWriter != null) { - datasetPartitionWriter.fail(); + if (resultPartitionWriter != null) { + resultPartitionWriter.fail(); } } @Override public void close() throws HyracksDataException { - if (datasetPartitionWriter != null) { + if (resultPartitionWriter != null) { try { if (!failed && frameOutputStream.getTupleCount() > 0) { - frameOutputStream.flush(datasetPartitionWriter); + frameOutputStream.flush(resultPartitionWriter); } } catch (Exception e) { - datasetPartitionWriter.fail(); + resultPartitionWriter.fail(); throw e; } finally { - 
datasetPartitionWriter.close(); + resultPartitionWriter.close(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java index 0931501..5dcc99a 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java @@ -36,16 +36,16 @@ import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.IHyracksDatasetReader; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.api.result.IResultSetReader; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.client.result.ResultSet; import 
org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.NCConfig; @@ -154,8 +154,8 @@ public abstract class AbstractIntegrationTest { IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(); - IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); - IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId); + IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders); + IResultSetReader reader = resultSet.createReader(jobId, resultSetId); List<String> resultRecords = new ArrayList<>(); ByteBufferInputStream bbis = new ByteBufferInputStream(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 58da8a2..55fd9a0 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -29,8 +29,6 @@ import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.VSizeFrame; -import 
org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.IHyracksDatasetReader; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobFlag; @@ -38,7 +36,9 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; -import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.api.result.IResultSetReader; +import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.control.cc.BaseCCApplication; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.application.CCServiceContext; @@ -160,8 +160,8 @@ public abstract class AbstractMultiNCIntegrationTest { IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(); if (!spec.getResultSetIds().isEmpty()) { - IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); - IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0)); + IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders); + IResultSetReader reader = resultSet.createReader(jobId, spec.getResultSetIds().get(0)); ObjectMapper om = new ObjectMapper(); ArrayNode resultRecords = om.createArrayNode(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java index 752c643..39e7a45 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java @@ -28,10 +28,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java index c28a5aa..c542fe9 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java @@ -27,10 +27,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.primitive.IntegerPointable; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java index 9e795bf..1dce88d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java @@ -27,10 +27,10 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java index 49dee84..8d3b0dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java @@ -29,10 +29,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; 
-import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java index 672d2c4..9d79d80 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java @@ -45,8 +45,8 @@ public class NodesAPIIntegrationTest extends AbstractIntegrationTest { "nonheap-committed-sizes", "nonheap-max-sizes", "application-memory-budget", "application-cpu-core-budget", "thread-counts", "peak-thread-counts", "system-load-averages", "gc-names", "gc-collection-counts", "gc-collection-times", "net-payload-bytes-read", "net-payload-bytes-written", "net-signaling-bytes-read", - "net-signaling-bytes-written", "dataset-net-payload-bytes-read", "dataset-net-payload-bytes-written", - "dataset-net-signaling-bytes-read", 
"dataset-net-signaling-bytes-written", "ipc-messages-sent", + "net-signaling-bytes-written", "result-net-payload-bytes-read", "result-net-payload-bytes-written", + "result-net-signaling-bytes-read", "result-net-signaling-bytes-written", "ipc-messages-sent", "ipc-message-bytes-sent", "ipc-messages-received", "ipc-message-bytes-received", "disk-reads", "disk-writes", "config" }; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java index 09629b2..bc5de11 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java @@ -27,10 +27,10 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; import 
org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java index 75ba33f..7838a34 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java @@ -26,10 +26,10 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java index 315b74c..06c0eaa 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java @@ -26,7 +26,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 816f3fa..6154e28 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -28,10 +28,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java index c2b3263..0eb6810 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java +++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java @@ -32,11 +32,11 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java index 81a71eb..48f7837 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java @@ -24,10 +24,10 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import 
org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index ac52573..174d5cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -30,7 +30,6 @@ import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.state.IStateObject; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -39,6 +38,7 @@ import org.apache.hyracks.api.job.JobFlag; import 
org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.control.common.job.profiling.StatsCollector; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; @@ -134,7 +134,7 @@ public class TestTaskContext implements IHyracksTaskContext { } @Override - public IDatasetPartitionManager getDatasetPartitionManager() { + public IResultPartitionManager getResultPartitionManager() { return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java index 2f58259..1ceb199 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.util; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -44,7 +44,6 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId; import 
org.apache.hyracks.api.dataflow.state.IStateObject; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -55,6 +54,7 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.data.std.util.GrowableArray; @@ -726,7 +726,7 @@ public class LSMInvertedIndexTestUtils { } @Override - public IDatasetPartitionManager getDatasetPartitionManager() { + public IResultPartitionManager getResultPartitionManager() { return null; }