Repository: asterixdb
Updated Branches:
  refs/heads/master 6b1b52c1f -> 1d2f00d88


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
new file mode 100644
index 0000000..82c7d50
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.common.result.AbstractResultManager;
+import org.apache.hyracks.control.common.result.ResultStateSweeper;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
+import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ResultPartitionManager extends AbstractResultManager implements IResultPartitionManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final NodeControllerService ncs;
+
+    private final Executor executor;
+
+    private final Map<JobId, ResultSetMap> partitionResultStateMap;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    private final ResultMemoryManager resultMemoryManager;
+
+    public ResultPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+            long resultSweepThreshold) {
+        super(resultTTL);
+        this.ncs = ncs;
+        this.executor = executor;
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+        if (availableMemory >= ResultMemoryManager.getPageSize()) {
+            resultMemoryManager = new ResultMemoryManager(availableMemory);
+        } else {
+            resultMemoryManager = null;
+        }
+        partitionResultStateMap = new HashMap<>();
+        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
+    }
+
+    @Override
+    public IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+            boolean asyncMode, int partition, int nPartitions, long maxReads) {
+        ResultPartitionWriter dpw;
+        JobId jobId = ctx.getJobletContext().getJobId();
+        synchronized (this) {
+            dpw = new ResultPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+                    resultMemoryManager, fileFactory, maxReads);
+            ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
+            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
+            resultStates[partition] = dpw.getResultState();
+        }
+        LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", 
jobId, partition);
+        return dpw;
+    }
+
+    @Override
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+            boolean orderedResult, boolean emptyResult) throws HyracksException {
+        try {
+            // Be sure to send the *public* network address to the CC
+            ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+                    emptyResult, partition, nPartitions, ncs.getResultNetworkManager().getPublicNetworkAddress());
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
+    @Override
+    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+        try {
+            LOGGER.debug("Reporting partition write completion: JobId: 
{}:ResultSetId: {}:partition: {}", jobId, rsId,
+                    partition);
+            ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
+    @Override
+    public void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+            IFrameWriter writer) throws HyracksException {
+        ResultState resultState = getResultState(jobId, resultSetId, partition);
+        ResultPartitionReader dpr = new ResultPartitionReader(this, resultMemoryManager, executor, resultState);
+        dpr.writeTo(writer);
+        LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: 
{}:partition: {}", jobId, resultSetId,
+                partition);
+    }
+
+    private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
+            throws HyracksException {
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        if (rsIdMap == null) {
+            throw new HyracksException("Unknown JobId " + jobId);
+        }
+        ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
+        if (resultStates == null) {
+            throw new HyracksException("Unknown JobId: " + jobId + " 
ResultSetId: " + resultSetId);
+        }
+        ResultState resultState = resultStates[partition];
+        if (resultState == null) {
+            throw new HyracksException("No ResultPartitionWriter for partition 
" + partition);
+        }
+        return resultState;
+    }
+
+    @Override
+    public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, 
partition)) {
+            partitionResultStateMap.remove(jobId);
+        }
+    }
+
+    @Override
+    public synchronized void abortReader(JobId jobId) {
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        if (rsIdMap != null) {
+            rsIdMap.abortAll();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        for (JobId jobId : getJobIds()) {
+            deinit(jobId);
+        }
+        deallocatableRegistry.close();
+    }
+
+    @Override
+    public synchronized Set<JobId> getJobIds() {
+        return partitionResultStateMap.keySet();
+    }
+
+    @Override
+    public synchronized ResultSetMap getState(JobId jobId) {
+        return partitionResultStateMap.get(jobId);
+    }
+
+    @Override
+    public synchronized void sweep(JobId jobId) {
+        deinit(jobId);
+        partitionResultStateMap.remove(jobId);
+    }
+
+    private synchronized void deinit(JobId jobId) {
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        if (rsIdMap != null) {
+            rsIdMap.closeAndDeleteAll();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
new file mode 100644
index 0000000..121d5a1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ResultPartitionReader {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final ResultPartitionManager resultPartitionManager;
+    private final ResultMemoryManager resultMemoryManager;
+    private final Executor executor;
+    private final ResultState resultState;
+
+    public ResultPartitionReader(ResultPartitionManager resultPartitionManager, ResultMemoryManager resultMemoryManager,
+            Executor executor, ResultState resultState) {
+        this.resultPartitionManager = resultPartitionManager;
+        this.resultMemoryManager = resultMemoryManager;
+        this.executor = executor;
+        this.resultState = resultState;
+    }
+
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer));
+    }
+
+    private class ResultPartitionSender implements Runnable {
+
+        private final NetworkOutputChannel channel;
+
+        ResultPartitionSender(final NetworkOutputChannel channel) {
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            channel.setFrameSize(resultState.getFrameSize());
+            channel.open();
+            try {
+                resultState.readOpen();
+                long offset = 0;
+                final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+                while (true) {
+                    buffer.clear();
+                    final long size = read(offset, buffer);
+                    if (size <= 0) {
+                        break;
+                    } else if (size < buffer.limit()) {
+                        throw new IllegalStateException(
+                                "Premature end of file - readSize: " + size + 
" buffer limit: " + buffer.limit());
+                    }
+                    offset += size;
+                    buffer.flip();
+                    channel.nextFrame(buffer);
+                }
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("result reading successful(" + 
resultState.getResultSetPartitionId() + ")");
+                }
+            } catch (Exception e) {
+                LOGGER.error(() -> "failed to send result partition " + 
resultState.getResultSetPartitionId(), e);
+                channel.abort();
+            } finally {
+                close();
+            }
+        }
+
+        private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+            return resultMemoryManager != null ? resultState.read(resultMemoryManager, offset, buffer)
+                    : resultState.read(offset, buffer);
+        }
+
+        private void close() {
+            try {
+                channel.close();
+                resultState.readClose();
+                if (resultState.isExhausted()) {
+                    final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId();
+                    resultPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
+                            partitionId.getPartition());
+                }
+            } catch (HyracksDataException e) {
+                LOGGER.error("unexpected failure in partition reader clean 
up", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
new file mode 100644
index 0000000..f25bc58
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ResultPartitionWriter implements IFrameWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final IResultPartitionManager manager;
+
+    private final JobId jobId;
+
+    private final ResultSetId resultSetId;
+
+    private final boolean orderedResult;
+
+    private final int partition;
+
+    private final int nPartitions;
+
+    private final ResultMemoryManager resultMemoryManager;
+
+    private final ResultSetPartitionId resultSetPartitionId;
+
+    private final ResultState resultState;
+
+    private boolean partitionRegistered;
+
+    private boolean failed = false;
+
+    public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId,
+            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
+            ResultMemoryManager resultMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
+        this.manager = manager;
+        this.jobId = jobId;
+        this.resultSetId = rsId;
+        this.orderedResult = orderedResult;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.resultMemoryManager = resultMemoryManager;
+
+        resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
+        resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
+                ctx.getInitialFrameSize(), maxReads);
+    }
+
+    public ResultState getResultState() {
+        return resultState;
+    }
+
+    @Override
+    public void open() {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("open(" + partition + ")");
+        }
+        partitionRegistered = false;
+        resultState.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        registerResultPartitionLocation(false);
+        if (resultMemoryManager == null) {
+            resultState.write(buffer);
+        } else {
+            resultState.write(resultMemoryManager, buffer);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        resultState.closeAndDelete();
+        resultState.abort();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("close(" + partition + ")");
+        }
+        try {
+            if (!failed) {
+                registerResultPartitionLocation(true);
+            }
+        } finally {
+            resultState.close();
+        }
+        try {
+            if (partitionRegistered) {
+                manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
+            }
+        } catch (HyracksException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
+        try {
+            if (!partitionRegistered) {
+                manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult,
+                        empty);
+                partitionRegistered = true;
+            }
+        } catch (HyracksException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
new file mode 100644
index 0000000..41b7f07
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultStateRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+class ResultSetMap implements IResultStateRecord, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final long timestamp;
+    private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+
+    ResultSetMap() {
+        timestamp = System.nanoTime();
+        resultStateMap = new HashMap<>();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    ResultState[] getResultStates(ResultSetId rsId) {
+        return resultStateMap.get(rsId);
+    }
+
+    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
+        return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]);
+    }
+
+    /**
+     * removes a result partition for a result set
+     *
+     * @param jobId
+     *            the id of the job that produced the result set
+     * @param resultSetId
+     *            the id of the result set
+     * @param partition
+     *            the partition number
+     * @return true, if all partitions for the resultSetId have been removed
+     */
+    boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+        final ResultState[] resultStates = resultStateMap.get(resultSetId);
+        if (resultStates != null) {
+            final ResultState state = resultStates[partition];
+            if (state != null) {
+                state.closeAndDelete();
+                LOGGER.debug("Removing partition: " + partition + " for JobId: 
" + jobId);
+            }
+            resultStates[partition] = null;
+            boolean stateEmpty = true;
+            for (ResultState resState : resultStates) {
+                if (resState != null) {
+                    stateEmpty = false;
+                    break;
+                }
+            }
+            if (stateEmpty) {
+                resultStateMap.remove(resultSetId);
+            }
+            return resultStateMap.isEmpty();
+        }
+        return true;
+    }
+
+    void abortAll() {
+        applyToAllStates((rsId, state, i) -> state.abort());
+    }
+
+    void closeAndDeleteAll() {
+        applyToAllStates((rsId, state, i) -> {
+            state.closeAndDelete();
+            LOGGER.debug("Removing partition: " + i + " for result set " + 
rsId);
+        });
+    }
+
+    @FunctionalInterface
+    private interface StateModifier {
+        void modify(ResultSetId rsId, ResultState entry, int partition);
+    }
+
+    private void applyToAllStates(StateModifier modifier) {
+        for (Map.Entry<ResultSetId, ResultState[]> entry : 
resultStateMap.entrySet()) {
+            final ResultSetId rsId = entry.getKey();
+            final ResultState[] resultStates = entry.getValue();
+            if (resultStates == null) {
+                continue;
+            }
+            for (int i = 0; i < resultStates.length; i++) {
+                final ResultState state = resultStates[i];
+                if (state != null) {
+                    modifier.modify(rsId, state, i);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
new file mode 100644
index 0000000..25d3f00
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ResultState implements IStateObject {
+    private static final String FILE_PREFIX = "result_";
+
+    private final ResultSetPartitionId resultSetPartitionId;
+
+    private final boolean asyncMode;
+
+    private final int frameSize;
+
+    private final IIOManager ioManager;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    private final AtomicBoolean eos;
+
+    private final AtomicBoolean failed;
+
+    private final List<Page> localPageList;
+
+    private FileReference fileRef;
+
+    private IFileHandle fileHandle;
+
+    private volatile int referenceCount = 0;
+
+    private long size;
+
+    private long persistentSize;
+    private long remainingReads;
+
+    ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
+            IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
+        if (maxReads <= 0) {
+            throw new IllegalArgumentException("maxReads must be > 0");
+        }
+        this.resultSetPartitionId = resultSetPartitionId;
+        this.asyncMode = asyncMode;
+        this.ioManager = ioManager;
+        this.fileFactory = fileFactory;
+        this.frameSize = frameSize;
+        remainingReads = maxReads;
+        eos = new AtomicBoolean(false);
+        failed = new AtomicBoolean(false);
+        localPageList = new ArrayList<>();
+
+        fileRef = null;
+        fileHandle = null;
+    }
+
+    public synchronized void open() {
+        size = 0;
+        persistentSize = 0;
+        referenceCount = 0;
+    }
+
+    public synchronized void close() {
+        eos.set(true);
+        closeWriteFileHandle();
+        notifyAll();
+    }
+
+    public synchronized void closeAndDelete() {
+        // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action needs
+        // to be taken when there are more requests to these result states.
+        failed.set(true);
+        closeWriteFileHandle();
+        if (fileRef != null) {
+            fileRef.delete();
+            fileRef = null;
+        }
+    }
+
+    private void closeWriteFileHandle() {
+        if (fileHandle != null) {
+            doCloseFileHandle();
+        }
+    }
+
+    private void doCloseFileHandle() {
+        if (--referenceCount == 0) {
+            // close the file if there is no more reference
+            try {
+                ioManager.close(fileHandle);
+            } catch (IOException e) {
+                // Since file handle could not be closed, just ignore.
+            }
+            fileHandle = null;
+        }
+    }
+
+    public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+        if (fileRef == null) {
+            initWriteFileHandle();
+        }
+
+        size += ioManager.syncWrite(fileHandle, size, buffer);
+        notifyAll();
+    }
+
+    public synchronized void write(ResultMemoryManager resultMemoryManager, ByteBuffer buffer)
+            throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = null;
+
+        if (!localPageList.isEmpty()) {
+            destPage = localPageList.get(localPageList.size() - 1);
+        }
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = resultMemoryManager.requestPage(resultSetPartitionId, this);
+                localPageList.add(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            size += srcLength;
+        }
+
+        notifyAll();
+    }
+
+    public synchronized void readOpen() {
+        if (isExhausted()) {
+            throw new IllegalStateException("Result reads exhausted");
+        }
+        remainingReads--;
+    }
+
+    public synchronized void readClose() throws HyracksDataException {
+        if (fileHandle != null) {
+            doCloseFileHandle();
+        }
+    }
+
+    public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        long readSize = 0;
+
+        while (offset >= size && !eos.get() && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        if ((offset >= size && eos.get()) || failed.get()) {
+            return readSize;
+        }
+
+        if (fileHandle == null) {
+            initReadFileHandle();
+        }
+        readSize = ioManager.syncRead(fileHandle, offset, buffer);
+
+        return readSize;
+    }
+
+    public synchronized long read(ResultMemoryManager resultMemoryManager, long offset, ByteBuffer buffer)
+            throws HyracksDataException {
+        long readSize = 0;
+        while (offset >= size && !eos.get() && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+
+        if ((offset >= size && eos.get()) || failed.get()) {
+            return readSize;
+        }
+
+        if (offset < persistentSize) {
+            if (fileHandle == null) {
+                initReadFileHandle();
+            }
+            readSize = ioManager.syncRead(fileHandle, offset, buffer);
+            if (readSize < 0) {
+                throw new HyracksDataException("Premature end of file");
+            }
+        }
+
+        if (readSize < buffer.capacity()) {
+            long localPageOffset = offset - persistentSize;
+            int localPageIndex = (int) (localPageOffset / ResultMemoryManager.getPageSize());
+            int pageOffset = (int) (localPageOffset % ResultMemoryManager.getPageSize());
+            Page page = getPage(localPageIndex);
+            if (page == null) {
+                return readSize;
+            }
+            readSize += buffer.remaining();
+            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+        }
+        resultMemoryManager.pageReferenced(resultSetPartitionId);
+        return readSize;
+    }
+
+    public synchronized void abort() {
+        failed.set(true);
+        notifyAll();
+    }
+
+    public synchronized Page returnPage() throws HyracksDataException {
+        Page page = removePage();
+
+        // If we do not have any pages to be given back, close the write channel since we don't write any more, and
+        // return null.
+        if (page == null) {
+            ioManager.close(fileHandle);
+            return null;
+        }
+
+        page.getBuffer().flip();
+
+        if (fileRef == null) {
+            initWriteFileHandle();
+        }
+
+        long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer());
+        persistentSize += delta;
+        return page;
+    }
+
+    public synchronized void setEOS(boolean eos) {
+        this.eos.set(eos);
+    }
+
+    public ResultSetPartitionId getResultSetPartitionId() {
+        return resultSetPartitionId;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    public IIOManager getIOManager() {
+        return ioManager;
+    }
+
+    public boolean getAsyncMode() {
+        return asyncMode;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return resultSetPartitionId.getJobId();
+    }
+
+    @Override
+    public Object getId() {
+        return resultSetPartitionId;
+    }
+
+    @Override
+    public long getMemoryOccupancy() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    private Page getPage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(index);
+        }
+        return page;
+    }
+
+    private Page removePage() {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(localPageList.size() - 1);
+        }
+        return page;
+    }
+
+    private void initWriteFileHandle() throws HyracksDataException {
+        if (fileHandle == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            if (referenceCount != 0) {
+                throw new IllegalStateException("Illegal reference count " + 
referenceCount);
+            }
+            referenceCount = 1;
+            notifyAll(); // NOSONAR: always called from a synchronized block
+        }
+    }
+
+    private void initReadFileHandle() throws HyracksDataException {
+        while (fileRef == null && !failed.get()) {
+            // wait for writer to create the file
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        if (failed.get()) {
+            return;
+        }
+        if (fileHandle == null) {
+            // fileHandle has been closed by the writer, create it again
+            fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        }
+        referenceCount++;
+    }
+
+    @Override
+    public String toString() {
+        try {
+            ObjectMapper om = new ObjectMapper();
+            ObjectNode on = om.createObjectNode();
+            on.put("rspid", resultSetPartitionId.toString());
+            on.put("async", asyncMode);
+            on.put("remainingReads", remainingReads);
+            on.put("eos", eos.get());
+            on.put("failed", failed.get());
+            on.put("fileRef", String.valueOf(fileRef));
+            return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
+        } catch (JsonProcessingException e) { // NOSONAR
+            return e.getMessage();
+        }
+    }
+
+    public synchronized boolean isExhausted() {
+        return remainingReads == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index c6696fd..b11dada 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Deque;
 
 import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -48,9 +48,9 @@ public class AbortAllJobsWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         LOGGER.info("Aborting all tasks for controller {}", ccId);
-        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
-        if (dpm == null) {
-            LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + 
ncs.getId());
+        IResultPartitionManager resultPartitionManager = 
ncs.getResultPartitionManager();
+        if (resultPartitionManager == null) {
+            LOGGER.log(Level.WARN, "ResultPartitionManager is null on " + 
ncs.getId());
         }
         Deque<Task> abortedTasks = new ArrayDeque<>();
         Collection<Joblet> joblets = ncs.getJobletMap().values();
@@ -61,9 +61,9 @@ public class AbortAllJobsWork extends SynchronizableWork {
                 abortedTasks.add(task);
             });
             final JobId jobId = joblet.getJobId();
-            if (dpm != null) {
-                dpm.abortReader(jobId);
-                dpm.sweep(jobId);
+            if (resultPartitionManager != null) {
+                resultPartitionManager.abortReader(jobId);
+                resultPartitionManager.sweep(jobId);
             }
             ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
         });

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 80f3e98..f47e1ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -22,8 +22,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -52,9 +52,9 @@ public class AbortTasksWork extends AbstractWork {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
         }
-        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
-        if (dpm != null) {
-            ncs.getDatasetPartitionManager().abortReader(jobId);
+        IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
+        if (resultPartitionManager != null) {
+            ncs.getResultPartitionManager().abortReader(jobId);
         }
         Joblet ji = ncs.getJobletMap().get(jobId);
         if (ji != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index ac80afa..7dbb3c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -21,8 +21,8 @@ package org.apache.hyracks.control.nc.work;
 import java.util.List;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
@@ -52,9 +52,9 @@ public class NotifyTaskFailureWork extends AbstractWork {
         LOGGER.log(Level.WARN, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed",
                 exceptions.get(0));
         try {
-            IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
-            if (dpm != null) {
-                dpm.abortReader(jobId);
+            IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
+            if (resultPartitionManager != null) {
+                resultPartitionManager.abortReader(jobId);
             }
             ncs.getClusterController(jobId.getCcId()).notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index b0cc40c..9fb9815 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -31,11 +31,11 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.IResultSerializer;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameOutputStream;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -67,7 +67,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
+        final IResultPartitionManager resultPartitionManager = ctx.getResultPartitionManager();
 
         final IFrame frame = new VSizeFrame(ctx);
 
@@ -82,15 +82,15 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
         final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
-            private IFrameWriter datasetPartitionWriter;
+            private IFrameWriter resultPartitionWriter;
             private boolean failed = false;
 
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
-                            nPartitions, maxReads);
-                    datasetPartitionWriter.open();
+                    resultPartitionWriter = resultPartitionManager.createResultPartitionWriter(ctx, rsId, ordered,
+                            asyncMode, partition, nPartitions, maxReads);
+                    resultPartitionWriter.open();
                     resultSerializer.init();
                 } catch (HyracksException e) {
                     throw HyracksDataException.create(e);
@@ -103,7 +103,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
                 for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
                     resultSerializer.appendTuple(frameTupleAccessor, tIndex);
                     if (!frameOutputStream.appendTuple()) {
-                        frameOutputStream.flush(datasetPartitionWriter);
+                        frameOutputStream.flush(resultPartitionWriter);
 
                         resultSerializer.appendTuple(frameTupleAccessor, tIndex);
                         frameOutputStream.appendTuple();
@@ -114,23 +114,23 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
             @Override
             public void fail() throws HyracksDataException {
                 failed = true;
-                if (datasetPartitionWriter != null) {
-                    datasetPartitionWriter.fail();
+                if (resultPartitionWriter != null) {
+                    resultPartitionWriter.fail();
                 }
             }
 
             @Override
             public void close() throws HyracksDataException {
-                if (datasetPartitionWriter != null) {
+                if (resultPartitionWriter != null) {
                     try {
                         if (!failed && frameOutputStream.getTupleCount() > 0) {
-                            frameOutputStream.flush(datasetPartitionWriter);
+                            frameOutputStream.flush(resultPartitionWriter);
                         }
                     } catch (Exception e) {
-                        datasetPartitionWriter.fail();
+                        resultPartitionWriter.fail();
                         throw e;
                     } finally {
-                        datasetPartitionWriter.close();
+                        resultPartitionWriter.close();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 0931501..5dcc99a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -36,16 +36,16 @@ import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.client.result.ResultSet;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -154,8 +154,8 @@ public abstract class AbstractIntegrationTest {
 
         IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
-        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
+        IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+        IResultSetReader reader = resultSet.createReader(jobId, resultSetId);
 
         List<String> resultRecords = new ArrayList<>();
         ByteBufferInputStream bbis = new ByteBufferInputStream();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 58da8a2..55fd9a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -29,8 +29,6 @@ import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -38,7 +36,9 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.client.result.ResultSet;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
@@ -160,8 +160,8 @@ public abstract class AbstractMultiNCIntegrationTest {
         IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
         if (!spec.getResultSetIds().isEmpty()) {
-            IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-            IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+            IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+            IResultSetReader reader = resultSet.createReader(jobId, spec.getResultSetIds().get(0));
 
             ObjectMapper om = new ObjectMapper();
             ArrayNode resultRecords = om.createArrayNode();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index 752c643..39e7a45 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -28,10 +28,10 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index c28a5aa..c542fe9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -27,10 +27,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 9e795bf..1dce88d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -27,10 +27,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 49dee84..8d3b0dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -29,10 +29,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
index 672d2c4..9d79d80 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
@@ -45,8 +45,8 @@ public class NodesAPIIntegrationTest extends AbstractIntegrationTest {
             "nonheap-committed-sizes", "nonheap-max-sizes", "application-memory-budget", "application-cpu-core-budget",
             "thread-counts", "peak-thread-counts", "system-load-averages", "gc-names", "gc-collection-counts",
             "gc-collection-times", "net-payload-bytes-read", "net-payload-bytes-written", "net-signaling-bytes-read",
-            "net-signaling-bytes-written", "dataset-net-payload-bytes-read", "dataset-net-payload-bytes-written",
-            "dataset-net-signaling-bytes-read", "dataset-net-signaling-bytes-written", "ipc-messages-sent",
+            "net-signaling-bytes-written", "result-net-payload-bytes-read", "result-net-payload-bytes-written",
+            "result-net-signaling-bytes-read", "result-net-signaling-bytes-written", "ipc-messages-sent",
             "ipc-message-bytes-sent", "ipc-messages-received", "ipc-message-bytes-received", "disk-reads",
             "disk-writes", "config" };
 

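The node-statistics keys follow the same rename, dataset-net-* becoming result-net-*. A hedged sketch of a check over the per-node stats JSON (fetching the document is elided and the helper class is illustrative; only the key names come from the hunk above):

import com.fasterxml.jackson.databind.JsonNode;

public final class ResultCounterKeys {

    // True when a per-node stats document carries the renamed result counters.
    public static boolean hasRenamedResultCounters(JsonNode nodeStats) {
        return nodeStats.has("result-net-payload-bytes-read")
                && nodeStats.has("result-net-payload-bytes-written")
                && nodeStats.has("result-net-signaling-bytes-read")
                && nodeStats.has("result-net-signaling-bytes-written");
    }

    private ResultCounterKeys() {
    }
}
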
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index 09629b2..bc5de11 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -27,10 +27,10 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index 75ba33f..7838a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -26,10 +26,10 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 315b74c..06c0eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 816f3fa..6154e28 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -28,10 +28,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index c2b3263..0eb6810 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -32,11 +32,11 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 81a71eb..48f7837 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -24,10 +24,10 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index ac52573..174d5cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -30,7 +30,6 @@ import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.state.IStateObject;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -39,6 +38,7 @@ import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 
@@ -134,7 +134,7 @@ public class TestTaskContext implements IHyracksTaskContext {
     }
 
     @Override
-    public IDatasetPartitionManager getDatasetPartitionManager() {
+    public IResultPartitionManager getResultPartitionManager() {
         return null;
     }
 

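On the task-context side, IHyracksTaskContext#getDatasetPartitionManager becomes getResultPartitionManager, returning the IResultPartitionManager imported above; stubs such as TestTaskContext simply return null. A hedged sketch of a migrated call site (the wrapper class and helper name are illustrative):

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.result.IResultPartitionManager;

final class ResultManagerAccess {

    // Call sites move from ctx.getDatasetPartitionManager() to the renamed
    // accessor; a null return (as in the test stubs) means results are unused.
    static IResultPartitionManager resultPartitionManagerOrNull(IHyracksTaskContext ctx) {
        return ctx.getResultPartitionManager();
    }

    private ResultManagerAccess() {
    }
}
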
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 2f58259..1ceb199 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -19,7 +19,7 @@
 
 package org.apache.hyracks.storage.am.lsm.invertedindex.util;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -44,7 +44,6 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.state.IStateObject;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -55,6 +54,7 @@ import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.util.GrowableArray;
@@ -726,7 +726,7 @@ public class LSMInvertedIndexTestUtils {
         }
 
         @Override
-        public IDatasetPartitionManager getDatasetPartitionManager() {
+        public IResultPartitionManager getResultPartitionManager() {
             return null;
         }
 
