Repository: asterixdb Updated Branches: refs/heads/master caf43069b -> 10e5ad1a7
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java index 32eff3a..7b3fb46 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java @@ -34,8 +34,8 @@ public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRu } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return createOneOutputPushRuntime(ctx); + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { createOneOutputPushRuntime(ctx) }; } public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java index a838557..f0e9406 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java @@ -38,8 +38,8 @@ public class SinkRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new AbstractOneInputSinkPushRuntime() { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new AbstractOneInputSinkPushRuntime() { @Override public void open() throws HyracksDataException { @@ -61,7 +61,6 @@ public class SinkRuntimeFactory implements IPushRuntimeFactory { public void flush() throws HyracksDataException { // flush() is meaningless for sink operators } - }; + } }; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index b4d23fc..07365db 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -45,11 +45,18 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity, IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) { + this(spec, inputArity, outputArity, runtimeFactories, internalRecordDescriptors, null, null); + } + + public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity, + IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors, + IPushRuntimeFactory[] outputRuntimeFactories, int[] outputPositions) { super(spec, inputArity, outputArity); if (outputArity == 1) { this.outRecDescs[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1]; } - this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors); + this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors, outputRuntimeFactories, + outputPositions); } public AlgebricksPipeline getPipeline() { @@ -81,7 +88,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable { private final IHyracksTaskContext ctx; - public SourcePushRuntime(IHyracksTaskContext ctx) { + SourcePushRuntime(IHyracksTaskContext ctx) { this.ctx = ctx; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index e1081e0..a717794 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -18,9 +18,13 @@ */ package org.apache.hyracks.algebricks.runtime.operators.meta; +import java.util.HashMap; +import java.util.Map; + import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.EnforceFrameWriter; @@ -37,6 +41,7 @@ public class PipelineAssembler { private final int inputArity; private final int outputArity; private final AlgebricksPipeline pipeline; + private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap; public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity, RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) { @@ -45,6 +50,7 @@ public class PipelineAssembler { this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor; this.inputArity = inputArity; this.outputArity = outputArity; + this.runtimeMap = new HashMap<>(); } public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { @@ -52,19 +58,30 @@ public class PipelineAssembler { boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer;// this.writer; - for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) { - IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx); - newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + IPushRuntimeFactory[] runtimeFactories = pipeline.getRuntimeFactories(); + RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors(); + for (int i = runtimeFactories.length - 1; i >= 0; i--) { start = enforce ? EnforceFrameWriter.enforce(start) : start; - if (i == pipeline.getRuntimeFactories().length - 1) { - if (outputArity == 1) { - newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); + + IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; + IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); + for (int j = 0; j < newRuntimes.length; j++) { + if (enforce) { + newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]); + } + if (i == runtimeFactories.length - 1) { + if (outputArity == 1) { + newRuntimes[j].setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); + } + } else { + newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]); } - } else { - newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); } + runtimeMap.put(runtimeFactory, newRuntimes); + + IPushRuntime newRuntime = newRuntimes[0]; if (i > 0) { - newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]); + newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else if (inputArity > 0) { newRuntime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor); } @@ -72,4 +89,8 @@ public class PipelineAssembler { } return start; } + + public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) { + return runtimeMap.get(runtimeFactory); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 7e04750..159fde7 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -20,9 +20,11 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime; @@ -41,16 +43,21 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto private static final long serialVersionUID = 1L; - private final AlgebricksPipeline pipeline; + private final List<AlgebricksPipeline> pipelines; + private final RecordDescriptor inputRecordDesc; + + private final RecordDescriptor outputRecordDesc; + private final IMissingWriterFactory[] missingWriterFactories; - public SubplanRuntimeFactory(AlgebricksPipeline pipeline, IMissingWriterFactory[] missingWriterFactories, - RecordDescriptor inputRecordDesc, int[] projectionList) { + public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories, + RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) { super(projectionList); - this.pipeline = pipeline; + this.pipelines = pipelines; this.missingWriterFactories = missingWriterFactories; this.inputRecordDesc = inputRecordDesc; + this.outputRecordDesc = outputRecordDesc; if (projectionList != null) { throw new NotImplementedException(); } @@ -60,8 +67,12 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Subplan { \n"); - for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) { - sb.append(" " + f.toString() + ";\n"); + for (AlgebricksPipeline pipeline : pipelines) { + sb.append('{'); + for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) { + sb.append(" ").append(f).append(";\n"); + } + sb.append('}'); } sb.append("}"); return sb.toString(); @@ -70,110 +81,177 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto @Override public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { + return new SubplanPushRuntime(ctx); + } - RecordDescriptor pipelineOutputRecordDescriptor = null; + private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { - final PipelineAssembler pa = - new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, pipelineOutputRecordDescriptor); - final IMissingWriter[] nullWriters = new IMissingWriter[missingWriterFactories.length]; - for (int i = 0; i < missingWriterFactories.length; i++) { - nullWriters[i] = missingWriterFactories[i].createMissingWriter(); - } + final IHyracksTaskContext ctx; - return new AbstractOneInputOneOutputOneFramePushRuntime() { + final NestedTupleSourceRuntime[] startOfPipelines; - /** - * Computes the outer product between a given tuple and the frames - * passed. - */ - class TupleOuterProduct implements IFrameWriter { + boolean first; - private boolean smthWasWritten = false; - private FrameTupleAccessor ta = new FrameTupleAccessor( - pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]); - private ArrayTupleBuilder tb = new ArrayTupleBuilder( - nullWriters.length + SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount()); + SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + this.ctx = ctx; + this.first = true; - @Override - public void open() throws HyracksDataException { - smthWasWritten = false; - } + IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length]; + for (int i = 0; i < missingWriterFactories.length; i++) { + missingWriters[i] = missingWriterFactories[i].createMissingWriter(); + } + + int pipelineCount = pipelines.size(); + startOfPipelines = new NestedTupleSourceRuntime[pipelineCount]; + PipelineAssembler[] pipelineAssemblers = new PipelineAssembler[pipelineCount]; + for (int i = 0; i < pipelineCount; i++) { + AlgebricksPipeline pipeline = pipelines.get(i); + RecordDescriptor pipelineLastRecordDescriptor = + pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]; - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - ta.reset(buffer); - int nTuple = ta.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t); + RecordDescriptor outputRecordDescriptor; + IFrameWriter outputWriter; + if (i == 0) { + // primary pipeline + outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters); + outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc; + } else { + // secondary pipeline + IPushRuntime outputPushRuntime = linkSecondaryPipeline(pipeline, pipelineAssemblers, i); + if (outputPushRuntime == null) { + throw new IllegalStateException("Invalid pipeline"); } - smthWasWritten = true; + outputPushRuntime.setInputRecordDescriptor(0, pipelineLastRecordDescriptor); + outputWriter = outputPushRuntime; + outputRecordDescriptor = pipelineLastRecordDescriptor; } - @Override - public void close() throws HyracksDataException { - if (!smthWasWritten && !failed) { - // the case when we need to write nulls - appendNullsToTuple(); - appendToFrameFromTupleBuilder(tb); - } + PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor); + startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx); + pipelineAssemblers[i] = pa; + } + } + + IPushRuntime linkSecondaryPipeline(AlgebricksPipeline pipeline, PipelineAssembler[] pipelineAssemblers, + int pipelineAssemblersCount) { + IPushRuntimeFactory[] outputRuntimeFactories = pipeline.getOutputRuntimeFactories(); + if (outputRuntimeFactories == null || outputRuntimeFactories.length != 1) { + throw new IllegalStateException(); + } + IPushRuntimeFactory outRuntimeFactory = outputRuntimeFactories[0]; + int outputPosition = pipeline.getOutputPositions()[0]; + for (int i = 0; i < pipelineAssemblersCount; i++) { + IPushRuntime[] p = pipelineAssemblers[i].getPushRuntime(outRuntimeFactory); + if (p != null) { + return p[outputPosition]; } + } + return null; + } - @Override - public void fail() throws HyracksDataException { - // writer.fail() is called by the outer class' writer.fail(). + @Override + public void open() throws HyracksDataException { + writer.open(); + if (first) { + first = false; + initAccessAppendRef(ctx); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); + + for (NestedTupleSourceRuntime nts : startOfPipelines) { + nts.writeTuple(buffer, t); } - private void appendNullsToTuple() throws HyracksDataException { - tb.reset(); - int n0 = tRef.getFieldCount(); - for (int f = 0; f < n0; f++) { - tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f); + int n = 0; + try { + for (; n < startOfPipelines.length; n++) { + NestedTupleSourceRuntime nts = startOfPipelines[n]; + try { + nts.open(); + } catch (Exception e) { + nts.fail(); + throw e; + } } - DataOutput dos = tb.getDataOutput(); - for (int i = 0; i < nullWriters.length; i++) { - nullWriters[i].writeMissing(dos); - tb.addFieldEndOffset(); + } finally { + for (int i = n - 1; i >= 0; i--) { + startOfPipelines[i].close(); } } } + } - IFrameWriter endPipe = new TupleOuterProduct(); + @Override + public void flush() throws HyracksDataException { + writer.flush(); + } - NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx); + /** + * Computes the outer product between a given tuple and the frames + * passed. + */ + class TupleOuterProduct implements IFrameWriter { - boolean first = true; + private boolean smthWasWritten; + private final FrameTupleAccessor ta; + private final ArrayTupleBuilder tb; + private final IMissingWriter[] missingWriters; + + private TupleOuterProduct(RecordDescriptor recordDescriptor, IMissingWriter[] missingWriters) { + ta = new FrameTupleAccessor(recordDescriptor); + tb = new ArrayTupleBuilder( + missingWriters.length + SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount()); + this.missingWriters = missingWriters; + } @Override public void open() throws HyracksDataException { - writer.open(); - if (first) { - first = false; - initAccessAppendRef(ctx); - } + smthWasWritten = false; } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); + ta.reset(buffer); + int nTuple = ta.getTupleCount(); for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - startOfPipeline.writeTuple(buffer, t); - try { - startOfPipeline.open(); - } catch (Exception e) { - startOfPipeline.fail(); - throw e; - } finally { - startOfPipeline.close(); - } + appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t); } + smthWasWritten = true; } @Override - public void flush() throws HyracksDataException { - writer.flush(); + public void close() throws HyracksDataException { + if (!smthWasWritten && !failed) { + // the case when we need to write nulls + appendNullsToTuple(); + appendToFrameFromTupleBuilder(tb); + } } - }; + + @Override + public void fail() throws HyracksDataException { + // writer.fail() is called by the outer class' writer.fail(). + } + + private void appendNullsToTuple() throws HyracksDataException { + tb.reset(); + int n0 = tRef.getFieldCount(); + for (int f = 0; f < n0; f++) { + tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f); + } + DataOutput dos = tb.getDataOutput(); + for (IMissingWriter missingWriter : missingWriters) { + missingWriter.writeMissing(dos); + tb.addFieldEndOffset(); + } + } + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java index 3ccceed..67f4a77 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java @@ -40,8 +40,8 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { - return new AbstractOneInputSourcePushRuntime() { + public IPushRuntime[] createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new AbstractOneInputSourcePushRuntime() { private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0); private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx)); @@ -69,6 +69,6 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory { public void flush() throws HyracksDataException { appender.flush(writer); } - }; + } }; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java index 496679f..8e64092 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java @@ -39,8 +39,8 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new NestedTupleSourceRuntime(ctx); + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new NestedTupleSourceRuntime(ctx) }; } public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java index 021784a..8a06ecf 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java @@ -56,9 +56,9 @@ public class PrinterRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) { IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories, inputRecordDesc); - return new SinkWriterRuntime(w, System.out, inputRecordDesc); + return new IPushRuntime[] { new SinkWriterRuntime(w, System.out, inputRecordDesc) }; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java index d52ceee..536a769 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java @@ -67,11 +67,11 @@ public class SinkWriterRuntimeFactory implements IPushRuntimeFactory { } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { try { PrintStream filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile))); IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc); - return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, true); + return new IPushRuntime[] { new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, true) }; } catch (IOException e) { throw new HyracksDataException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java new file mode 100644 index 0000000..1706e59 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.union; + +import java.nio.ByteBuffer; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final int inputArity; + + public MicroUnionAllRuntimeFactory(int inputArity) { + this.inputArity = inputArity; + } + + @Override + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) { + Mutable<Boolean> failedShared = new MutableObject<>(Boolean.FALSE); + IPushRuntime[] result = new IPushRuntime[inputArity]; + for (int i = 0; i < inputArity; i++) { + result[i] = new MicroUnionAllPushRuntime(i, failedShared); + } + return result; + } + + @Override + public String toString() { + return "union-all"; + } + + private final class MicroUnionAllPushRuntime implements IPushRuntime { + + private final int idx; + + private final Mutable<Boolean> failedShared; + + private IFrameWriter writer; + + MicroUnionAllPushRuntime(int idx, Mutable<Boolean> failedShared) { + this.idx = idx; + this.failedShared = failedShared; + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + if (index != 0) { + throw new IllegalArgumentException(String.valueOf(index)); + } + this.writer = writer; + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + // input is not accessed + } + + @Override + public void open() throws HyracksDataException { + if (idx == 0) { + writer.open(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + writer.nextFrame(buffer); + } + + @Override + public void fail() throws HyracksDataException { + boolean failed = failedShared.getValue(); + failedShared.setValue(Boolean.TRUE); + if (!failed) { + writer.fail(); + } + } + + @Override + public void close() throws HyracksDataException { + if (idx == 0) { + writer.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 40e2ec6..a7621ec 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -22,6 +22,7 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; @@ -439,7 +440,7 @@ public class PushRuntimeTest { RecordDescriptor aggDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg }, - new RecordDescriptor[] { ntsDesc, aggDesc }); + new RecordDescriptor[] { ntsDesc, aggDesc }, null, null); NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory( new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {}); RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] { @@ -780,10 +781,10 @@ public class PushRuntimeTest { new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, assign2, project1 }, - new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc }); + new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc }, null, null); - SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline, - new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null); + SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(Collections.singletonList(pipeline), + new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null, null); RecordDescriptor subplanDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); @@ -851,7 +852,7 @@ public class PushRuntimeTest { RecordDescriptor aggDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg }, - new RecordDescriptor[] { ntsDesc, aggDesc }); + new RecordDescriptor[] { ntsDesc, aggDesc }, null, null); NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory( new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {}); RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10e5ad1a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index 9f66080..c4c7320 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -89,6 +89,8 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist private transient int connectorIdCounter; + private transient List<IOperatorDescriptor> metaOps; + // This constructor uses the default frame size. It is for test purposes only. // For other use cases, use the one which sets the frame size. public JobSpecification() { @@ -308,6 +310,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist return requiredClusterCapacity; } + public void setMetaOps(List<IOperatorDescriptor> metaOps) { + this.metaOps = metaOps; + } + + public List<IOperatorDescriptor> getMetaOps() { + return metaOps; + } + private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) { List<V> vList = map.computeIfAbsent(key, k -> new ArrayList<>()); extend(vList, index);