This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new eab17ad Few optimizations for pulsar sql (#3128) eab17ad is described below commit eab17ad6edf6da83f207de2f80eb497600698a04 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Dec 5 23:11:39 2018 -0800 Few optimizations for pulsar sql (#3128) * optimizing pulsar sql * optimizing * cleaning up --- .../pulsar/sql/presto/AvroSchemaHandler.java | 12 ++- .../sql/presto/PulsarConnectorMetricsTracker.java | 93 +++++++++++++++------- 2 files changed, 74 insertions(+), 31 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java index 402a36e..eebfdbe 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java @@ -19,9 +19,11 @@ package org.apache.pulsar.sql.presto; import io.airlift.log.Logger; +import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; @@ -34,6 +36,9 @@ public class AvroSchemaHandler implements SchemaHandler { private final List<PulsarColumnHandle> columnHandles; + public static final FastThreadLocal<BinaryDecoder> decoders = + new FastThreadLocal<>(); + private static final Logger log = Logger.get(AvroSchemaHandler.class); public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) { @@ -44,7 +49,12 @@ public class AvroSchemaHandler implements SchemaHandler { @Override public Object deserialize(byte[] bytes) { try { - return this.datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); + BinaryDecoder decoderFromCache = decoders.get(); + BinaryDecoder decoder=DecoderFactory.get().binaryDecoder(bytes, decoderFromCache); + if (decoderFromCache==null) { + decoders.set(decoder); + } + return this.datumReader.read(null, decoder); } catch (IOException e) { log.error(e); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java index 9ff337e..d62a788 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.sql.presto; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger; @@ -26,7 +28,7 @@ import java.util.concurrent.TimeUnit; public class PulsarConnectorMetricsTracker implements AutoCloseable{ - private StatsLogger statsLogger; + private final StatsLogger statsLogger; private static final String SCOPE = "split"; @@ -65,7 +67,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query"; // number of read attempts. Will fail if queues are full - public static final String READ_ATTEMTPS = "read-attempts"; + public static final String READ_ATTEMPTS = "read-attempts"; // number of read attempts per query public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query"; @@ -82,7 +84,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ // number of entries per query public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query"; - // time spent waiting to dequeue from message queue because its empty + // time spent waiting to dequeue from message queue because its empty per query public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query"; // time spent deserializing message to record e.g. avro, json, etc @@ -95,6 +97,21 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ private static final String TOTAL_EXECUTION_TIME = "total-execution-time"; + /** stats loggers **/ + + private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime; + private final Counter statsLogger_bytesRead; + private final OpStatsLogger statsLogger_entryDeserializetime; + private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime; + private final Counter statsLogger_numMessagesDeserialized; + private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry; + private final OpStatsLogger statsLogger_readAttempts; + private final OpStatsLogger statsLogger_readLatencyPerBatch; + private final OpStatsLogger statsLogger_numEntriesPerBatch; + private final OpStatsLogger statsLogger_recordDeserializeTime; + private final Counter statsLogger_numRecordDeserialized; + private final OpStatsLogger statsLogger_totalExecutionTime; + /** internal tracking variables **/ private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L; @@ -117,6 +134,34 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ public PulsarConnectorMetricsTracker(StatsProvider statsProvider) { this.statsLogger = statsProvider instanceof NullStatsProvider ? null : statsProvider.getStatsLogger(SCOPE); + + if (this.statsLogger != null) { + statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME); + statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ); + statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME); + statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME); + statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED); + statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY); + statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS); + statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH); + statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH); + statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME); + statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED); + statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME); + } else { + statsLogger_entryQueueDequeueWaitTime = null; + statsLogger_bytesRead = null; + statsLogger_entryDeserializetime = null; + statsLogger_messageQueueEnqueueWaitTime = null; + statsLogger_numMessagesDeserialized = null; + statsLogger_numMessagesDeserializedPerEntry = null; + statsLogger_readAttempts = null; + statsLogger_readLatencyPerBatch = null; + statsLogger_numEntriesPerBatch = null; + statsLogger_recordDeserializeTime = null; + statsLogger_numRecordDeserialized = null; + statsLogger_totalExecutionTime = null; + } } public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() { @@ -129,15 +174,14 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time; - statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void register_BYTES_READ(long bytes) { if (statsLogger != null) { BYTES_READ_sum += bytes; - statsLogger.getCounter(BYTES_READ).add(bytes); + statsLogger_bytesRead.add(bytes); } } @@ -151,8 +195,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime; ENTRY_DESERIALIZE_TIME_sum += time; - statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } @@ -166,15 +209,14 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime; MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time; - statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { if (statsLogger != null) { NUM_MESSAGED_DERSERIALIZED_PER_BATCH++; - statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1); + statsLogger_numMessagesDeserialized.add(1); } } @@ -182,8 +224,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH; - statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY) - .registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH); + statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH); NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L; } @@ -192,47 +233,41 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ public void incr_READ_ATTEMPTS_SUCCESS() { if (statsLogger != null) { READ_ATTEMTPS_SUCCESS_sum++; - statsLogger.getOpStatsLogger(READ_ATTEMTPS) - .registerSuccessfulValue(1L); + statsLogger_readAttempts.registerSuccessfulValue(1L); } } public void incr_READ_ATTEMPTS_FAIL() { if (statsLogger != null) { READ_ATTEMTPS_FAIL_sum++; - statsLogger.getOpStatsLogger(READ_ATTEMTPS) - .registerFailedValue(1L); + statsLogger_readAttempts.registerFailedValue(1L); } } public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) { if (statsLogger != null) { READ_LATENCY_SUCCESS_sum += latency; - statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH) - .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } } public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) { if (statsLogger != null) { READ_LATENCY_FAIL_sum += latency; - statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH) - .registerFailedEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS); } } public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) { if (statsLogger != null) { NUM_ENTRIES_PER_BATCH_sum += delta; - statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH) - .registerSuccessfulValue(delta); + statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta); } } public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) { if (statsLogger != null) { - statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH) - .registerFailedValue(delta); + statsLogger_numEntriesPerBatch.registerFailedValue(delta); } } @@ -252,21 +287,19 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime; RECORD_DESERIALIZE_TIME_sum += time; - statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_RECORD_DESERIALIZED() { if (statsLogger != null) { - statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1); + statsLogger_numRecordDeserialized.add(1); } } public void register_TOTAL_EXECUTION_TIME(long latency) { if (statsLogger != null) { - statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME) - .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } }