Repository: asterixdb
Updated Branches:
  refs/heads/master caf43069b -> 10e5ad1a7


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 32eff3a..7b3fb46 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -34,8 +34,8 @@ public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRu
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return createOneOutputPushRuntime(ctx);
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { createOneOutputPushRuntime(ctx) };
     }
 
     public abstract AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index a838557..f0e9406 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -38,8 +38,8 @@ public class SinkRuntimeFactory implements IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return new AbstractOneInputSinkPushRuntime() {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { new AbstractOneInputSinkPushRuntime() {
 
             @Override
             public void open() throws HyracksDataException {
@@ -61,7 +61,6 @@ public class SinkRuntimeFactory implements IPushRuntimeFactory {
             public void flush() throws HyracksDataException {
                 // flush() is meaningless for sink operators
             }
-        };
+        } };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b4d23fc..07365db 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -45,11 +45,18 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
 
     public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int inputArity, int outputArity,
             IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] 
internalRecordDescriptors) {
+        this(spec, inputArity, outputArity, runtimeFactories, 
internalRecordDescriptors, null, null);
+    }
+
+    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int inputArity, int outputArity,
+            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] 
internalRecordDescriptors,
+            IPushRuntimeFactory[] outputRuntimeFactories, int[] 
outputPositions) {
         super(spec, inputArity, outputArity);
         if (outputArity == 1) {
             this.outRecDescs[0] = 
internalRecordDescriptors[internalRecordDescriptors.length - 1];
         }
-        this.pipeline = new AlgebricksPipeline(runtimeFactories, 
internalRecordDescriptors);
+        this.pipeline = new AlgebricksPipeline(runtimeFactories, 
internalRecordDescriptors, outputRuntimeFactories,
+                outputPositions);
     }
 
     public AlgebricksPipeline getPipeline() {
@@ -81,7 +88,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
     private class SourcePushRuntime extends 
AbstractUnaryOutputSourceOperatorNodePushable {
         private final IHyracksTaskContext ctx;
 
-        public SourcePushRuntime(IHyracksTaskContext ctx) {
+        SourcePushRuntime(IHyracksTaskContext ctx) {
             this.ctx = ctx;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index e1081e0..a717794 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,9 +18,13 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -37,6 +41,7 @@ public class PipelineAssembler {
     private final int inputArity;
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
+    private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
 
     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int 
outputArity,
             RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor 
pipelineOutputRecordDescriptor) {
@@ -45,6 +50,7 @@ public class PipelineAssembler {
         this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
         this.inputArity = inputArity;
         this.outputArity = outputArity;
+        this.runtimeMap = new HashMap<>();
     }
 
     public IFrameWriter assemblePipeline(IFrameWriter writer, 
IHyracksTaskContext ctx) throws HyracksDataException {
@@ -52,19 +58,30 @@ public class PipelineAssembler {
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
-        for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
-            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
+        IPushRuntimeFactory[] runtimeFactories = 
pipeline.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             start = enforce ? EnforceFrameWriter.enforce(start) : start;
-            if (i == pipeline.getRuntimeFactories().length - 1) {
-                if (outputArity == 1) {
-                    newRuntime.setOutputFrameWriter(0, start, 
pipelineOutputRecordDescriptor);
+
+            IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
+            IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
+            for (int j = 0; j < newRuntimes.length; j++) {
+                if (enforce) {
+                    newRuntimes[j] = 
EnforcePushRuntime.enforce(newRuntimes[j]);
+                }
+                if (i == runtimeFactories.length - 1) {
+                    if (outputArity == 1) {
+                        newRuntimes[j].setOutputFrameWriter(0, start, 
pipelineOutputRecordDescriptor);
+                    }
+                } else {
+                    newRuntimes[j].setOutputFrameWriter(0, start, 
recordDescriptors[i]);
                 }
-            } else {
-                newRuntime.setOutputFrameWriter(0, start, 
pipeline.getRecordDescriptors()[i]);
             }
+            runtimeMap.put(runtimeFactory, newRuntimes);
+
+            IPushRuntime newRuntime = newRuntimes[0];
             if (i > 0) {
-                newRuntime.setInputRecordDescriptor(0, 
pipeline.getRecordDescriptors()[i - 1]);
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
             } else if (inputArity > 0) {
                 newRuntime.setInputRecordDescriptor(0, 
pipelineInputRecordDescriptor);
             }
@@ -72,4 +89,8 @@ public class PipelineAssembler {
         }
         return start;
     }
+
+    public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) {
+        return runtimeMap.get(runtimeFactory);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7e04750..159fde7 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,9 +20,11 @@ package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
@@ -41,16 +43,21 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
 
     private static final long serialVersionUID = 1L;
 
-    private final AlgebricksPipeline pipeline;
+    private final List<AlgebricksPipeline> pipelines;
+
     private final RecordDescriptor inputRecordDesc;
+
+    private final RecordDescriptor outputRecordDesc;
+
     private final IMissingWriterFactory[] missingWriterFactories;
 
-    public SubplanRuntimeFactory(AlgebricksPipeline pipeline, 
IMissingWriterFactory[] missingWriterFactories,
-            RecordDescriptor inputRecordDesc, int[] projectionList) {
+    public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, 
IMissingWriterFactory[] missingWriterFactories,
+            RecordDescriptor inputRecordDesc, RecordDescriptor 
outputRecordDesc, int[] projectionList) {
         super(projectionList);
-        this.pipeline = pipeline;
+        this.pipelines = pipelines;
         this.missingWriterFactories = missingWriterFactories;
         this.inputRecordDesc = inputRecordDesc;
+        this.outputRecordDesc = outputRecordDesc;
         if (projectionList != null) {
             throw new NotImplementedException();
         }
@@ -60,8 +67,12 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Subplan { \n");
-        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
-            sb.append("  " + f.toString() + ";\n");
+        for (AlgebricksPipeline pipeline : pipelines) {
+            sb.append('{');
+            for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+                sb.append("  ").append(f).append(";\n");
+            }
+            sb.append('}');
         }
         sb.append("}");
         return sb.toString();
@@ -70,110 +81,177 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
     @Override
     public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
+        return new SubplanPushRuntime(ctx);
+    }
 
-        RecordDescriptor pipelineOutputRecordDescriptor = null;
+    private class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
 
-        final PipelineAssembler pa =
-                new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, 
pipelineOutputRecordDescriptor);
-        final IMissingWriter[] nullWriters = new 
IMissingWriter[missingWriterFactories.length];
-        for (int i = 0; i < missingWriterFactories.length; i++) {
-            nullWriters[i] = missingWriterFactories[i].createMissingWriter();
-        }
+        final IHyracksTaskContext ctx;
 
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+        final NestedTupleSourceRuntime[] startOfPipelines;
 
-            /**
-             * Computes the outer product between a given tuple and the frames
-             * passed.
-             */
-            class TupleOuterProduct implements IFrameWriter {
+        boolean first;
 
-                private boolean smthWasWritten = false;
-                private FrameTupleAccessor ta = new FrameTupleAccessor(
-                        
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
-                private ArrayTupleBuilder tb = new ArrayTupleBuilder(
-                        nullWriters.length + 
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+        SubplanPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+            this.ctx = ctx;
+            this.first = true;
 
-                @Override
-                public void open() throws HyracksDataException {
-                    smthWasWritten = false;
-                }
+            IMissingWriter[] missingWriters = new 
IMissingWriter[missingWriterFactories.length];
+            for (int i = 0; i < missingWriterFactories.length; i++) {
+                missingWriters[i] = 
missingWriterFactories[i].createMissingWriter();
+            }
+
+            int pipelineCount = pipelines.size();
+            startOfPipelines = new NestedTupleSourceRuntime[pipelineCount];
+            PipelineAssembler[] pipelineAssemblers = new 
PipelineAssembler[pipelineCount];
+            for (int i = 0; i < pipelineCount; i++) {
+                AlgebricksPipeline pipeline = pipelines.get(i);
+                RecordDescriptor pipelineLastRecordDescriptor =
+                        
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1];
 
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    ta.reset(buffer);
-                    int nTuple = ta.getTupleCount();
-                    for (int t = 0; t < nTuple; t++) {
-                        appendConcat(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), ta, t);
+                RecordDescriptor outputRecordDescriptor;
+                IFrameWriter outputWriter;
+                if (i == 0) {
+                    // primary pipeline
+                    outputWriter = new 
TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+                    outputRecordDescriptor = 
SubplanRuntimeFactory.this.outputRecordDesc;
+                } else {
+                    // secondary pipeline
+                    IPushRuntime outputPushRuntime = 
linkSecondaryPipeline(pipeline, pipelineAssemblers, i);
+                    if (outputPushRuntime == null) {
+                        throw new IllegalStateException("Invalid pipeline");
                     }
-                    smthWasWritten = true;
+                    outputPushRuntime.setInputRecordDescriptor(0, 
pipelineLastRecordDescriptor);
+                    outputWriter = outputPushRuntime;
+                    outputRecordDescriptor = pipelineLastRecordDescriptor;
                 }
 
-                @Override
-                public void close() throws HyracksDataException {
-                    if (!smthWasWritten && !failed) {
-                        // the case when we need to write nulls
-                        appendNullsToTuple();
-                        appendToFrameFromTupleBuilder(tb);
-                    }
+                PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, 
inputRecordDesc, outputRecordDescriptor);
+                startOfPipelines[i] = (NestedTupleSourceRuntime) 
pa.assemblePipeline(outputWriter, ctx);
+                pipelineAssemblers[i] = pa;
+            }
+        }
+
+        IPushRuntime linkSecondaryPipeline(AlgebricksPipeline pipeline, 
PipelineAssembler[] pipelineAssemblers,
+                int pipelineAssemblersCount) {
+            IPushRuntimeFactory[] outputRuntimeFactories = 
pipeline.getOutputRuntimeFactories();
+            if (outputRuntimeFactories == null || 
outputRuntimeFactories.length != 1) {
+                throw new IllegalStateException();
+            }
+            IPushRuntimeFactory outRuntimeFactory = outputRuntimeFactories[0];
+            int outputPosition = pipeline.getOutputPositions()[0];
+            for (int i = 0; i < pipelineAssemblersCount; i++) {
+                IPushRuntime[] p = 
pipelineAssemblers[i].getPushRuntime(outRuntimeFactory);
+                if (p != null) {
+                    return p[outputPosition];
                 }
+            }
+            return null;
+        }
 
-                @Override
-                public void fail() throws HyracksDataException {
-                    // writer.fail() is called by the outer class' 
writer.fail().
+        @Override
+        public void open() throws HyracksDataException {
+            writer.open();
+            if (first) {
+                first = false;
+                initAccessAppendRef(ctx);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+
+                for (NestedTupleSourceRuntime nts : startOfPipelines) {
+                    nts.writeTuple(buffer, t);
                 }
 
-                private void appendNullsToTuple() throws HyracksDataException {
-                    tb.reset();
-                    int n0 = tRef.getFieldCount();
-                    for (int f = 0; f < n0; f++) {
-                        tb.addField(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), f);
+                int n = 0;
+                try {
+                    for (; n < startOfPipelines.length; n++) {
+                        NestedTupleSourceRuntime nts = startOfPipelines[n];
+                        try {
+                            nts.open();
+                        } catch (Exception e) {
+                            nts.fail();
+                            throw e;
+                        }
                     }
-                    DataOutput dos = tb.getDataOutput();
-                    for (int i = 0; i < nullWriters.length; i++) {
-                        nullWriters[i].writeMissing(dos);
-                        tb.addFieldEndOffset();
+                } finally {
+                    for (int i = n - 1; i >= 0; i--) {
+                        startOfPipelines[i].close();
                     }
                 }
             }
+        }
 
-            IFrameWriter endPipe = new TupleOuterProduct();
+        @Override
+        public void flush() throws HyracksDataException {
+            writer.flush();
+        }
 
-            NestedTupleSourceRuntime startOfPipeline = 
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+        /**
+         * Computes the outer product between a given tuple and the frames
+         * passed.
+         */
+        class TupleOuterProduct implements IFrameWriter {
 
-            boolean first = true;
+            private boolean smthWasWritten;
+            private final FrameTupleAccessor ta;
+            private final ArrayTupleBuilder tb;
+            private final IMissingWriter[] missingWriters;
+
+            private TupleOuterProduct(RecordDescriptor recordDescriptor, 
IMissingWriter[] missingWriters) {
+                ta = new FrameTupleAccessor(recordDescriptor);
+                tb = new ArrayTupleBuilder(
+                        missingWriters.length + 
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+                this.missingWriters = missingWriters;
+            }
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
-                if (first) {
-                    first = false;
-                    initAccessAppendRef(ctx);
-                }
+                smthWasWritten = false;
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
+                ta.reset(buffer);
+                int nTuple = ta.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    startOfPipeline.writeTuple(buffer, t);
-                    try {
-                        startOfPipeline.open();
-                    } catch (Exception e) {
-                        startOfPipeline.fail();
-                        throw e;
-                    } finally {
-                        startOfPipeline.close();
-                    }
+                    appendConcat(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), ta, t);
                 }
+                smthWasWritten = true;
             }
 
             @Override
-            public void flush() throws HyracksDataException {
-                writer.flush();
+            public void close() throws HyracksDataException {
+                if (!smthWasWritten && !failed) {
+                    // the case when we need to write nulls
+                    appendNullsToTuple();
+                    appendToFrameFromTupleBuilder(tb);
+                }
             }
-        };
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // writer.fail() is called by the outer class' writer.fail().
+            }
+
+            private void appendNullsToTuple() throws HyracksDataException {
+                tb.reset();
+                int n0 = tRef.getFieldCount();
+                for (int f = 0; f < n0; f++) {
+                    tb.addField(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), f);
+                }
+                DataOutput dos = tb.getDataOutput();
+                for (IMissingWriter missingWriter : missingWriters) {
+                    missingWriter.writeMissing(dos);
+                    tb.addFieldEndOffset();
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 3ccceed..67f4a77 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -40,8 +40,8 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException {
-        return new AbstractOneInputSourcePushRuntime() {
+    public IPushRuntime[] createPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException {
+        return new IPushRuntime[] { new AbstractOneInputSourcePushRuntime() {
 
             private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
             private final FrameTupleAppender appender = new 
FrameTupleAppender(new VSizeFrame(ctx));
@@ -69,6 +69,6 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
             public void flush() throws HyracksDataException {
                 appender.flush(writer);
             }
-        };
+        } };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 496679f..8e64092 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -39,8 +39,8 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return new NestedTupleSourceRuntime(ctx);
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { new NestedTupleSourceRuntime(ctx) };
     }
 
     public static class NestedTupleSourceRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index 021784a..8a06ecf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -56,9 +56,9 @@ public class PrinterRuntimeFactory implements IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
         IAWriter w = 
PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, 
printerFactories,
                 inputRecordDesc);
-        return new SinkWriterRuntime(w, System.out, inputRecordDesc);
+        return new IPushRuntime[] { new SinkWriterRuntime(w, System.out, 
inputRecordDesc) };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index d52ceee..536a769 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -67,11 +67,11 @@ public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         try {
             PrintStream filePrintStream = new PrintStream(new 
BufferedOutputStream(new FileOutputStream(outputFile)));
             IAWriter w = writerFactory.createWriter(fields, filePrintStream, 
printerFactories, inputRecordDesc);
-            return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, 
true);
+            return new IPushRuntime[] { new SinkWriterRuntime(w, 
filePrintStream, inputRecordDesc, true) };
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
new file mode 100644
index 0000000..1706e59
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.union;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int inputArity;
+
+    public MicroUnionAllRuntimeFactory(int inputArity) {
+        this.inputArity = inputArity;
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
+        Mutable<Boolean> failedShared = new MutableObject<>(Boolean.FALSE);
+        IPushRuntime[] result = new IPushRuntime[inputArity];
+        for (int i = 0; i < inputArity; i++) {
+            result[i] = new MicroUnionAllPushRuntime(i, failedShared);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "union-all";
+    }
+
+    private final class MicroUnionAllPushRuntime implements IPushRuntime {
+
+        private final int idx;
+
+        private final Mutable<Boolean> failedShared;
+
+        private IFrameWriter writer;
+
+        MicroUnionAllPushRuntime(int idx, Mutable<Boolean> failedShared) {
+            this.idx = idx;
+            this.failedShared = failedShared;
+        }
+
+        @Override
+        public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+            if (index != 0) {
+                throw new IllegalArgumentException(String.valueOf(index));
+            }
+            this.writer = writer;
+        }
+
+        @Override
+        public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+            // input is not accessed
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            if (idx == 0) {
+                writer.open();
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            writer.nextFrame(buffer);
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            boolean failed = failedShared.getValue();
+            failedShared.setValue(Boolean.TRUE);
+            if (!failed) {
+                writer.fail();
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (idx == 0) {
+                writer.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 40e2ec6..a7621ec 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -439,7 +440,7 @@ public class PushRuntimeTest {
         RecordDescriptor aggDesc =
                 new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg },
-                new RecordDescriptor[] { ntsDesc, aggDesc });
+                new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
         NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory(
                 new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {});
         RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -780,10 +781,10 @@ public class PushRuntimeTest {
                 new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
 
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, assign2, project1 },
-                new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc });
+                new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc }, null, null);
 
-        SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline,
-                new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null);
+        SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(Collections.singletonList(pipeline),
+                new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null, null);
 
         RecordDescriptor subplanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
@@ -851,7 +852,7 @@ public class PushRuntimeTest {
         RecordDescriptor aggDesc =
                 new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg },
-                new RecordDescriptor[] { ntsDesc, aggDesc });
+                new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
         NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory(
                 new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {});
         RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 9f66080..c4c7320 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -89,6 +89,8 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
 
     private transient int connectorIdCounter;
 
+    private transient List<IOperatorDescriptor> metaOps;
+
     // This constructor uses the default frame size. It is for test purposes only.
     // For other use cases, use the one which sets the frame size.
     public JobSpecification() {
@@ -308,6 +310,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         return requiredClusterCapacity;
     }
 
+    public void setMetaOps(List<IOperatorDescriptor> metaOps) {
+        this.metaOps = metaOps;
+    }
+
+    public List<IOperatorDescriptor> getMetaOps() {
+        return metaOps;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.computeIfAbsent(key, k -> new ArrayList<>());
         extend(vList, index);

Reply via email to