This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7add930 Attach names for all producers/readers in worker service (#7165) 7add930 is described below commit 7add93095a88c8acfeae214ce8a1d856a54b474d Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Jun 4 19:52:38 2020 -0700 Attach names for all producers/readers in worker service (#7165) * Attach names for all producers/readers in worker service * Fix tests Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com> --- .../apache/pulsar/functions/worker/FunctionMetaDataManager.java | 6 +++++- .../apache/pulsar/functions/worker/FunctionRuntimeManager.java | 1 + .../org/apache/pulsar/functions/worker/SchedulerManager.java | 1 + .../apache/pulsar/functions/worker/rest/api/ComponentImpl.java | 9 ++++++++- .../pulsar/functions/worker/FunctionMetaDataManagerTest.java | 1 + .../pulsar/functions/worker/FunctionRuntimeManagerTest.java | 1 + .../org/apache/pulsar/functions/worker/SchedulerManagerTest.java | 1 + 7 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index c3273fa..2ab913b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -91,6 +91,7 @@ public class FunctionMetaDataManager implements AutoCloseable { Reader<byte[]> reader = pulsarClient.newReader() .topic(this.workerConfig.getFunctionMetadataTopic()) .startMessageId(MessageId.earliest) + .readerName(workerConfig.getWorkerId() + "-function-metadata-manager") .create(); this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader); @@ -432,6 +433,9 @@ public class FunctionMetaDataManager implements AutoCloseable { } private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException { - return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create()); + return new ServiceRequestManager(pulsarClient.newProducer() + .topic(functionMetadataTopic) + .producerName(workerConfig.getWorkerId() + "-function-metadata-manager") + .create()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 075146e..90cab3f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -211,6 +211,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ log.info("/** Initializing Runtime Manager **/"); try { Reader<byte[]> reader = this.getWorkerService().getClient().newReader() + .readerName(workerConfig.getWorkerId() + "-function-runtime-manager") .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) .startMessageId(MessageId.earliest).create(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index bcbccda..9c93443 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -106,6 +106,7 @@ public class SchedulerManager implements AutoCloseable { .blockIfQueueFull(true) .compressionType(CompressionType.LZ4) .sendTimeout(0, TimeUnit.MILLISECONDS) + .producerName(config.getWorkerId() + "-scheduler-manager") .createAsync().get(10, TimeUnit.SECONDS); return Actions.ActionResult.builder().success(true).result(producer).build(); } catch (Exception e) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 0b128d9..f49971b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -994,10 +994,17 @@ public abstract class ComponentImpl { Producer<byte[]> producer = null; try { if (outputTopic != null && !outputTopic.isEmpty()) { - reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create(); + reader = worker().getClient().newReader() + .topic(outputTopic) + .startMessageId(MessageId.latest) + .readerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" + + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)) + .create(); } producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()) .topic(inputTopicToWrite) + .producerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" + + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)) .create(); byte[] targetArray; if (uploadedInputStream != null) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index 75be623..da6852c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -48,6 +48,7 @@ public class FunctionMetaDataManagerTest { private static PulsarClient mockPulsarClient() throws PulsarClientException { ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); when(builder.topic(anyString())).thenReturn(builder); + when(builder.producerName(anyString())).thenReturn(builder); when(builder.create()).thenReturn(mock(Producer.class)); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 2e6ed5b..4230304 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -562,6 +562,7 @@ public class FunctionRuntimeManagerTest { ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 243f127..fafed1f 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -110,6 +110,7 @@ public class SchedulerManagerTest { ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); when(builder.topic(anyString())).thenReturn(builder); + when(builder.producerName(anyString())).thenReturn(builder); when(builder.enableBatching(anyBoolean())).thenReturn(builder); when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder); when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);