This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eab17ad  Few optimizations for pulsar sql (#3128)
eab17ad is described below

commit eab17ad6edf6da83f207de2f80eb497600698a04
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Dec 5 23:11:39 2018 -0800

    Few optimizations for pulsar sql (#3128)
    
    * optimizing pulsar sql
    
    * optimizing
    
    * cleaning up
---
 .../pulsar/sql/presto/AvroSchemaHandler.java       | 12 ++-
 .../sql/presto/PulsarConnectorMetricsTracker.java  | 93 +++++++++++++++-------
 2 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index 402a36e..eebfdbe 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
+import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 
@@ -34,6 +36,9 @@ public class AvroSchemaHandler implements SchemaHandler {
 
     private final List<PulsarColumnHandle> columnHandles;
 
+    public static final FastThreadLocal<BinaryDecoder> decoders =
+            new FastThreadLocal<>();
+
     private static final Logger log = Logger.get(AvroSchemaHandler.class);
 
     public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
@@ -44,7 +49,12 @@ public class AvroSchemaHandler implements SchemaHandler {
     @Override
     public Object deserialize(byte[] bytes) {
         try {
-            return this.datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
+            BinaryDecoder decoderFromCache = decoders.get();
+            BinaryDecoder decoder=DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+            if (decoderFromCache==null) {
+                decoders.set(decoder);
+            }
+            return this.datumReader.read(null, decoder);
         } catch (IOException e) {
             log.error(e);
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
index 9ff337e..d62a788 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
@@ -26,7 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 public class PulsarConnectorMetricsTracker implements AutoCloseable{
 
-    private StatsLogger statsLogger;
+    private final StatsLogger statsLogger;
 
     private static final String SCOPE = "split";
 
@@ -65,7 +67,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
     public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query";
 
     // number of read attempts.  Will fail if queues are full
-    public static final String READ_ATTEMTPS = "read-attempts";
+    public static final String READ_ATTEMPTS = "read-attempts";
 
     // number of read attempts per query
     public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
@@ -82,7 +84,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
     // number of entries per query
     public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query";
 
-    // time spent waiting to dequeue from message queue because its empty
+    // time spent waiting to dequeue from message queue because its empty per query
     public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query";
 
     // time spent deserializing message to record e.g. avro, json, etc
@@ -95,6 +97,21 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
 
     private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
 
+    /** stats loggers **/
+
+    private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime;
+    private final Counter statsLogger_bytesRead;
+    private final OpStatsLogger statsLogger_entryDeserializetime;
+    private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime;
+    private final Counter statsLogger_numMessagesDeserialized;
+    private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry;
+    private final OpStatsLogger statsLogger_readAttempts;
+    private final OpStatsLogger statsLogger_readLatencyPerBatch;
+    private final OpStatsLogger statsLogger_numEntriesPerBatch;
+    private final OpStatsLogger statsLogger_recordDeserializeTime;
+    private final Counter statsLogger_numRecordDeserialized;
+    private final OpStatsLogger statsLogger_totalExecutionTime;
+
     /** internal tracking variables **/
     private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
     private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
@@ -117,6 +134,34 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
     public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
         this.statsLogger = statsProvider instanceof NullStatsProvider
                 ? null : statsProvider.getStatsLogger(SCOPE);
+
+        if (this.statsLogger != null) {
+            statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
+            statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ);
+            statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
+            statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
+            statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
+            statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
+            statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
+            statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
+            statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
+            statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
+            statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
+            statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
+        } else {
+            statsLogger_entryQueueDequeueWaitTime = null;
+            statsLogger_bytesRead = null;
+            statsLogger_entryDeserializetime = null;
+            statsLogger_messageQueueEnqueueWaitTime = null;
+            statsLogger_numMessagesDeserialized = null;
+            statsLogger_numMessagesDeserializedPerEntry = null;
+            statsLogger_readAttempts = null;
+            statsLogger_readLatencyPerBatch = null;
+            statsLogger_numEntriesPerBatch = null;
+            statsLogger_recordDeserializeTime = null;
+            statsLogger_numRecordDeserialized = null;
+            statsLogger_totalExecutionTime = null;
+        }
     }
 
     public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
@@ -129,15 +174,14 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
             ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
-            statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME)
-                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void register_BYTES_READ(long bytes) {
         if (statsLogger != null) {
             BYTES_READ_sum += bytes;
-            statsLogger.getCounter(BYTES_READ).add(bytes);
+            statsLogger_bytesRead.add(bytes);
         }
     }
 
@@ -151,8 +195,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
             ENTRY_DESERIALIZE_TIME_sum += time;
-            statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME)
-                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -166,15 +209,14 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
             MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
-            statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME)
-                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
         if (statsLogger != null) {
             NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
-            statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1);
+            statsLogger_numMessagesDeserialized.add(1);
         }
     }
 
@@ -182,8 +224,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
 
-            statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY)
-                    .registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
+            statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
 
             NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
         }
@@ -192,47 +233,41 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
     public void incr_READ_ATTEMPTS_SUCCESS() {
         if (statsLogger != null) {
             READ_ATTEMTPS_SUCCESS_sum++;
-            statsLogger.getOpStatsLogger(READ_ATTEMTPS)
-                    .registerSuccessfulValue(1L);
+            statsLogger_readAttempts.registerSuccessfulValue(1L);
         }
     }
 
     public void incr_READ_ATTEMPTS_FAIL() {
         if (statsLogger != null) {
             READ_ATTEMTPS_FAIL_sum++;
-            statsLogger.getOpStatsLogger(READ_ATTEMTPS)
-                    .registerFailedValue(1L);
+            statsLogger_readAttempts.registerFailedValue(1L);
         }
     }
 
     public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
         if (statsLogger != null) {
             READ_LATENCY_SUCCESS_sum += latency;
-            statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
-                    .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+            statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 
     public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
         if (statsLogger != null) {
             READ_LATENCY_FAIL_sum += latency;
-            statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
-                    .registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+            statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
         if (statsLogger != null) {
             NUM_ENTRIES_PER_BATCH_sum += delta;
-            statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
-                    .registerSuccessfulValue(delta);
+            statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta);
         }
     }
 
     public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
         if (statsLogger != null) {
-            statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
-                    .registerFailedValue(delta);
+            statsLogger_numEntriesPerBatch.registerFailedValue(delta);
         }
     }
 
@@ -252,21 +287,19 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
             RECORD_DESERIALIZE_TIME_sum += time;
-            statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME)
-                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_RECORD_DESERIALIZED() {
         if (statsLogger != null) {
-            statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1);
+            statsLogger_numRecordDeserialized.add(1);
         }
     }
 
     public void register_TOTAL_EXECUTION_TIME(long latency) {
         if (statsLogger != null) {
-            statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME)
-                    .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+            statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 

Reply via email to