>From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 )
Change subject: [ASTERIXDB-3603][FUN] Runtime changes for transform functions
part 1
......................................................................
[ASTERIXDB-3603][FUN] Runtime changes for transform functions part 1
- user model changes: no
- storage format changes: no
- interface changes: yes
Ext-ref: MB-63039
Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
---
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
3 files changed, 163 insertions(+), 89 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/44/19744/1
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
this.funID = funID;
}
+ public IScalarEvaluatorFactory getListEvalFactory() {
+ return listEvalFactory;
+ }
+
+ public SourceLocation getSourceLoc() {
+ return sourceLoc;
+ }
+
+ public FunctionIdentifier getFunID() {
+ return funID;
+ }
+
@Override
public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext
ctx) throws HyracksDataException {
return new IUnnestingEvaluator() {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..7d7fbe7 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
return pipelines;
}
+ public RecordDescriptor getInputRecordDesc() {
+ return inputRecordDesc;
+ }
+
+ public RecordDescriptor getOutputRecordDesc() {
+ return outputRecordDesc;
+ }
+
+ public IMissingWriterFactory[] getMissingWriterFactories() {
+ return missingWriterFactories;
+ }
+
+ public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+ return stats;
+ }
+
public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
this.stats.putAll(stats);
}
@@ -100,22 +116,24 @@
return new SubplanPushRuntime(ctx);
}
- private class SubplanPushRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
+ public class SubplanPushRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
- final IHyracksTaskContext ctx;
+ protected final IHyracksTaskContext ctx;
- final NestedTupleSourceRuntime[] startOfPipelines;
+ protected final NestedTupleSourceRuntime[] startOfPipelines;
boolean first;
boolean profile;
+ protected IMissingWriter[] missingWriters;
+ protected ArrayTupleBuilder missingTupleBuilder;
- SubplanPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ protected SubplanPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
this.ctx = ctx;
this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
this.first = true;
- IMissingWriter[] missingWriters = new
IMissingWriter[missingWriterFactories.length];
+ missingWriters = new IMissingWriter[missingWriterFactories.length];
for (int i = 0; i < missingWriterFactories.length; i++) {
missingWriters[i] =
missingWriterFactories[i].createMissingWriter();
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@
private static final long serialVersionUID = 1L;
- private final int outCol;
- private final int positionalCol;
+ protected final int outCol;
+ protected final int positionalCol;
private final IUnnestingEvaluatorFactory unnestingFactory;
private final IUnnestingPositionWriterFactory positionWriterFactory;
private final boolean leftOuter;
@@ -76,6 +76,26 @@
this.missingWriterFactory = missingWriterFactory;
}
+ public int[] getProjectionList() {
+ return projectionList;
+ }
+
+ public int getOutCol() {
+ return outCol;
+ }
+
+ public int getPositionalCol() {
+ return positionalCol;
+ }
+
+ public IUnnestingEvaluatorFactory getUnnestingFactory() {
+ return unnestingFactory;
+ }
+
+ public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+ return positionWriterFactory;
+ }
+
@Override
public String toString() {
return "unnest " + outCol + (positionalCol >= 0 ? " at " +
positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- ByteArrayAccessibleOutputStream missingBytes = leftOuter ?
writeMissingBytes() : null;
- IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private ArrayTupleBuilder tupleBuilder = new
ArrayTupleBuilder(projectionList.length);
- private IUnnestingEvaluator unnest =
unnestingFactory.createUnnestingEvaluator(evalCtx);
- private final IUnnestingPositionWriter positionWriter =
- positionWriterFactory != null ?
positionWriterFactory.createUnnestingPositionWriter() : null;
-
- @Override
- public void open() throws HyracksDataException {
- super.open();
- if (tRef == null) {
- initAccessAppendRef(ctx);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- try {
- unnest.init(tRef);
- unnesting(t);
- } catch (IOException ae) {
- throw HyracksDataException.create(ae);
- }
- }
- }
-
- private void unnesting(int t) throws IOException {
- // Assumes that when unnesting the tuple, each step() call for each element
- // in the tuple will increase the positionIndex, and the positionIndex will
- // be reset when a new tuple is to be processed.
- int positionIndex = 1;
- boolean emitted = false;
- do {
- if (!unnest.step(p)) {
- break;
- }
- writeOutput(t, positionIndex++, false);
- emitted = true;
- } while (true);
- if (leftOuter && !emitted) {
- writeOutput(t, -1, true);
- }
- }
-
- private void writeOutput(int t, int positionIndex, boolean missing)
- throws HyracksDataException, IOException {
- tupleBuilder.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int col = projectionList[f];
- if (col == outCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(),
0, missingBytes.size());
- } else {
- tupleBuilder.addField(p.getByteArray(),
p.getStartOffset(), p.getLength());
- }
- } else if (col == positionalCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(),
0, missingBytes.size());
- } else {
- positionWriter.write(tupleBuilder.getDataOutput(),
positionIndex);
- tupleBuilder.addFieldEndOffset();
- }
- } else {
- tupleBuilder.addField(tAccess, t, projectionList[f]);
- }
- }
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
-
- @Override
- public void flush() throws HyracksDataException {
- appender.flush(writer);
- }
- };
+ return new UnnestPushRuntime(ctx);
}
- private ByteArrayAccessibleOutputStream writeMissingBytes() throws
HyracksDataException {
+ public class UnnestPushRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
+ private IPointable p = VoidPointable.FACTORY.createPointable();
+ protected ArrayTupleBuilder tupleBuilder;
+ protected IUnnestingEvaluator unnest;
+ private final IUnnestingPositionWriter positionWriter;
+ private final IHyracksTaskContext ctx;
+ protected ByteArrayAccessibleOutputStream missingBytes;
+
+ public UnnestPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ this.ctx = ctx;
+ IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+ unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ positionWriter =
+ positionWriterFactory != null ?
positionWriterFactory.createUnnestingPositionWriter() : null;
+ missingBytes = leftOuter ? writeMissingBytes() : null;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ if (tRef == null) {
+ initAccessAppendRef(ctx);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ unnest.init(tRef);
+ unnesting(t);
+ } catch (IOException ae) {
+ throw HyracksDataException.create(ae);
+ }
+ }
+ }
+
+ protected void unnesting(int t) throws IOException {
+ // Assumes that when unnesting the tuple, each step() call for each element
+ // in the tuple will increase the positionIndex, and the positionIndex will
+ // be reset when a new tuple is to be processed.
+ int positionIndex = 1;
+ boolean emitted = false;
+ do {
+ if (!unnest.step(p)) {
+ break;
+ }
+ writeOutput(t, positionIndex++, false);
+ emitted = true;
+ } while (true);
+ if (leftOuter && !emitted) {
+ writeOutput(t, -1, true);
+ }
+ }
+
+ private void writeOutput(int t, int positionIndex, boolean missing)
throws HyracksDataException, IOException {
+ tupleBuilder.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int col = projectionList[f];
+ if (col == outCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0,
missingBytes.size());
+ } else {
+ tupleBuilder.addField(p.getByteArray(),
p.getStartOffset(), p.getLength());
+ }
+ } else if (col == positionalCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0,
missingBytes.size());
+ } else {
+ positionWriter.write(tupleBuilder.getDataOutput(),
positionIndex);
+ tupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ tupleBuilder.addField(tAccess, t, projectionList[f]);
+ }
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+ };
+
+ protected ByteArrayAccessibleOutputStream writeMissingBytes() throws
HyracksDataException {
ByteArrayAccessibleOutputStream baos = new
ByteArrayAccessibleOutputStream();
IMissingWriter missingWriter =
missingWriterFactory.createMissingWriter();
missingWriter.writeMissing(new DataOutputStream(baos));
return baos;
}
-}
+}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Gerrit-Change-Number: 19744
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange