Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2717
Change subject: Improve performance of NotifyBrokerRuntime code
......................................................................

Improve performance of NotifyBrokerRuntime code

Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
---
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
1 file changed, 57 insertions(+), 52 deletions(-)

git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/17/2717/1

diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..97615b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
 package org.apache.asterix.bad.runtime;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
 public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
     private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes =
-            new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
-    private final ARecordSerializerDeserializer recordSerDes;
+    private static final AStringSerializerDeserializer stringSerDes =
+            new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+    private final IPrinter recordPrinterFactory;
+    private final IPrinter listPrinterFactory;
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
     private IScalarEvaluator eval0;
     private IScalarEvaluator eval1;
     private IScalarEvaluator eval2;
-    private final ActiveManager activeManager;
     private final EntityId entityId;
     private final boolean push;
-    private AOrderedList pushList;
-    private ARecord pushRecord;
-    private final IAType recordType;
-    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private final Map<String, String> sendData = new HashMap<>();
+    private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
+    private final Map<String, PrintStream> sendStreams = new HashMap<>();
     private String executionTimeString;
+    private boolean firstResult = true;
+    String endpoint;
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
-                .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
         this.push = push;
-        this.pushList = null;
-        this.pushRecord = null;
-        this.recordType = recordType;
-        recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+        recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+        listPrinterFactory =
+                new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
         executionTimeString = null;
     }
@@ -106,26 +106,16 @@
         return;
     }
-    private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
-            subscriptionString = "\"" + subscriptionString + "\"";
-            sendData.get(endpoint).add(subscriptionString);
-        }
-    }
-
     public String createData(String endpoint) {
+        String resultTitle = "\"subscriptionIds";
+        if (push) {
+            resultTitle = "\"results\"";
+        }
         String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
                 + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
-                + executionTimeString + "\", \"subscriptionIds\":[";
-        for (String value : sendData.get(endpoint)) {
-            JSON += value;
-            JSON += ",";
-        }
-        JSON = JSON.substring(0, JSON.length() - 1);
+                + executionTimeString + "\", " + resultTitle + ":[";
+        JSON += sendData.get(endpoint);
+        JSON = JSON.substring(0, JSON.length());
         JSON += "]}";
         return JSON;
@@ -185,34 +175,49 @@
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
-            String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
-            sendData.putIfAbsent(endpoint, new HashSet<>());
+            endpoint = stringSerDes.deserialize(di).getStringValue();
+            sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+            try {
+                sendStreams.putIfAbsent(endpoint, new PrintStream(sendbaos.get(endpoint), true, "UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                throw new HyracksDataException(e.getMessage());
+            }
             if (push) {
                 int pushOffset = inputArg1.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
-                //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
-                pushRecord = recordSerDes.deserialize(di);
-                sendData.get(endpoint).add(pushRecord.toString());
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
             } else {
-                int serSubOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
-                pushList = subSerDes.deserialize(di);
-                addSubscriptions(endpoint, pushList);
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                listPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
             }
+            firstResult = false;
         }
     }
     @Override
     public void close() throws HyracksDataException {
-        for (String endpoint : sendData.keySet()) {
-            if (sendData.get(endpoint).size() > 0) {
-                sendGroupOfResults(endpoint);
-                sendData.get(endpoint).clear();
+        for (String endpoint : sendStreams.keySet()) {
+            sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+            sendGroupOfResults(endpoint);
+            sendStreams.get(endpoint).close();
+            try {
+                sendbaos.get(endpoint).close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e.getMessage());
             }
+        }
+        return;
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/2717
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>