This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5cc4dd8 add sink and source prometheus stats (#3261) 5cc4dd8 is described below commit 5cc4dd8b759f1bfd0ed76f88e065aee65ac6f857 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Dec 31 12:15:55 2018 -0800 add sink and source prometheus stats (#3261) * add sink and source prometheus stats * fixing stuff --- distribution/io/src/assemble/io.xml | 2 +- pulsar-broker/pom.xml | 10 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 307 +++++++++++++++++++ .../pulsar/functions/instance/ContextImpl.java | 32 +- .../pulsar/functions/instance/InstanceUtils.java | 26 ++ .../functions/instance/JavaInstanceRunnable.java | 18 +- .../instance/stats/ComponentStatsManager.java | 170 +++++++++++ .../instance/{ => stats}/FunctionStatsManager.java | 248 ++++----------- .../functions/instance/stats/SinkStatsManager.java | 340 +++++++++++++++++++++ .../instance/stats/SourceStatsManager.java | 339 ++++++++++++++++++++ .../pulsar/functions/instance/ContextImplTest.java | 5 +- .../org/apache/pulsar/functions/utils/Utils.java | 17 ++ .../functions/worker/rest/api/ComponentImpl.java | 23 +- .../functions/worker/rest/api/FunctionsImpl.java | 3 +- .../pulsar/functions/worker/rest/api/SinkImpl.java | 3 +- .../functions/worker/rest/api/SourceImpl.java | 3 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 3 +- .../rest/api/v3/FunctionApiV3ResourceTest.java | 11 +- .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 +- .../rest/api/v3/SourceApiV3ResourceTest.java | 9 +- .../{data-genenator => data-generator}/pom.xml | 0 .../io/datagenerator/DataGeneratorPrintSink.java | 0 .../io/datagenerator/DataGeneratorSource.java | 0 .../org/apache/pulsar/io/datagenerator/Person.java | 0 .../resources/META-INF/services/pulsar-io.yaml | 0 pulsar-io/pom.xml | 2 +- 26 files changed, 1337 insertions(+), 243 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index a0704c3..10ea149 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -83,7 +83,7 @@ </file> <file> - <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source> + <source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source> <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 07c9d6f..c451132 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -272,6 +272,13 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-data-generator</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> </dependency> @@ -328,7 +335,8 @@ <echo>copy test examples package</echo> <mkdir dir="${basedir}/src/test/resources"/> <copy file="${basedir}/../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar" tofile="${basedir}/src/test/resources/pulsar-functions-api-examples.jar"/> - </tasks> + <copy file="${basedir}/../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-data-generator.nar"/> + </tasks> </configuration> </execution> </executions> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 7841453..fa1177d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -40,6 +40,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.FunctionStatus; @@ -65,6 +67,7 @@ import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -78,6 +81,7 @@ import java.util.regex.Pattern; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -191,6 +195,8 @@ public class PulsarFunctionE2ETest { propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); + System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, ""); + Thread.sleep(100); } @@ -257,6 +263,28 @@ public class PulsarFunctionE2ETest { return functionConfig; } + private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(functionName); + sourceConfig.setParallelism(1); + sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sourceConfig.setTopicName(sinkTopic); + return sourceConfig; + } + + private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(functionName); + sinkConfig.setParallelism(1); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setInputs(Collections.singleton(sourceTopic)); + sinkConfig.setSourceSubscriptionName(subName); + return sinkConfig; + } /** * Validates pulsar sink e2e functionality on functions. * @@ -325,6 +353,285 @@ public class PulsarFunctionE2ETest { } @Test(timeOut = 20000) + public void testPulsarSinkStats() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/input"; + final String functionName = "PulsarSink-test"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName); + admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl); + + admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + // validate prometheus metrics empty + String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); + log.info("prometheus metrics: {}", prometheusMetrics); + + Map<String, Metric> metrics = parseMetrics(prometheusMetrics); + Metric m = metrics.get("pulsar_sink_received_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_received_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_written_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_written_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_sink_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_sink_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_system_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_system_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_last_invocation"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + + int totalMsgs = 10; + for (int i = 0; i < totalMsgs; i++) { + String data = "my-message-" + i; + producer.newMessage().property(propertyKey, propertyValue).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + // get stats after producing + prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); + log.info("prometheusMetrics: {}", prometheusMetrics); + + metrics = parseMetrics(prometheusMetrics); + m = metrics.get("pulsar_sink_received_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_sink_received_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_sink_written_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_sink_written_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, (double) totalMsgs); + m = metrics.get("pulsar_sink_sink_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_sink_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_system_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_system_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_sink_last_invocation"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + } + + @Test(timeOut = 20000) + public void testPulsarSourceStats() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarSource-test"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic); + admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl); + + admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return (admin.topics().getStats(sinkTopic).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4); + } catch (PulsarAdminException e) { + return false; + } + }, 10, 150); + assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1); + + String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); + log.info("prometheusMetrics: {}", prometheusMetrics); + + Map<String, Metric> metrics = parseMetrics(prometheusMetrics); + Metric m = metrics.get("pulsar_source_received_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_source_received_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_source_written_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_source_written_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + m = metrics.get("pulsar_source_source_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_source_source_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_source_system_exceptions_total"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_source_system_exceptions_total_1min"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertEquals(m.value, 0.0); + m = metrics.get("pulsar_source_last_invocation"); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), "0"); + assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); + assertTrue(m.value > 0.0); + } + + @Test(timeOut = 20000) public void testPulsarFunctionStats() throws Exception { final String namespacePortion = "io"; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 69fd6c8..460ba75 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -34,9 +34,14 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.StateContextImpl; +import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.instance.stats.FunctionStatsManager; +import org.apache.pulsar.functions.instance.stats.SinkStatsManager; +import org.apache.pulsar.functions.instance.stats.SourceStatsManager; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; @@ -52,7 +57,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; -import static org.apache.pulsar.functions.instance.FunctionStatsManager.USER_METRIC_PREFIX; +import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX; /** * This class implements the Context interface exposed to the user. @@ -87,12 +92,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { private final static String[] userMetricsLabelNames; static { // add label to indicate user metric - userMetricsLabelNames = Arrays.copyOf(FunctionStatsManager.metricsLabelNames, FunctionStatsManager.metricsLabelNames.length + 1); - userMetricsLabelNames[FunctionStatsManager.metricsLabelNames.length] = "metric"; + userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1); + userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric"; } public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics, - SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels) { + SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, + Utils.ComponentType componentType) { this.config = config; this.logger = logger; this.publishProducers = new HashMap<>(); @@ -119,9 +125,23 @@ class ContextImpl implements Context, SinkContext, SourceContext { } this.metricsLabels = metricsLabels; + String prefix; + switch (componentType) { + case FUNCTION: + prefix = FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX; + break; + case SINK: + prefix = SinkStatsManager.PULSAR_SINK_METRICS_PREFIX; + break; + case SOURCE: + prefix = SourceStatsManager.PULSAR_SOURCE_METRICS_PREFIX; + break; + default: + throw new RuntimeException("Unknown component type: " + componentType); + } this.userMetricsSummary = Summary.build() - .name("pulsar_function_user_metric") - .help("Pulsar Function user defined metric.") + .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX) + .help("User defined metric.") .labelNames(userMetricsLabelNames) .quantile(0.5, 0.01) .quantile(0.9, 0.01) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 86a9aa2..9e32736 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -19,15 +19,22 @@ package org.apache.pulsar.functions.instance; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE; import lombok.experimental.UtilityClass; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.functions.utils.Utils; @UtilityClass public class InstanceUtils { @@ -77,4 +84,23 @@ public class InstanceUtils { return Reflections.createInstance(className, baseClass, clsLoader); } } + + public Utils.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) { + Function.SourceSpec sourceSpec = functionDetails.getSource(); + Function.SinkSpec sinkSpec = functionDetails.getSink(); + if (sourceSpec.getInputSpecsCount() == 0) { + return SOURCE; + } + // Now its between sink and function + + if (!isEmpty(sinkSpec.getBuiltin())) { + // if its built in, its a sink + return SINK; + } + + if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) { + return FUNCTION; + } + return SINK; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index e858e90..7c36b58 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -55,6 +55,8 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.StateContextImpl; +import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.instance.stats.FunctionStatsManager; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -116,7 +118,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // function stats @Getter - private FunctionStatsManager stats; + private ComponentStatsManager stats; private Record<?> currentRecord; @@ -202,7 +204,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); - return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider, collectorRegistry, metricsLabels); + return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider, + collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); } /** @@ -216,7 +219,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { if (this.collectorRegistry == null) { this.collectorRegistry = new CollectorRegistry(); } - this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService()); + this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, + this.instanceCache.getScheduledExecutorService(), + InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); ContextImpl contextImpl = setupContext(); javaInstance = setupJavaInstance(contextImpl); @@ -284,6 +289,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private void loadJars() throws Exception { try { + log.info("jarFile: {}", jarFile); // Let's first try to treat it as a nar archive fnCache.registerFunctionInstanceWithArchive( instanceConfig.getFunctionId(), @@ -546,6 +552,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { stats.getLatestSystemExceptions().forEach(ex -> { functionStatusBuilder.addLatestSystemExceptions(ex); }); + stats.getLatestSourceExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSourceExceptions(ex); + }); + stats.getLatestSinkExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSinkExceptions(ex); + }); functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); return functionStatusBuilder; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java new file mode 100644 index 0000000..3cb654b --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.stats; + +import com.google.common.collect.EvictingQueue; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.common.TextFormat; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.utils.Utils; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public abstract class ComponentStatsManager implements AutoCloseable { + + protected String[] metricsLabels; + + protected ScheduledFuture<?> scheduledFuture; + + protected final CollectorRegistry collectorRegistry; + + protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0); + + public final static String USER_METRIC_PREFIX = "user_metric_"; + + public static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster", "fqfn"}; + + protected static final String[] exceptionMetricsLabelNames; + + static { + exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2); + exceptionMetricsLabelNames[metricsLabelNames.length] = "error"; + exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts"; + } + + public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry, + String[] metricsLabels, + ScheduledExecutorService scheduledExecutorService, + Utils.ComponentType componentType) { + switch (componentType) { + case FUNCTION: + return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService); + case SOURCE: + return new SourceStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService); + case SINK: + return new SinkStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService); + default: + throw new RuntimeException("Unknown component type: " + componentType); + } + } + + public ComponentStatsManager(CollectorRegistry collectorRegistry, + String[] metricsLabels, + ScheduledExecutorService scheduledExecutorService) { + + this.collectorRegistry = collectorRegistry; + this.metricsLabels = metricsLabels; + + scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + reset(); + } catch (Exception e) { + log.error("Failed to reset metrics for 1min window", e); + } + } + }, 1, 1, TimeUnit.MINUTES); + } + + public abstract void reset(); + + public abstract void incrTotalReceived(); + + public abstract void incrTotalProcessedSuccessfully(); + + public abstract void incrSysExceptions(Throwable sysException); + + public abstract void incrUserExceptions(Exception userException); + + public abstract void incrSourceExceptions(Exception userException); + + public abstract void incrSinkExceptions(Exception userException); + + public abstract void setLastInvocation(long ts); + + public abstract void processTimeStart(); + + public abstract void processTimeEnd(); + + public abstract double getTotalProcessedSuccessfully(); + + public abstract double getTotalRecordsReceived(); + + public abstract double getTotalSysExceptions(); + + public abstract double getTotalUserExceptions(); + + public abstract double getLastInvocation(); + + public abstract double getAvgProcessLatency(); + + public abstract double getTotalProcessedSuccessfully1min(); + + public abstract double getTotalRecordsReceived1min(); + + public abstract double getTotalSysExceptions1min(); + + public abstract double getTotalUserExceptions1min(); + + public abstract double getAvgProcessLatency1min(); + + public abstract EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestUserExceptions(); + + public abstract EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSystemExceptions(); + + public abstract EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSourceExceptions(); + + public abstract EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSinkExceptions(); + + public String getStatsAsString() throws IOException { + StringWriter outputWriter = new StringWriter(); + + TextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples()); + + return outputWriter.toString(); + } + + protected InstanceCommunication.FunctionStatus.ExceptionInformation getExceptionInfo(Throwable th, long ts) { + InstanceCommunication.FunctionStatus.ExceptionInformation.Builder exceptionInfoBuilder = + InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts); + String msg = th.getMessage(); + if (msg != null) { + exceptionInfoBuilder.setExceptionString(msg); + } + return exceptionInfoBuilder.build(); + } + + + @Override + public void close() { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduledFuture = null; + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java similarity index 64% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 818fdb3..d03bfe2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -16,25 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.instance; +package org.apache.pulsar.functions.instance.stats; import com.google.common.collect.EvictingQueue; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Summary; -import io.prometheus.client.exporter.common.TextFormat; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.util.RateLimiter; import org.apache.pulsar.functions.proto.InstanceCommunication; -import java.io.IOException; -import java.io.StringWriter; import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -43,18 +39,9 @@ import java.util.concurrent.TimeUnit; @Slf4j @Getter @Setter -public class FunctionStatsManager implements AutoCloseable { - - static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster", "fqfn"}; - static final String[] exceptionMetricsLabelNames; - static { - exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2); - exceptionMetricsLabelNames[metricsLabelNames.length] = "error"; - exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts"; - } +public class FunctionStatsManager extends ComponentStatsManager{ public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"; - public final static String USER_METRIC_PREFIX = "user_metric_"; /** Declare metric names **/ public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total"; @@ -81,16 +68,12 @@ public class FunctionStatsManager implements AutoCloseable { final Counter statTotalSysExceptions; final Counter statTotalUserExceptions; - - final Counter statTotalSourceExceptions; - - final Counter statTotalSinkExceptions; - + final Summary statProcessLatency; final Gauge statlastInvocation; - final Counter statTotalRecordsRecieved; + final Counter statTotalRecordsReceived; // windowed metrics @@ -99,14 +82,10 @@ public class FunctionStatsManager implements AutoCloseable { final Counter statTotalSysExceptions1min; final Counter statTotalUserExceptions1min; - - final Counter statTotalSourceExceptions1min; - - final Counter statTotalSinkExceptions1min; - + final Summary statProcessLatency1min; - final Counter statTotalRecordsRecieved1min; + final Counter statTotalRecordsReceived1min; // exceptions @@ -122,47 +101,28 @@ public class FunctionStatsManager implements AutoCloseable { private final Counter.Child _statTotalProcessedSuccessfully; private final Counter.Child _statTotalSysExceptions; private final Counter.Child _statTotalUserExceptions; - private final Counter.Child _statTotalSourceExceptions; - private final Counter.Child _statTotalSinkExceptions; private final Summary.Child _statProcessLatency; private final Gauge.Child _statlastInvocation; - private final Counter.Child _statTotalRecordsRecieved; + private final Counter.Child _statTotalRecordsReceived; private Counter.Child _statTotalProcessedSuccessfully1min; private Counter.Child _statTotalSysExceptions1min; private Counter.Child _statTotalUserExceptions1min; - private Counter.Child _statTotalSourceExceptions1min; - private Counter.Child _statTotalSinkExceptions1min; private Summary.Child _statProcessLatency1min; - private Counter.Child _statTotalRecordsRecieved1min; - - private String[] metricsLabels; - - private ScheduledFuture<?> scheduledFuture; - - private final CollectorRegistry collectorRegistry; + private Counter.Child _statTotalRecordsReceived1min; @Getter private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10); @Getter private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); - @Getter - private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSourceExceptions = EvictingQueue.create(10); - @Getter - private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSinkExceptions = EvictingQueue.create(10); private final RateLimiter userExceptionRateLimiter; private final RateLimiter sysExceptionRateLimiter; - private final RateLimiter sourceExceptionRateLimiter; - - private final RateLimiter sinkExceptionRateLimiter; - - public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { - - this.collectorRegistry = collectorRegistry; - - this.metricsLabels = metricsLabels; + public FunctionStatsManager(CollectorRegistry collectorRegistry, + String[] metricsLabels, + ScheduledExecutorService scheduledExecutorService) { + super(collectorRegistry, metricsLabels, scheduledExecutorService); statTotalProcessedSuccessfully = Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL) @@ -185,20 +145,6 @@ public class FunctionStatsManager implements AutoCloseable { .register(collectorRegistry); _statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels); - statTotalSourceExceptions = Counter.build() - .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL) - .help("Total number of source exceptions.") - .labelNames(metricsLabelNames) - .register(collectorRegistry); - _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels); - - statTotalSinkExceptions = Counter.build() - .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL) - .help("Total number of sink exceptions.") - .labelNames(metricsLabelNames) - .register(collectorRegistry); - _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels); - statProcessLatency = Summary.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS) .help("Process latency in milliseconds.") @@ -217,12 +163,12 @@ public class FunctionStatsManager implements AutoCloseable { .register(collectorRegistry); _statlastInvocation = statlastInvocation.labels(metricsLabels); - statTotalRecordsRecieved = Counter.build() + statTotalRecordsReceived = Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL) .help("Total number of messages received from source.") .labelNames(metricsLabelNames) .register(collectorRegistry); - _statTotalRecordsRecieved = statTotalRecordsRecieved.labels(metricsLabels); + _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); statTotalProcessedSuccessfully1min = Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min) @@ -245,20 +191,6 @@ public class FunctionStatsManager implements AutoCloseable { .register(collectorRegistry); _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels); - statTotalSourceExceptions1min = Counter.build() - .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min) - .help("Total number of source exceptions in the last 1 minute.") - .labelNames(metricsLabelNames) - .register(collectorRegistry); - _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels); - - statTotalSinkExceptions1min = Counter.build() - .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min) - .help("Total number of sink exceptions in the last 1 minute.") - .labelNames(metricsLabelNames) - .register(collectorRegistry); - _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels); - statProcessLatency1min = Summary.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min) .help("Process latency in milliseconds in the last 1 minute.") @@ -270,12 +202,12 @@ public class FunctionStatsManager implements AutoCloseable { .register(collectorRegistry); _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels); - statTotalRecordsRecieved1min = Counter.build() + statTotalRecordsReceived1min = Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min) .help("Total number of messages received from source in the last 1 minute.") .labelNames(metricsLabelNames) .register(collectorRegistry); - _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); userExceptions = Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception") @@ -300,22 +232,8 @@ public class FunctionStatsManager implements AutoCloseable { .help("Exception from sink.") .register(collectorRegistry); - scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - reset(); - } catch (Exception e) { - log.error("Failed to reset metrics for 1min window", e); - } - } - }, 1, 1, TimeUnit.MINUTES); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - } public void addUserException(Exception ex) { @@ -346,87 +264,54 @@ public class FunctionStatsManager implements AutoCloseable { } } - public void addSourceException(Throwable ex) { - long ts = System.currentTimeMillis(); - InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); - latestSourceExceptions.add(info); - - // report exception throw prometheus - if (sourceExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); - exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; - exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); - sourceExceptions.labels(exceptionMetricsLabels).set(1.0); - } - } - - public void addSinkException(Throwable ex) { - long ts = System.currentTimeMillis(); - InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); - latestSinkExceptions.add(info); - - // report exception throw prometheus - if (sinkExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); - exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; - exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); - sinkExceptions.labels(exceptionMetricsLabels).set(1.0); - } - } - - private InstanceCommunication.FunctionStatus.ExceptionInformation getExceptionInfo(Throwable th, long ts) { - InstanceCommunication.FunctionStatus.ExceptionInformation.Builder exceptionInfoBuilder = - InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts); - String msg = th.getMessage(); - if (msg != null) { - exceptionInfoBuilder.setExceptionString(msg); - } - return exceptionInfoBuilder.build(); - } - + @Override public void incrTotalReceived() { - _statTotalRecordsRecieved.inc(); - _statTotalRecordsRecieved1min.inc(); + _statTotalRecordsReceived.inc(); + _statTotalRecordsReceived1min.inc(); } + @Override public void incrTotalProcessedSuccessfully() { _statTotalProcessedSuccessfully.inc(); _statTotalProcessedSuccessfully1min.inc(); } + @Override public void incrSysExceptions(Throwable sysException) { _statTotalSysExceptions.inc(); _statTotalSysExceptions1min.inc(); addSystemException(sysException); } + @Override public void incrUserExceptions(Exception userException) { _statTotalUserExceptions.inc(); _statTotalUserExceptions1min.inc(); addUserException(userException); } - public void incrSourceExceptions(Exception userException) { - _statTotalSourceExceptions.inc(); - _statTotalSourceExceptions1min.inc(); - addSourceException(userException); + @Override + public void incrSourceExceptions(Exception ex) { + incrSysExceptions(ex); } - public void incrSinkExceptions(Exception userException) { - _statTotalSinkExceptions.inc(); - _statTotalSinkExceptions1min.inc(); - addSinkException(userException); + @Override + public void incrSinkExceptions(Exception ex) { + incrSysExceptions(ex); } + @Override public void setLastInvocation(long ts) { _statlastInvocation.set(ts); } private Long processTimeStart; + @Override public void processTimeStart() { processTimeStart = System.nanoTime(); } + @Override public void processTimeEnd() { if (processTimeStart != null) { double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D; @@ -435,30 +320,27 @@ public class FunctionStatsManager implements AutoCloseable { } } + @Override public double getTotalProcessedSuccessfully() { return _statTotalProcessedSuccessfully.get(); } + @Override public double getTotalRecordsReceived() { - return _statTotalRecordsRecieved.get(); + return _statTotalRecordsReceived.get(); } + @Override public double getTotalSysExceptions() { return _statTotalSysExceptions.get(); } + @Override public double getTotalUserExceptions() { return _statTotalUserExceptions.get(); } - - public double getTotalSourceExceptions() { - return _statTotalSourceExceptions.get(); - } - - public double getTotalSinkExceptions() { - return _statTotalSinkExceptions.get(); - } - + + @Override public double getLastInvocation() { return _statlastInvocation.get(); } @@ -484,33 +366,40 @@ public class FunctionStatsManager implements AutoCloseable { return _statProcessLatency.get().quantiles.get(0.999); } + @Override public double getTotalProcessedSuccessfully1min() { return _statTotalProcessedSuccessfully1min.get(); } + @Override public double getTotalRecordsReceived1min() { - return _statTotalRecordsRecieved1min.get(); + return _statTotalRecordsReceived1min.get(); } + @Override public double getTotalSysExceptions1min() { return _statTotalSysExceptions1min.get(); } + @Override public double getTotalUserExceptions1min() { return _statTotalUserExceptions1min.get(); } - - public double getTotalSourceExceptions1min() { - return _statTotalSourceExceptions1min.get(); + + @Override + public double getAvgProcessLatency1min() { + return _statProcessLatency1min.get().count <= 0.0 + ? 0 : _statProcessLatency1min.get().sum / _statProcessLatency1min.get().count; } - public double getTotalSinkExceptions1min() { - return _statTotalSinkExceptions1min.get(); + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSourceExceptions() { + return EMPTY_QUEUE; } - public double getAvgProcessLatency1min() { - return _statProcessLatency1min.get().count <= 0.0 - ? 0 : _statProcessLatency1min.get().sum / _statProcessLatency1min.get().count; + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSinkExceptions() { + return EMPTY_QUEUE; } public double getProcessLatency50P1min() { @@ -529,6 +418,7 @@ public class FunctionStatsManager implements AutoCloseable { return _statProcessLatency1min.get().quantiles.get(0.999); } + @Override public void reset() { statTotalProcessedSuccessfully1min.clear(); _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels); @@ -539,37 +429,13 @@ public class FunctionStatsManager implements AutoCloseable { statTotalUserExceptions1min.clear(); _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels); - statTotalSourceExceptions1min.clear(); - _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels); - - statTotalSinkExceptions1min.clear(); - _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels); - statProcessLatency1min.clear(); _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels); - statTotalRecordsRecieved1min.clear(); - _statTotalRecordsRecieved1min = statTotalRecordsRecieved1min.labels(metricsLabels); + statTotalRecordsReceived1min.clear(); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); latestUserExceptions.clear(); latestSystemExceptions.clear(); - latestSourceExceptions.clear(); - latestSinkExceptions.clear(); - } - - public String getStatsAsString() throws IOException { - StringWriter outputWriter = new StringWriter(); - - TextFormat.write004(outputWriter, collectorRegistry.metricFamilySamples()); - - return outputWriter.toString(); - } - - @Override - public void close() { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - scheduledFuture = null; - } } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java new file mode 100644 index 0000000..14e4c21 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.stats; + +import com.google.common.collect.EvictingQueue; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import lombok.Getter; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.functions.proto.InstanceCommunication; + +import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class SinkStatsManager extends ComponentStatsManager { + + public static final String PULSAR_SINK_METRICS_PREFIX = "pulsar_sink_"; + + /** Declare metric names **/ + public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total"; + public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total"; + public static final String LAST_INVOCATION = "last_invocation"; + public static final String RECEIVED_TOTAL = "received_total"; + public static final String WRITTEN_TOTAL = "written_total"; + + public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min"; + public static final String SINK_EXCEPTIONS_TOTAL_1min = "sink_exceptions_total_1min"; + public static final String RECEIVED_TOTAL_1min = "received_total_1min"; + public static final String WRITTEN_TOTAL_1min = "written_total_1min"; + + /** Declare Prometheus stats **/ + + private final Counter statTotalRecordsReceived; + + private final Counter statTotalSysExceptions; + + private final Counter statTotalSinkExceptions; + + private final Counter statTotalWritten; + + private final Gauge statlastInvocation; + + // windowed metrics + private final Counter statTotalRecordsReceived1min; + + private final Counter statTotalSysExceptions1min; + + private final Counter statTotalSinkExceptions1min; + + private final Counter statTotalWritten1min; + + // exceptions + + final Gauge sysExceptions; + + final Gauge sinkExceptions; + + // As an optimization + private final Counter.Child _statTotalRecordsReceived; + private final Counter.Child _statTotalSysExceptions; + private final Counter.Child _statTotalSinkExceptions; + private final Counter.Child _statTotalWritten; + private final Gauge.Child _statlastInvocation; + + private Counter.Child _statTotalRecordsReceived1min; + private Counter.Child _statTotalSysExceptions1min; + private Counter.Child _statTotalSinkExceptions1min; + private Counter.Child _statTotalWritten1min; + + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSinkExceptions = EvictingQueue.create(10); + + private final RateLimiter sysExceptionRateLimiter; + + private final RateLimiter sinkExceptionRateLimiter; + + + public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService + scheduledExecutorService) { + super(collectorRegistry, metricsLabels, scheduledExecutorService); + + statTotalRecordsReceived = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL) + .help("Total number of records sink has received from Pulsar topic(s).") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); + + statTotalSysExceptions = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) + .help("Total number of system exceptions.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels); + + statTotalSinkExceptions = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL) + .help("Total number of sink exceptions.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels); + + statTotalWritten = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL) + .help("Total number of records processed by sink.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalWritten = statTotalWritten.labels(metricsLabels); + + statlastInvocation = Gauge.build() + .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION) + .help("The timestamp of the last invocation of the sink.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statlastInvocation = statlastInvocation.labels(metricsLabels); + + statTotalRecordsReceived1min = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min) + .help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); + + statTotalSysExceptions1min = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) + .help("Total number of system exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); + + statTotalSinkExceptions1min = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min) + .help("Total number of sink exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels); + + statTotalWritten1min = Counter.build() + .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min) + .help("Total number of records processed by sink the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); + + sysExceptions = Gauge.build() + .name(PULSAR_SINK_METRICS_PREFIX + "system_exception") + .labelNames(exceptionMetricsLabelNames) + .help("Exception from system code.") + .register(collectorRegistry); + + sinkExceptions = Gauge.build() + .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception") + .labelNames(exceptionMetricsLabelNames) + .help("Exception from sink.") + .register(collectorRegistry); + + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + } + + @Override + public void reset() { + statTotalRecordsReceived1min.clear(); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); + + statTotalSysExceptions1min.clear(); + _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); + + statTotalSinkExceptions1min.clear(); + _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels); + + statTotalWritten1min.clear(); + _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); + + latestSystemExceptions.clear(); + latestSinkExceptions.clear(); + } + + @Override + public void incrTotalReceived() { + _statTotalRecordsReceived.inc(); + _statTotalRecordsReceived1min.inc(); + } + + @Override + public void incrTotalProcessedSuccessfully() { + _statTotalWritten.inc(); + _statTotalWritten1min.inc(); + } + + @Override + public void incrSysExceptions(Throwable ex) { + long ts = System.currentTimeMillis(); + InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); + latestSystemExceptions.add(info); + + // report exception throw prometheus + if (sysExceptionRateLimiter.tryAcquire()) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); + exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + sysExceptions.labels(exceptionMetricsLabels).set(1.0); + } + } + + @Override + public void incrUserExceptions(Exception ex) { + incrSysExceptions(ex); + } + + @Override + public void incrSourceExceptions(Exception ex) { + incrSysExceptions(ex); + } + + @Override + public void incrSinkExceptions(Exception ex) { + long ts = System.currentTimeMillis(); + InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); + latestSinkExceptions.add(info); + + // report exception throw prometheus + if (sinkExceptionRateLimiter.tryAcquire()) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); + exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + sinkExceptions.labels(exceptionMetricsLabels).set(1.0); + } + } + + @Override + public void setLastInvocation(long ts) { + _statlastInvocation.set(ts); + } + + @Override + public void processTimeStart() { + //no-p[ + } + + @Override + public void processTimeEnd() { + //no-op + } + + @Override + public double getTotalProcessedSuccessfully() { + return _statTotalWritten.get(); + } + + @Override + public double getTotalRecordsReceived() { + return _statTotalRecordsReceived.get(); + } + + @Override + public double getTotalSysExceptions() { + return _statTotalSysExceptions.get(); + } + + @Override + public double getTotalUserExceptions() { + return 0; + } + + @Override + public double getLastInvocation() { + return _statlastInvocation.get(); + } + + @Override + public double getAvgProcessLatency() { + return 0; + } + + @Override + public double getTotalProcessedSuccessfully1min() { + return _statTotalWritten1min.get(); + } + + @Override + public double getTotalRecordsReceived1min() { + return _statTotalRecordsReceived1min.get(); + } + + @Override + public double getTotalSysExceptions1min() { + return _statTotalSysExceptions1min.get(); + } + + @Override + public double getTotalUserExceptions1min() { + return 0; + } + + @Override + public double getAvgProcessLatency1min() { + return 0; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestUserExceptions() { + return EMPTY_QUEUE; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSystemExceptions() { + return latestSystemExceptions; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSourceExceptions() { + return EMPTY_QUEUE; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSinkExceptions() { + return latestSinkExceptions; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java new file mode 100644 index 0000000..5679f2e --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.stats; + +import com.google.common.collect.EvictingQueue; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import lombok.Getter; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.functions.proto.InstanceCommunication; + +import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class SourceStatsManager extends ComponentStatsManager { + + public static final String PULSAR_SOURCE_METRICS_PREFIX = "pulsar_source_"; + + /** Declare metric names **/ + public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total"; + public static final String SOURCE_EXCEPTIONS_TOTAL = "source_exceptions_total"; + public static final String LAST_INVOCATION = "last_invocation"; + public static final String RECEIVED_TOTAL = "received_total"; + public static final String WRITTEN_TOTAL = "written_total"; + + public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min"; + public static final String SOURCE_EXCEPTIONS_TOTAL_1min = "source_exceptions_total_1min"; + public static final String RECEIVED_TOTAL_1min = "received_total_1min"; + public static final String WRITTEN_TOTAL_1min = "written_total_1min"; + + /** Declare Prometheus stats **/ + + private final Counter statTotalRecordsReceived; + + private final Counter statTotalSysExceptions; + + private final Counter statTotalSourceExceptions; + + private final Counter statTotalWritten; + + private final Gauge statlastInvocation; + + // windowed metrics + private final Counter statTotalRecordsReceived1min; + + private final Counter statTotalSysExceptions1min; + + private final Counter statTotalSourceExceptions1min; + + private final Counter statTotalWritten1min; + + // exceptions + + final Gauge sysExceptions; + + final Gauge sourceExceptions; + + // As an optimization + private final Counter.Child _statTotalRecordsReceived; + private final Counter.Child _statTotalSysExceptions; + private final Counter.Child _statTotalSourceExceptions; + private final Counter.Child _statTotalWritten; + private final Gauge.Child _statlastInvocation; + + private Counter.Child _statTotalRecordsReceived1min; + private Counter.Child _statTotalSysExceptions1min; + private Counter.Child _statTotalSourceExceptions1min; + private Counter.Child _statTotalWritten1min; + + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10); + @Getter + private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSourceExceptions = EvictingQueue.create(10); + + protected final RateLimiter sysExceptionRateLimiter; + + protected final RateLimiter sourceExceptionRateLimiter; + + public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService + scheduledExecutorService) { + super(collectorRegistry, metricsLabels, scheduledExecutorService); + + statTotalRecordsReceived = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL) + .help("Total number of records received from source.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); + + statTotalSysExceptions = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) + .help("Total number of system exceptions.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels); + + statTotalSourceExceptions = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL) + .help("Total number of source exceptions.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels); + + statTotalWritten = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL) + .help("Total number of records written to a Pulsar topic.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalWritten = statTotalWritten.labels(metricsLabels); + + statlastInvocation = Gauge.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION) + .help("The timestamp of the last invocation of the source.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statlastInvocation = statlastInvocation.labels(metricsLabels); + + statTotalRecordsReceived1min = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min) + .help("Total number of records received from source in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); + + statTotalSysExceptions1min = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) + .help("Total number of system exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); + + statTotalSourceExceptions1min = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min) + .help("Total number of source exceptions in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels); + + statTotalWritten1min = Counter.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min) + .help("Total number of records written to a Pulsar topic in the last 1 minute.") + .labelNames(metricsLabelNames) + .register(collectorRegistry); + _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); + + sysExceptions = Gauge.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception") + .labelNames(exceptionMetricsLabelNames) + .help("Exception from system code.") + .register(collectorRegistry); + + sourceExceptions = Gauge.build() + .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception") + .labelNames(exceptionMetricsLabelNames) + .help("Exception from source.") + .register(collectorRegistry); + + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + } + + @Override + public void reset() { + statTotalRecordsReceived1min.clear(); + _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); + + statTotalSysExceptions1min.clear(); + _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); + + statTotalSourceExceptions1min.clear(); + _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels); + + statTotalWritten1min.clear(); + _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); + + latestSystemExceptions.clear(); + latestSourceExceptions.clear(); + } + + @Override + public void incrTotalReceived() { + _statTotalRecordsReceived.inc(); + _statTotalRecordsReceived1min.inc(); + } + + @Override + public void incrTotalProcessedSuccessfully() { + _statTotalWritten.inc(); + _statTotalWritten1min.inc(); + } + + @Override + public void incrSysExceptions(Throwable ex) { + long ts = System.currentTimeMillis(); + InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); + latestSystemExceptions.add(info); + + // report exception throw prometheus + if (sysExceptionRateLimiter.tryAcquire()) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); + exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + sysExceptions.labels(exceptionMetricsLabels).set(1.0); + } + } + + @Override + public void incrUserExceptions(Exception ex) { + incrSysExceptions(ex); + } + + @Override + public void incrSourceExceptions(Exception ex) { + long ts = System.currentTimeMillis(); + InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); + latestSourceExceptions.add(info); + + // report exception throw prometheus + if (sourceExceptionRateLimiter.tryAcquire()) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); + exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + sourceExceptions.labels(exceptionMetricsLabels).set(1.0); + } + } + + @Override + public void incrSinkExceptions(Exception ex) { + incrSysExceptions(ex); + } + + @Override + public void setLastInvocation(long ts) { + _statlastInvocation.set(ts); + } + + @Override + public void processTimeStart() { + //no-op + } + + @Override + public void processTimeEnd() { + //no-op + } + + @Override + public double getTotalProcessedSuccessfully() { + return _statTotalWritten.get(); + } + + @Override + public double getTotalRecordsReceived() { + return _statTotalRecordsReceived.get(); + } + + @Override + public double getTotalSysExceptions() { + return _statTotalSysExceptions.get(); + } + + @Override + public double getTotalUserExceptions() { + return 0; + } + + @Override + public double getLastInvocation() { + return _statlastInvocation.get(); + } + + @Override + public double getAvgProcessLatency() { + return 0; + } + + @Override + public double getTotalProcessedSuccessfully1min() { + return _statTotalWritten1min.get(); + } + + @Override + public double getTotalRecordsReceived1min() { + return _statTotalRecordsReceived1min.get(); + } + + @Override + public double getTotalSysExceptions1min() { + return _statTotalSysExceptions1min.get(); + } + + @Override + public double getTotalUserExceptions1min() { + return 0; + } + + @Override + public double getAvgProcessLatency1min() { + return 0; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestUserExceptions() { + return EvictingQueue.create(0); + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSystemExceptions() { + return EMPTY_QUEUE; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSourceExceptions() { + return EMPTY_QUEUE; + } + + @Override + public EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> getLatestSinkExceptions() { + return EvictingQueue.create(0); + } +} diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 77427f7..278cd5a 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; +import org.apache.pulsar.functions.utils.Utils; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -75,8 +76,8 @@ public class ContextImplTest { logger, client, new ArrayList<>(), - new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0] - ); + new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0], + Utils.ComponentType.FUNCTION); } @Test(expected = IllegalStateException.class) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 1c5d64d..b50bcd5 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -364,4 +364,21 @@ public class Utils { String functionName, int instanceId) { return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); } + + public enum ComponentType { + FUNCTION("Function"), + SOURCE("Source"), + SINK("Sink"); + + private final String componentName; + + ComponentType(String componentName) { + this.componentName = componentName; + } + + @Override + public String toString() { + return componentName; + } + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 0e4b1b1..50afdc6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -72,9 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.join; import static org.apache.pulsar.functions.utils.Utils.*; -import static org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.FUNCTION; -import static org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SINK; -import static org.apache.pulsar.functions.worker.rest.api.ComponentImpl.ComponentType.SOURCE; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -122,23 +122,6 @@ import net.jodah.typetools.TypeResolver; @Slf4j public abstract class ComponentImpl { - public enum ComponentType { - FUNCTION("Function"), - SOURCE("Source"), - SINK("Sink"); - - private final String componentName; - - ComponentType(String componentName) { - this.componentName = componentName; - } - - @Override - public String toString() { - return componentName; - } - } - private final AtomicReference<StorageClient> storageClient = new AtomicReference<>(); protected final Supplier<WorkerService> workerServiceSupplier; protected final ComponentType componentType; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index f3e41a1..be8fd49 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -24,6 +24,7 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; @@ -199,7 +200,7 @@ public class FunctionsImpl extends ComponentImpl { } public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) { - super(workerServiceSupplier, ComponentType.FUNCTION); + super(workerServiceSupplier, Utils.ComponentType.FUNCTION); } /** diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java index 846900f..4a801b2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java @@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.utils.SinkConfigUtils; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; @@ -206,7 +207,7 @@ public class SinkImpl extends ComponentImpl { } public SinkImpl(Supplier<WorkerService> workerServiceSupplier) { - super(workerServiceSupplier, ComponentType.SINK); + super(workerServiceSupplier, Utils.ComponentType.SINK); } public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java index ac096bb..ec7ba30 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java @@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.utils.SourceConfigUtils; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; @@ -204,7 +205,7 @@ public class SourceImpl extends ComponentImpl { } public SourceImpl(Supplier<WorkerService> workerServiceSupplier) { - super(workerServiceSupplier, ComponentType.SOURCE); + super(workerServiceSupplier, Utils.ComponentType.SOURCE); } public SourceStatus getSourceStatus(final String tenant, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index aa22650..d8d54c9 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -76,6 +76,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION; import static org.apache.pulsar.functions.utils.Utils.mergeJson; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -179,7 +180,7 @@ public class FunctionApiV2ResourceTest { FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService)); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any()); + doReturn(FUNCTION).when(functions).calculateSubjectType(any()); this.resource = spy(new FunctionsImplV2(functions)); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index 580345e..7695993 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -76,6 +76,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK; +import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -177,7 +180,7 @@ public class FunctionApiV3ResourceTest { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any()); + doReturn(FUNCTION).when(this.resource).calculateSubjectType(any()); } // @@ -1383,9 +1386,9 @@ public class FunctionApiV3ResourceTest { FunctionDetails.newBuilder().setName("test-3").build()).build(); functionMetaDataList.add(f3); when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); - doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); - doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); + doReturn(SOURCE).when(this.resource).calculateSubjectType(f1); + doReturn(FUNCTION).when(this.resource).calculateSubjectType(f2); + doReturn(SINK).when(this.resource).calculateSubjectType(f3); List<String> functionList = listDefaultFunctions(); assertEquals(functions, functionList); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index bec3ca9..9e55800 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; -import org.apache.pulsar.functions.worker.rest.api.ComponentImpl; import org.apache.pulsar.functions.worker.rest.api.SinkImpl; import org.apache.pulsar.io.cassandra.CassandraStringSink; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -175,7 +174,7 @@ public class SinkApiV3ResourceTest { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new SinkImpl(() -> mockedWorkerService)); - Mockito.doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(any()); + Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any()); } // @@ -1308,9 +1307,9 @@ public class SinkApiV3ResourceTest { FunctionDetails.newBuilder().setName("test-3").build()).build(); functionMetaDataList.add(f3); when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); - doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); - doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); List<String> sinkList = listDefaultSinks(); assertEquals(functions, sinkList); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 72d23e2..6ba8914 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; -import org.apache.pulsar.functions.worker.rest.api.ComponentImpl; import org.apache.pulsar.functions.worker.rest.api.SourceImpl; import org.apache.pulsar.io.twitter.TwitterFireHose; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -170,7 +169,7 @@ public class SourceApiV3ResourceTest { when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); this.resource = spy(new SourceImpl(() -> mockedWorkerService)); - Mockito.doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any()); + Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any()); } // @@ -1322,9 +1321,9 @@ public class SourceApiV3ResourceTest { FunctionDetails.newBuilder().setName("test-3").build()).build(); functionMetaDataList.add(f3); when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); - doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); - doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); + doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); List<String> sourceList = listDefaultSources(); assertEquals(functions, sourceList); diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-generator/pom.xml similarity index 100% rename from pulsar-io/data-genenator/pom.xml rename to pulsar-io/data-generator/pom.xml diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java similarity index 100% rename from pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java rename to pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java similarity index 100% rename from pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java rename to pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java b/pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java similarity index 100% rename from pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java rename to pulsar-io/data-generator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java diff --git a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml similarity index 100% rename from pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml rename to pulsar-io/data-generator/src/main/resources/META-INF/services/pulsar-io.yaml diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 628686b..ea433ce 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -42,7 +42,7 @@ <module>kinesis</module> <module>hdfs3</module> <module>jdbc</module> - <module>data-genenator</module> + <module>data-generator</module> <module>elastic-search</module> <module>kafka-connect-adaptor</module> <module>debezium</module>