This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new c775dd9 [NO ISSUE][RT] Profiling fixes c775dd9 is described below commit c775dd9d2c01a66b9f2da8447c75400bac6ddf90 Author: Ian Maxon <ima...@apache.org> AuthorDate: Wed Nov 20 18:43:33 2019 -0800 [NO ISSUE][RT] Profiling fixes - Fix profile serialization for profiles > 64kb in size - Break deadlock for OperatorNodePushables that keep initialize() open and call downstream operators that block until other operators have initialized (e.g. Intersect). Change-Id: I87ec970eaf2d5db76e7bfaa60be9190efb1a70ae Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4263 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Sandeep Gupta <sandeep.gu...@couchbase.com> --- .../asterix/translator/IStatementExecutor.java | 31 +++++++++++++++++-- .../api/dataflow/TimedOperatorNodePushable.java | 36 +++++++++++----------- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index c0cf8eb..a5c8162 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -54,11 +54,15 @@ import org.apache.hyracks.api.result.ResultSetId; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; /** * An interface that takes care of executing a list of statements that are submitted through an Asterix API */ public interface IStatementExecutor { + public static final char UNIT_SEPARATOR = 31; + public static final char END_OF_BLOCK = 23; /** * Specifies result delivery of executed statements @@ -200,12 +204,35 @@ public interface IStatementExecutor { private void writeObject(ObjectOutputStream out) throws IOException { ObjectMapper om = new ObjectMapper(); - out.writeUTF(om.writeValueAsString(profile)); + java.lang.String prof = om.writeValueAsString(profile); + //split the string if it is >=64K to avoid writeUTF limit + List<String> pieces; + if (prof.length() > 65534L) { + pieces = Lists.newArrayList(Splitter.fixedLength(32768).split(prof)); + } else { + pieces = Lists.newArrayList(prof); + } + + for (int i = 0; i < pieces.size(); i++) { + if (i == pieces.size() - 1) { + out.writeChar(UNIT_SEPARATOR); + } else { + out.writeChar(END_OF_BLOCK); + } + out.writeUTF(pieces.get(i)); + } + } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { ObjectMapper om = new ObjectMapper(); - JsonNode inNode = om.readTree(in.readUTF()); + StringBuilder objSplits = new StringBuilder(); + for (char cmd = in.readChar(); cmd != END_OF_BLOCK && cmd == UNIT_SEPARATOR; cmd = in.readChar()) { + objSplits.append(in.readUTF()); + } + objSplits.append(in.readUTF()); + + JsonNode inNode = om.readTree(objSplits.toString()); if (!inNode.isObject()) { throw new IOException("Deserialization error"); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java index 2d46bea..1d47c98 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java @@ -33,7 +33,7 @@ public class TimedOperatorNodePushable extends TimedFrameWriter implements IOper HashMap<Integer, IFrameWriter> inputs; long frameStart; - TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException { + private TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) { super(null, collector, op.getDisplayName()); this.op = op; inputs = new HashMap<>(); @@ -41,20 +41,16 @@ public class TimedOperatorNodePushable extends TimedFrameWriter implements IOper @Override public void initialize() throws HyracksDataException { - synchronized (collector) { - startClock(); - op.initialize(); - stopClock(); - } + startClock(); + op.initialize(); + stopClock(); } @Override public void deinitialize() throws HyracksDataException { - synchronized (collector) { - startClock(); - op.deinitialize(); - stopClock(); - } + startClock(); + op.deinitialize(); + stopClock(); } @Override @@ -83,15 +79,19 @@ public class TimedOperatorNodePushable extends TimedFrameWriter implements IOper } private void stopClock() { - pause(); - collector.giveClock(this); + synchronized (collector) { + pause(); + collector.giveClock(this); + } } private void startClock() { - if (frameStart > 0) { - return; + synchronized (collector) { + if (frameStart > 0) { + return; + } + frameStart = collector.takeClock(this); } - frameStart = collector.takeClock(this); } @Override @@ -113,11 +113,11 @@ public class TimedOperatorNodePushable extends TimedFrameWriter implements IOper } } - public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx) - throws HyracksDataException { + public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx) { if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) { return new TimedOperatorNodePushable(op, ctx.getStatsCollector()); } return op; } + }