Change in asterixdb[master]: Optimize PartitionWriter
abdullah alamoudi has submitted this change and it was merged. Change subject: Optimize PartitionWriter .. Optimize PartitionWriter Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1347 Sonar-Qube: Jenkins Tested-by: Jenkins Integration-Tests: Jenkins Reviewed-by: Yingyi Bu --- M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java 3 files changed, 22 insertions(+), 11 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java index 7f518cd..9463982 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -104,6 +104,12 @@ } @Override +public int getTupleCount() { +// if message is set, there is always a message. that message could be a null message (TODO: optimize) +return tupleCount + ((message == null) ? 0 : 1); +} + +@Override public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { if (!initialized) { message = TaskUtils. 
<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index c047567..6d87d89 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -40,13 +40,14 @@ private final FrameTupleAccessor tupleAccessor; private final ITuplePartitionComputer tpc; private final IHyracksTaskContext ctx; -private boolean allocatedFrame = false; +private boolean[] allocatedFrames; public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException { this.consumerPartitionCount = consumerPartitionCount; pWriters = new IFrameWriter[consumerPartitionCount]; isOpen = new boolean[consumerPartitionCount]; +allocatedFrames = new boolean[consumerPartitionCount]; appenders = new FrameTupleAppender[consumerPartitionCount]; for (int i = 0; i < consumerPartitionCount; ++i) { try { @@ -70,7 +71,7 @@ HyracksDataException closeException = null; for (int i = 0; i < pWriters.length; ++i) { if (isOpen[i]) { -if (allocatedFrame) { +if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) { try { appenders[i].write(pWriters[i], true); } catch (Throwable th) { @@ -103,9 +104,6 @@ isOpen[i] = true; pWriters[i].open(); } -if (!allocatedFrame) { -allocateFrames(); -} } @Override @@ -114,15 +112,16 @@ int tupleCount = tupleAccessor.getTupleCount(); for (int i = 0; i < tupleCount; ++i) { int h = tpc.partition(tupleAccessor, i, consumerPartitionCount); +if (!allocatedFrames[h]) { +allocateFrames(h); +} 
FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i); } } -private void allocateFrames() throws HyracksDataException { -for (int i = 0; i < appenders.length; ++i) { -appenders[i].reset(new VSizeFrame(ctx), true); -} -allocatedFrame = true; +protected void allocateFrames(int i) throws HyracksDataException { +appenders[i].reset(new VSizeFrame(ctx), true); +allocatedFrames[i] = true; } @Override @@ -149,7 +148,9 @@ @Override public void flush() throws HyracksDataE
Change in asterixdb[master]: Optimize PartitionWriter
Yingyi Bu has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 2: Code-Review+2 -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
Jenkins has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 2: Integration-Tests+1 Integration Tests Successful https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/1114/ : SUCCESS -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
Jenkins has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 2: Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/1114/ -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
Jenkins has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 2: Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/3275/ -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
Hello Till Westmann, Jenkins, I'd like you to reexamine a change. Please visit https://asterix-gerrit.ics.uci.edu/1347 to look at the new patch set (#2). Change subject: Optimize PartitionWriter .. Optimize PartitionWriter Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 --- M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java 3 files changed, 22 insertions(+), 11 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/47/1347/2 -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newpatchset Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu
Change in asterixdb[master]: Optimize PartitionWriter
Till Westmann has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 1: Code-Review+1 -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Till Westmann Gerrit-Reviewer: Yingyi Bu Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
Jenkins has posted comments on this change. Change subject: Optimize PartitionWriter .. Patch Set 1: Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/3261/ -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-HasComments: No
Change in asterixdb[master]: Optimize PartitionWriter
abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1347 Change subject: Optimize PartitionWriter .. Optimize PartitionWriter Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 --- M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java 1 file changed, 12 insertions(+), 11 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/47/1347/1 diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index c047567..013472f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -40,13 +40,14 @@ private final FrameTupleAccessor tupleAccessor; private final ITuplePartitionComputer tpc; private final IHyracksTaskContext ctx; -private boolean allocatedFrame = false; +private boolean[] allocatedFrames; public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException { this.consumerPartitionCount = consumerPartitionCount; pWriters = new IFrameWriter[consumerPartitionCount]; isOpen = new boolean[consumerPartitionCount]; +allocatedFrames = new boolean[consumerPartitionCount]; appenders = new FrameTupleAppender[consumerPartitionCount]; for (int i = 0; i < consumerPartitionCount; ++i) { try { @@ -70,7 +71,7 @@ HyracksDataException closeException = null; for (int i = 0; i < pWriters.length; ++i) { if (isOpen[i]) { -if (allocatedFrame) { +if (allocatedFrames[i] && 
appenders[i].getTupleCount() > 0) { try { appenders[i].write(pWriters[i], true); } catch (Throwable th) { @@ -103,9 +104,6 @@ isOpen[i] = true; pWriters[i].open(); } -if (!allocatedFrame) { -allocateFrames(); -} } @Override @@ -114,15 +112,16 @@ int tupleCount = tupleAccessor.getTupleCount(); for (int i = 0; i < tupleCount; ++i) { int h = tpc.partition(tupleAccessor, i, consumerPartitionCount); +if (!allocatedFrames[h]) { +allocateFrames(h); +} FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i); } } -private void allocateFrames() throws HyracksDataException { -for (int i = 0; i < appenders.length; ++i) { -appenders[i].reset(new VSizeFrame(ctx), true); -} -allocatedFrame = true; +private void allocateFrames(int i) throws HyracksDataException { +appenders[i].reset(new VSizeFrame(ctx), true); +allocatedFrames[i] = true; } @Override @@ -149,7 +148,9 @@ @Override public void flush() throws HyracksDataException { for (int i = 0; i < consumerPartitionCount; i++) { -appenders[i].flush(pWriters[i]); +if (allocatedFrames[i]) { +appenders[i].flush(pWriters[i]); +} } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1347 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi