This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c775dd9  [NO ISSUE][RT] Profiling fixes
c775dd9 is described below

commit c775dd9d2c01a66b9f2da8447c75400bac6ddf90
Author: Ian Maxon <ima...@apache.org>
AuthorDate: Wed Nov 20 18:43:33 2019 -0800

    [NO ISSUE][RT] Profiling fixes
    
    - Fix profile serialization for profiles > 64kb in size
    - Break deadlock for OperatorNodePushables that keep initialize() open
      and call downstream operators that block until other operators have
      initialized (e.g. Intersect).
    
    Change-Id: I87ec970eaf2d5db76e7bfaa60be9190efb1a70ae
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4263
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Sandeep Gupta <sandeep.gu...@couchbase.com>
---
 .../asterix/translator/IStatementExecutor.java     | 31 +++++++++++++++++--
 .../api/dataflow/TimedOperatorNodePushable.java    | 36 +++++++++++-----------
 2 files changed, 47 insertions(+), 20 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index c0cf8eb..a5c8162 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -54,11 +54,15 @@ import org.apache.hyracks.api.result.ResultSetId;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 
 /**
  * An interface that takes care of executing a list of statements that are 
submitted through an Asterix API
  */
 public interface IStatementExecutor {
+    public static final char UNIT_SEPARATOR = 31;
+    public static final char END_OF_BLOCK = 23;
 
     /**
      * Specifies result delivery of executed statements
@@ -200,12 +204,35 @@ public interface IStatementExecutor {
 
         private void writeObject(ObjectOutputStream out) throws IOException {
             ObjectMapper om = new ObjectMapper();
-            out.writeUTF(om.writeValueAsString(profile));
+            java.lang.String prof = om.writeValueAsString(profile);
+            // split by chars so each piece fits writeUTF's 65535-byte limit (a char encodes to at most 3 bytes)
+            List<String> pieces;
+            if (prof.length() > 21845) {
+                pieces = 
Lists.newArrayList(Splitter.fixedLength(21845).split(prof));
+            } else {
+                pieces = Lists.newArrayList(prof);
+            }
+
+            for (int i = 0; i < pieces.size(); i++) {
+                if (i == pieces.size() - 1) {
+                    out.writeChar(END_OF_BLOCK); // final piece: readObject leaves its loop on this marker
+                } else {
+                    out.writeChar(UNIT_SEPARATOR); // more pieces follow: readObject keeps looping on this marker
+                }
+                out.writeUTF(pieces.get(i));
+            }
+
         }
 
         private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
             ObjectMapper om = new ObjectMapper();
-            JsonNode inNode = om.readTree(in.readUTF());
+            StringBuilder objSplits = new StringBuilder();
+            for (char cmd = in.readChar(); cmd != END_OF_BLOCK && cmd == 
UNIT_SEPARATOR; cmd = in.readChar()) {
+                objSplits.append(in.readUTF());
+            }
+            objSplits.append(in.readUTF());
+
+            JsonNode inNode = om.readTree(objSplits.toString());
             if (!inNode.isObject()) {
                 throw new IOException("Deserialization error");
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
index 2d46bea..1d47c98 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
@@ -33,7 +33,7 @@ public class TimedOperatorNodePushable extends 
TimedFrameWriter implements IOper
     HashMap<Integer, IFrameWriter> inputs;
     long frameStart;
 
-    TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector 
collector) throws HyracksDataException {
+    private TimedOperatorNodePushable(IOperatorNodePushable op, 
IStatsCollector collector) {
         super(null, collector, op.getDisplayName());
         this.op = op;
         inputs = new HashMap<>();
@@ -41,20 +41,16 @@ public class TimedOperatorNodePushable extends 
TimedFrameWriter implements IOper
 
     @Override
     public void initialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.initialize();
-            stopClock();
-        }
+        startClock();
+        op.initialize(); // no longer called while holding the collector monitor
+        stopClock();
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.deinitialize();
-            stopClock();
-        }
+        startClock();
+        op.deinitialize(); // no longer called while holding the collector monitor
+        stopClock();
     }
 
     @Override
@@ -83,15 +79,19 @@ public class TimedOperatorNodePushable extends 
TimedFrameWriter implements IOper
     }
 
     private void stopClock() {
-        pause();
-        collector.giveClock(this);
+        synchronized (collector) { // monitor now covers only the pause and the clock hand-back
+            pause();
+            collector.giveClock(this);
+        }
     }
 
     private void startClock() {
-        if (frameStart > 0) {
-            return;
+        synchronized (collector) { // monitor now covers only the check and the clock take
+            if (frameStart > 0) { // positive frameStart: return without taking the clock again
+                return;
+            }
+            frameStart = collector.takeClock(this);
         }
-        frameStart = collector.takeClock(this);
     }
 
     @Override
@@ -113,11 +113,11 @@ public class TimedOperatorNodePushable extends 
TimedFrameWriter implements IOper
         }
     }
 
-    public static IOperatorNodePushable time(IOperatorNodePushable op, 
IHyracksTaskContext ctx)
-            throws HyracksDataException {
+    public static IOperatorNodePushable time(IOperatorNodePushable op, 
IHyracksTaskContext ctx) { // wraps op unless it is already a timed or super-activity pushable
         if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof 
SuperActivityOperatorNodePushable)) {
             return new TimedOperatorNodePushable(op, ctx.getStatsCollector());
         }
         return op;
     }
+
 }

Reply via email to