This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7add930  Attach names for all producers/readers in worker service (#7165)
7add930 is described below

commit 7add93095a88c8acfeae214ce8a1d856a54b474d
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Jun 4 19:52:38 2020 -0700

    Attach names for all producers/readers in worker service (#7165)
    
    * Attach names for all producers/readers in worker service
    
    * Fix tests
    
    Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com>
---
 .../apache/pulsar/functions/worker/FunctionMetaDataManager.java  | 6 +++++-
 .../apache/pulsar/functions/worker/FunctionRuntimeManager.java   | 1 +
 .../org/apache/pulsar/functions/worker/SchedulerManager.java     | 1 +
 .../apache/pulsar/functions/worker/rest/api/ComponentImpl.java   | 9 ++++++++-
 .../pulsar/functions/worker/FunctionMetaDataManagerTest.java     | 1 +
 .../pulsar/functions/worker/FunctionRuntimeManagerTest.java      | 1 +
 .../org/apache/pulsar/functions/worker/SchedulerManagerTest.java | 1 +
 7 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index c3273fa..2ab913b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -91,6 +91,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
             Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
+                    .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
                     .create();
 
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
@@ -432,6 +433,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
     }
 
     private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
-        return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
+        return new ServiceRequestManager(pulsarClient.newProducer()
+                .topic(functionMetadataTopic)
+                .producerName(workerConfig.getWorkerId() + "-function-metadata-manager")
+                .create());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 075146e..90cab3f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -211,6 +211,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         log.info("/** Initializing Runtime Manager **/");
         try {
             Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
+                    .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
                     .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
                     .startMessageId(MessageId.earliest).create();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index bcbccda..9c93443 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -106,6 +106,7 @@ public class SchedulerManager implements AutoCloseable {
                                 .blockIfQueueFull(true)
                                 .compressionType(CompressionType.LZ4)
                                 .sendTimeout(0, TimeUnit.MILLISECONDS)
+                                .producerName(config.getWorkerId() + "-scheduler-manager")
                                 .createAsync().get(10, TimeUnit.SECONDS);
                         return Actions.ActionResult.builder().success(true).result(producer).build();
                     } catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0b128d9..f49971b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -994,10 +994,17 @@ public abstract class ComponentImpl {
         Producer<byte[]> producer = null;
         try {
             if (outputTopic != null && !outputTopic.isEmpty()) {
-                reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
+                reader = worker().getClient().newReader()
+                        .topic(outputTopic)
+                        .startMessageId(MessageId.latest)
+                        .readerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+                                FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
+                        .create();
             }
             producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
                     .topic(inputTopicToWrite)
+                    .producerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+                            FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
                     .create();
             byte[] targetArray;
             if (uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 75be623..da6852c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -48,6 +48,7 @@ public class FunctionMetaDataManagerTest {
     private static PulsarClient mockPulsarClient() throws PulsarClientException {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
 
         when(builder.create()).thenReturn(mock(Producer.class));
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 2e6ed5b..4230304 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -562,6 +562,7 @@ public class FunctionRuntimeManagerTest {
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 243f127..fafed1f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -110,6 +110,7 @@ public class SchedulerManagerTest {
 
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
         when(builder.enableBatching(anyBoolean())).thenReturn(builder);
         when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
         when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);

Reply via email to