Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Make StreamSelectRuntimeFactory Extensible
......................................................................
[NO ISSUE][RT] Make StreamSelectRuntimeFactory Extensible - user model changes: no - storage format changes: no - interface changes: no Details: - Make StreamSelectRuntimeFactory and its runtime extensible by providing accessors and replacing the runtime anonymous class by a named class. Change-Id: I9c575e6c037e5c8c1818cfa3c6b0bf65697bfb9e Reviewed-on: https://asterix-gerrit.ics.uci.edu/3308 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java 1 file changed, 92 insertions(+), 64 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java index 933e640..e84b5b4 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java @@ -40,22 +40,13 @@ private static final long serialVersionUID = 1L; // Final - private final IScalarEvaluatorFactory cond; - private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; - private final IMissingWriterFactory missingWriterFactory; + protected final IScalarEvaluatorFactory cond; + protected final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; + protected final IMissingWriterFactory 
missingWriterFactory; // Mutable - private boolean retainMissing; + protected boolean retainMissing; private int missingPlaceholderVariableIndex; - /** - * @param cond - * @param projectionList - * if projectionList is null, then no projection is performed - * @param retainMissing - * @param missingPlaceholderVariableIndex - * @param missingWriterFactory - * @throws HyracksDataException - */ public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList, IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory, boolean retainMissing, int missingPlaceholderVariableIndex, IMissingWriterFactory missingWriterFactory) { @@ -67,11 +58,6 @@ this.missingWriterFactory = missingWriterFactory; } - public void retainMissing(boolean retainMissing, int index) { - this.retainMissing = retainMissing; - this.missingPlaceholderVariableIndex = index; - } - @Override public String toString() { return "stream-select " + cond.toString(); @@ -80,61 +66,103 @@ @Override public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) { final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx); - return new AbstractOneInputOneOutputOneFieldFramePushRuntime() { - private IPointable p = VoidPointable.FACTORY.createPointable(); - private IScalarEvaluator eval; - private IMissingWriter missingWriter = null; - private ArrayTupleBuilder missingTupleBuilder = null; + return new StreamSelectRuntime(ctx, bbi); + } - @Override - public void open() throws HyracksDataException { - if (eval == null) { - initAccessAppendFieldRef(ctx); - eval = cond.createScalarEvaluator(ctx); - } - super.open(); - //prepare nullTupleBuilder - if (retainMissing && missingWriter == null) { - missingWriter = missingWriterFactory.createMissingWriter(); - missingTupleBuilder = new ArrayTupleBuilder(1); - DataOutput out = missingTupleBuilder.getDataOutput(); - missingWriter.writeMissing(out); - 
missingTupleBuilder.addFieldEndOffset(); - } + public void retainMissing(boolean retainMissing, int index) { + this.retainMissing = retainMissing; + this.missingPlaceholderVariableIndex = index; + } + + public IScalarEvaluatorFactory getCond() { + return cond; + } + + public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() { + return binaryBooleanInspectorFactory; + } + + public IMissingWriterFactory getMissingWriterFactory() { + return missingWriterFactory; + } + + public boolean isRetainMissing() { + return retainMissing; + } + + public int getMissingPlaceholderVariableIndex() { + return missingPlaceholderVariableIndex; + } + + public int[] getProjectionList() { + return projectionList; + } + + public class StreamSelectRuntime extends AbstractOneInputOneOutputOneFieldFramePushRuntime { + + protected final IPointable p = VoidPointable.FACTORY.createPointable(); + protected final IHyracksTaskContext ctx; + protected final IBinaryBooleanInspector bbi; + protected IScalarEvaluator eval; + protected IMissingWriter missingWriter; + protected ArrayTupleBuilder missingTupleBuilder; + + public StreamSelectRuntime(IHyracksTaskContext ctx, IBinaryBooleanInspector bbi) { + this.ctx = ctx; + this.bbi = bbi; + } + + @Override + public void open() throws HyracksDataException { + if (eval == null) { + initAccessAppendFieldRef(ctx); + eval = cond.createScalarEvaluator(ctx); } + super.open(); + if (retainMissing && missingWriter == null) { + missingWriter = missingWriterFactory.createMissingWriter(); + missingTupleBuilder = new ArrayTupleBuilder(1); + DataOutput out = missingTupleBuilder.getDataOutput(); + missingWriter.writeMissing(out); + missingTupleBuilder.addFieldEndOffset(); + } + } - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - eval.evaluate(tRef, p); - if 
(bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) { - if (projectionList != null) { - appendProjectionToFrame(t, projectionList); - } else { - appendTupleToFrame(t); - } + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); + eval.evaluate(tRef, p); + if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) { + if (projectionList != null) { + appendProjectionToFrame(t, projectionList); } else { - if (retainMissing) { - for (int i = 0; i < tRef.getFieldCount(); i++) { - if (i == missingPlaceholderVariableIndex) { - appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize()); - } else { - appendField(tAccess, t, i); - } - } - } + appendTupleToFrame(t); + } + } else { + if (retainMissing) { + retainMissingTuple(t); } } } + } - @Override - public void flush() throws HyracksDataException { - appender.flush(writer); + @Override + public void flush() throws HyracksDataException { + appender.flush(writer); + } + + protected void retainMissingTuple(int t) throws HyracksDataException { + for (int i = 0; i < tRef.getFieldCount(); i++) { + if (i == missingPlaceholderVariableIndex) { + appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize()); + } else { + appendField(tAccess, t, i); + } } - }; + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3308 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9c575e6c037e5c8c1818cfa3c6b0bf65697bfb9e Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> 
Gerrit-Reviewer: Till Westmann <[email protected]>
