Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2717

Change subject: Improve performance of NotifyBrokerRuntime code
......................................................................

Improve performance of NotifyBrokerRuntime code

Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
---
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
1 file changed, 57 insertions(+), 52 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/17/2717/1

diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..97615b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
 
 package org.apache.asterix.bad.runtime;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import 
org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import 
org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import 
org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class NotifyBrokerRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
     private static final Logger LOGGER = 
Logger.getLogger(NotifyBrokerRuntime.class.getName());
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes =
-            new AOrderedListSerializerDeserializer(new 
AOrderedListType(BuiltinType.AUUID, null));
-    private final ARecordSerializerDeserializer recordSerDes;
+    private static final AStringSerializerDeserializer stringSerDes =
+            new AStringSerializerDeserializer(new UTF8StringWriter(), new 
UTF8StringReader());
+
+    private final IPrinter recordPrinterFactory;
+    private final IPrinter listPrinterFactory;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
     private IScalarEvaluator eval0;
     private IScalarEvaluator eval1;
     private IScalarEvaluator eval2;
-    private final ActiveManager activeManager;
     private final EntityId entityId;
     private final boolean push;
-    private AOrderedList pushList;
-    private ARecord pushRecord;
-    private final IAType recordType;
-    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private final Map<String, String> sendData = new HashMap<>();
+    private final Map<String, ByteArrayOutputStream> sendbaos = new 
HashMap<>();
+    private final Map<String, PrintStream> sendStreams = new HashMap<>();
     private String executionTimeString;
+    private boolean firstResult = true;
+    String endpoint;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, 
IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, 
IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((INcApplicationContext) 
ctx.getJobletContext().getServiceContext()
-                .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
         this.push = push;
-        this.pushList = null;
-        this.pushRecord = null;
-        this.recordType = recordType;
-        recordSerDes = new ARecordSerializerDeserializer((ARecordType) 
recordType);
+        recordPrinterFactory = new ARecordPrinterFactory((ARecordType) 
recordType).createPrinter();
+        listPrinterFactory =
+                new AOrderedlistPrinterFactory(new 
AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
         executionTimeString = null;
     }
 
@@ -106,26 +106,16 @@
         return;
     }
 
-    private void addSubscriptions(String endpoint, AOrderedList 
subscriptionIds) {
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the 
string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, 
subscriptionString.length() - 2);
-            subscriptionString = "\"" + subscriptionString + "\"";
-            sendData.get(endpoint).add(subscriptionString);
-        }
-    }
-
     public String createData(String endpoint) {
+        String resultTitle = "\"subscriptionIds";
+        if (push) {
+            resultTitle = "\"results\"";
+        }
         String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + 
"\", \"channelName\":\""
                 + entityId.getEntityName() + "\", \"" + 
BADConstants.ChannelExecutionTime + "\":\""
-                + executionTimeString + "\", \"subscriptionIds\":[";
-        for (String value : sendData.get(endpoint)) {
-            JSON += value;
-            JSON += ",";
-        }
-        JSON = JSON.substring(0, JSON.length() - 1);
+                + executionTimeString + "\", " + resultTitle + ":[";
+        JSON += sendData.get(endpoint);
+        JSON = JSON.substring(0, JSON.length());
         JSON += "]}";
         return JSON;
 
@@ -185,34 +175,49 @@
 
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
serBrokerOffset + 1);
-            String endpoint = 
AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
-            sendData.putIfAbsent(endpoint, new HashSet<>());
+            endpoint = stringSerDes.deserialize(di).getStringValue();
+            sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+            try {
+                sendStreams.putIfAbsent(endpoint, new 
PrintStream(sendbaos.get(endpoint), true, "UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                throw new HyracksDataException(e.getMessage());
+            }
 
             if (push) {
                 int pushOffset = inputArg1.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
pushOffset + 1);
-                //TODO: Right now this creates an object per channel result. 
Need to find a better way to deserialize
-                pushRecord = recordSerDes.deserialize(di);
-                sendData.get(endpoint).add(pushRecord.toString());
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                recordPrinterFactory.print(inputArg1.getByteArray(), 
inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
 
             } else {
-                int serSubOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
serSubOffset + 1);
-                pushList = subSerDes.deserialize(di);
-                addSubscriptions(endpoint, pushList);
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                listPrinterFactory.print(inputArg1.getByteArray(), 
inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
             }
+            firstResult = false;
         }
 
     }
 
     @Override
     public void close() throws HyracksDataException {
-        for (String endpoint : sendData.keySet()) {
-            if (sendData.get(endpoint).size() > 0) {
-                sendGroupOfResults(endpoint);
-                sendData.get(endpoint).clear();
+        for (String endpoint : sendStreams.keySet()) {
+            sendData.put(endpoint, new 
String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+            sendGroupOfResults(endpoint);
+            sendStreams.get(endpoint).close();
+            try {
+                sendbaos.get(endpoint).close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e.getMessage());
             }
+
         }
+
         return;
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2717
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>

Reply via email to