From Ritik Raj <raj.ritik9...@gmail.com>: Ritik Raj has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18249 )
Change subject: [NO ISSUE][*DB] Refactoring AssignRuntime Factory ...................................................................... [NO ISSUE][*DB] Refactoring AssignRuntime Factory Change-Id: Ieb583580f7bc8a40a15839ce15c492aa0bfb410c --- M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java 1 file changed, 106 insertions(+), 76 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/49/18249/1 diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java index 5343069..300fac6 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java @@ -39,10 +39,9 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { private static final long serialVersionUID = 1L; - - private int[] outColumns; - private IScalarEvaluatorFactory[] evalFactories; - private final boolean flushFramesRapidly; + protected int[] outColumns; + protected IScalarEvaluatorFactory[] evalFactories; + protected final boolean flushFramesRapidly; /** * @param outColumns @@ -64,6 +63,14 @@ this.flushFramesRapidly = flushFramesRapidly; } + public int[] getOutColumns() { + return outColumns; + } + + public IScalarEvaluatorFactory[] getEvalFactories() { + return evalFactories; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -88,92 +95,106 @@ @Override public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException { - IEvaluatorContext evalCtx = new EvaluatorContext(ctx); - final int[] projectionToOutColumns = new int[projectionList.length]; - for (int j = 0; j < projectionList.length; j++) { - projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); + return new AssignRuntime(ctx); + } + + public int[] getProjectionList() { + return projectionList; + } + + public class AssignRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + private IPointable result; + private IScalarEvaluator[] eval; + protected ArrayTupleBuilder tupleBuilder; + private final int[] projectionToOutColumns; + private boolean first = true; + protected int tupleIndex = 0; + protected final IHyracksTaskContext ctx; + protected final IEvaluatorContext evalCtx; + + public AssignRuntime(IHyracksTaskContext ctx) { + this.ctx = ctx; + this.evalCtx = new EvaluatorContext(ctx); + projectionToOutColumns = new int[projectionList.length]; + for (int j = 0; j < projectionList.length; j++) { + projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); + } + tupleBuilder = new ArrayTupleBuilder(projectionList.length); + eval = new IScalarEvaluator[evalFactories.length]; + result = VoidPointable.FACTORY.createPointable(); } - return new AbstractOneInputOneOutputOneFramePushRuntime() { - private IPointable result = VoidPointable.FACTORY.createPointable(); - private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length]; - private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length); - private boolean first = true; - private int tupleIndex = 0; - - @Override - public void open() throws HyracksDataException { - if (first) { - initAccessAppendRef(ctx); - first = false; - int n = evalFactories.length; - for (int i = 0; i < n; i++) { - eval[i] = evalFactories[i].createScalarEvaluator(evalCtx); - } + @Override + public void open() throws HyracksDataException { + if (first) { + initAccessAppendRef(ctx); + 
first = false; + int n = evalFactories.length; + for (int i = 0; i < n; i++) { + eval[i] = evalFactories[i].createScalarEvaluator(evalCtx); } - super.open(); } + super.open(); + } - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - // what if nTuple is 0? - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - if (nTuple < 1) { - if (nTuple < 0) { - throw new HyracksDataException("Negative number of tuples in the frame: " + nTuple); - } - appender.flush(writer); - } else { - if (nTuple > 1) { - for (; tupleIndex < nTuple - 1; tupleIndex++) { - tRef.reset(tAccess, tupleIndex); - produceTuple(tupleBuilder, tAccess, tupleIndex, tRef); - appendToFrameFromTupleBuilder(tupleBuilder); - } - } - - if (tupleIndex < nTuple) { + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + // what if nTuple is 0? + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + if (nTuple < 1) { + if (nTuple < 0) { + throw new HyracksDataException("Negative number of tuples in the frame: " + nTuple); + } + appender.flush(writer); + } else { + if (nTuple > 1) { + for (; tupleIndex < nTuple - 1; tupleIndex++) { tRef.reset(tAccess, tupleIndex); produceTuple(tupleBuilder, tAccess, tupleIndex, tRef); - if (flushFramesRapidly) { - // Whenever all the tuples in the incoming frame have been consumed, the assign operator - // will push its frame to the next operator; i.e., it won't wait until the frame gets full. - appendToFrameFromTupleBuilder(tupleBuilder, true); - } else { - appendToFrameFromTupleBuilder(tupleBuilder); - } + appendToFrameFromTupleBuilder(tupleBuilder); + } + } + if (tupleIndex < nTuple) { + tRef.reset(tAccess, tupleIndex); + produceTuple(tupleBuilder, tAccess, tupleIndex, tRef); + if (flushFramesRapidly) { + // Whenever all the tuples in the incoming frame have been consumed, the assign operator + // will push its frame to the next operator; i.e., it won't wait until the frame gets full. 
+ appendToFrameFromTupleBuilder(tupleBuilder, true); } else { - if (flushFramesRapidly) { - flushAndReset(); - } + appendToFrameFromTupleBuilder(tupleBuilder); + } + } else { + if (flushFramesRapidly) { + flushAndReset(); } } - tupleIndex = 0; } + tupleIndex = 0; + } - private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, - FrameTupleReference tupleRef) throws HyracksDataException { - try { - tb.reset(); - for (int f = 0; f < projectionList.length; f++) { - int k = projectionToOutColumns[f]; - if (k >= 0) { - eval[k].evaluate(tupleRef, result); - tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength()); - } else { - tb.addField(accessor, tIndex, projectionList[f]); - } + protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, + FrameTupleReference tupleRef) throws HyracksDataException { + try { + tb.reset(); + for (int f = 0; f < projectionList.length; f++) { + int k = projectionToOutColumns[f]; + if (k >= 0) { + eval[k].evaluate(tupleRef, result); + tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength()); + } else { + tb.addField(accessor, tIndex, projectionList[f]); } - } catch (HyracksDataException e) { - throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, tupleIndex); } + } catch (HyracksDataException e) { + throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, tupleIndex); } + } - @Override - public void flush() throws HyracksDataException { - appender.flush(writer); - } - }; + @Override + public void flush() throws HyracksDataException { + appender.flush(writer); + } } -} +} \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18249 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ieb583580f7bc8a40a15839ce15c492aa0bfb410c 
Gerrit-Change-Number: 18249 Gerrit-PatchSet: 1 Gerrit-Owner: Ritik Raj <raj.ritik9...@gmail.com> Gerrit-MessageType: newchange