jerrypeng closed pull request #3128: Few optimizations for pulsar sql
URL: https://github.com/apache/pulsar/pull/3128
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java index 402a36e4f4..eebfdbeecb 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java @@ -19,9 +19,11 @@ package org.apache.pulsar.sql.presto; import io.airlift.log.Logger; +import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; @@ -34,6 +36,9 @@ private final List<PulsarColumnHandle> columnHandles; + public static final FastThreadLocal<BinaryDecoder> decoders = + new FastThreadLocal<>(); + private static final Logger log = Logger.get(AvroSchemaHandler.class); public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) { @@ -44,7 +49,12 @@ public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) @Override public Object deserialize(byte[] bytes) { try { - return this.datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); + BinaryDecoder decoderFromCache = decoders.get(); + BinaryDecoder decoder=DecoderFactory.get().binaryDecoder(bytes, decoderFromCache); + if (decoderFromCache==null) { + decoders.set(decoder); + } + return this.datumReader.read(null, decoder); } catch (IOException e) { log.error(e); } diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java index 9ff337e09b..d62a78837e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.sql.presto; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger; @@ -26,7 +28,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ - private StatsLogger statsLogger; + private final StatsLogger statsLogger; private static final String SCOPE = "split"; @@ -65,7 +67,7 @@ public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query"; // number of read attempts. Will fail if queues are full - public static final String READ_ATTEMTPS = "read-attempts"; + public static final String READ_ATTEMPTS = "read-attempts"; // number of read attempts per query public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query"; @@ -82,7 +84,7 @@ // number of entries per query public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query"; - // time spent waiting to dequeue from message queue because its empty + // time spent waiting to dequeue from message queue because its empty per query public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query"; // time spent deserializing message to record e.g. 
avro, json, etc @@ -95,6 +97,21 @@ private static final String TOTAL_EXECUTION_TIME = "total-execution-time"; + /** stats loggers **/ + + private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime; + private final Counter statsLogger_bytesRead; + private final OpStatsLogger statsLogger_entryDeserializetime; + private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime; + private final Counter statsLogger_numMessagesDeserialized; + private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry; + private final OpStatsLogger statsLogger_readAttempts; + private final OpStatsLogger statsLogger_readLatencyPerBatch; + private final OpStatsLogger statsLogger_numEntriesPerBatch; + private final OpStatsLogger statsLogger_recordDeserializeTime; + private final Counter statsLogger_numRecordDeserialized; + private final OpStatsLogger statsLogger_totalExecutionTime; + /** internal tracking variables **/ private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L; @@ -117,6 +134,34 @@ public PulsarConnectorMetricsTracker(StatsProvider statsProvider) { this.statsLogger = statsProvider instanceof NullStatsProvider ? 
null : statsProvider.getStatsLogger(SCOPE); + + if (this.statsLogger != null) { + statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME); + statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ); + statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME); + statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME); + statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED); + statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY); + statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS); + statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH); + statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH); + statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME); + statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED); + statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME); + } else { + statsLogger_entryQueueDequeueWaitTime = null; + statsLogger_bytesRead = null; + statsLogger_entryDeserializetime = null; + statsLogger_messageQueueEnqueueWaitTime = null; + statsLogger_numMessagesDeserialized = null; + statsLogger_numMessagesDeserializedPerEntry = null; + statsLogger_readAttempts = null; + statsLogger_readLatencyPerBatch = null; + statsLogger_numEntriesPerBatch = null; + statsLogger_recordDeserializeTime = null; + statsLogger_numRecordDeserialized = null; + statsLogger_totalExecutionTime = null; + } } public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() { @@ -129,15 +174,14 @@ public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() { if (statsLogger != null) { long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time; - 
statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void register_BYTES_READ(long bytes) { if (statsLogger != null) { BYTES_READ_sum += bytes; - statsLogger.getCounter(BYTES_READ).add(bytes); + statsLogger_bytesRead.add(bytes); } } @@ -151,8 +195,7 @@ public void end_ENTRY_DESERIALIZE_TIME() { if (statsLogger != null) { long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime; ENTRY_DESERIALIZE_TIME_sum += time; - statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } @@ -166,15 +209,14 @@ public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() { if (statsLogger != null) { long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime; MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time; - statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { if (statsLogger != null) { NUM_MESSAGED_DERSERIALIZED_PER_BATCH++; - statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1); + statsLogger_numMessagesDeserialized.add(1); } } @@ -182,8 +224,7 @@ public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { if (statsLogger != null) { NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH; - statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY) - .registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH); + statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH); NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L; } @@ -192,47 +233,41 @@ public void 
end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { public void incr_READ_ATTEMPTS_SUCCESS() { if (statsLogger != null) { READ_ATTEMTPS_SUCCESS_sum++; - statsLogger.getOpStatsLogger(READ_ATTEMTPS) - .registerSuccessfulValue(1L); + statsLogger_readAttempts.registerSuccessfulValue(1L); } } public void incr_READ_ATTEMPTS_FAIL() { if (statsLogger != null) { READ_ATTEMTPS_FAIL_sum++; - statsLogger.getOpStatsLogger(READ_ATTEMTPS) - .registerFailedValue(1L); + statsLogger_readAttempts.registerFailedValue(1L); } } public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) { if (statsLogger != null) { READ_LATENCY_SUCCESS_sum += latency; - statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH) - .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } } public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) { if (statsLogger != null) { READ_LATENCY_FAIL_sum += latency; - statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH) - .registerFailedEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS); } } public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) { if (statsLogger != null) { NUM_ENTRIES_PER_BATCH_sum += delta; - statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH) - .registerSuccessfulValue(delta); + statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta); } } public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) { if (statsLogger != null) { - statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH) - .registerFailedValue(delta); + statsLogger_numEntriesPerBatch.registerFailedValue(delta); } } @@ -252,21 +287,19 @@ public void end_RECORD_DESERIALIZE_TIME() { if (statsLogger != null) { long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime; RECORD_DESERIALIZE_TIME_sum += time; - statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME) - .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + 
statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_RECORD_DESERIALIZED() { if (statsLogger != null) { - statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1); + statsLogger_numRecordDeserialized.add(1); } } public void register_TOTAL_EXECUTION_TIME(long latency) { if (statsLogger != null) { - statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME) - .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services