[pulsar] branch master updated (634e9b9 -> f474ff0)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 634e9b9 expose ensemblePlacementPolicy in bookkeeper.conf (#8210) add f474ff0 Upgrade Jcommander to 1.78 (#8206) No new revisions were added by this update. Summary of changes: distribution/server/src/assemble/LICENSE.bin.txt | 2 +- pom.xml | 2 +- .../java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java | 5 - .../src/main/java/org/apache/pulsar/admin/cli/CmdBase.java| 6 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../java/org/apache/pulsar/admin/cli/CmdGenerateDocument.java | 8 ++-- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java | 2 +- .../main/java/org/apache/pulsar/client/cli/PulsarClientTool.java | 7 ++- pulsar-sql/presto-distribution/LICENSE| 2 +- .../tests/integration/functions/utils/CommandGenerator.java | 2 +- 11 files changed, 28 insertions(+), 12 deletions(-)
[pulsar] branch master updated: [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 700131b [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164) 700131b is described below commit 700131bab6bfd7746f40ad362ea96f9f2ef339c4 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Fri Oct 2 11:30:37 2020 -0700 [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164) * Added org.apache.pulsar.io.core.Context interface * Added the Context interface * Renamed Context to ConnectorContext Co-authored-by: David Kjerrumgaard --- .../{SinkContext.java => ConnectorContext.java}| 43 +++ .../org/apache/pulsar/io/core/SinkContext.java | 118 +--- .../org/apache/pulsar/io/core/SourceContext.java | 124 + 3 files changed, 18 insertions(+), 267 deletions(-) diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java similarity index 82% copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java index f7e3b4d..a06451b 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java @@ -18,63 +18,52 @@ */ package org.apache.pulsar.io.core; -import org.slf4j.Logger; - import java.nio.ByteBuffer; -import java.util.Collection; import java.util.concurrent.CompletableFuture; +import org.slf4j.Logger; + /** - * Interface for a sink connector providing information about environment where it is running. + * Interface for a connector providing information about environment where it is running. * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment. */ -public interface SinkContext { +public interface ConnectorContext { /** - * The id of the instance that invokes this sink. + * The id of the instance that invokes this source. * * @return the instance id */ int getInstanceId(); /** - * Get the number of instances that invoke this sink. + * Get the number of instances that invoke this source. * - * @return the number of instances that invoke this sink. + * @return the number of instances that invoke this source. */ int getNumInstances(); - + /** * Record a user defined metric * @param metricName The name of the metric * @param value The value of the metric */ void recordMetric(String metricName, double value); - + /** - * Get a list of all input topics - * @return a list of all input topics - */ -Collection getInputTopics(); - -/** - * The tenant this sink belongs to - * @return the tenant this sink belongs to + * The tenant this source belongs to. + * + * @return the tenant this source belongs to */ String getTenant(); /** - * The namespace this sink belongs to - * @return the namespace this sink belongs to + * The namespace this source belongs to. + * + * @return the namespace this source belongs to */ String getNamespace(); - -/** - * The name of the sink that we are executing - * @return The Sink name - */ -String getSinkName(); - + /** * The logger object that can be used to log in a sink * @return the logger object diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index f7e3b4d..f145784 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -18,38 +18,13 @@ */ package org.apache.pulsar.io.core; -import org.slf4j.Logger; - -import java.nio.ByteBuffer; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * Interface for a sink connector providing information about environment where it is running. * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment. */ -public interface SinkContext { - -/** - * The id of the instance that invokes this sink. - * - * @return the instance id - */ -int getInstanceId(); - -/** - * Get the number of instances that invoke this sink. - * - * @return the number of instances that invoke this sink. - */ -int getNumInstances(); - -/** - * Record a use
[pulsar-client-go] branch master updated (7dd67c9 -> 6f646aa)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 7dd67c9 Fixed buffer resize when writing request on connection (#374) add 6f646aa Fixed deadlock in DLQ ack processing (#375) No new revisions were added by this update. Summary of changes: pulsar/dlq_router.go | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-)
[pulsar-client-go] branch master updated (3c523ba -> 7dd67c9)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 3c523ba Support retry letter topic (#359) add 7dd67c9 Fixed buffer resize when writing request on connection (#374) No new revisions were added by this update. Summary of changes: pulsar/internal/connection.go | 1 + 1 file changed, 1 insertion(+)
[pulsar] branch master updated (5871aba -> f096eb3)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 5871aba removed unavailable parameter from the Java doc (#8104) add f096eb3 Standardize kubernetes client version (#8119) No new revisions were added by this update. Summary of changes: pom.xml | 1 + pulsar-functions/runtime/pom.xml | 2 +- pulsar-functions/secrets/pom.xml | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-)
[pulsar] branch master updated: Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fefaf52 Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098) fefaf52 is described below commit fefaf52a30e89f82079f095e4bddad39f6c17e52 Author: Sanjeev Kulkarni AuthorDate: Wed Sep 23 07:32:37 2020 -0700 Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098) * Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner * Addressed feedback * Took out wildcard * Fix build * Address feedback Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 4 ++ .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 5 ++ .../org/apache/pulsar/admin/cli/CmdSources.java| 5 ++ .../org/apache/pulsar/functions/LocalRunner.java | 43 +- ...eAndConfigBasedSecretsProviderConfigurator.java | 66 ++ 5 files changed, 120 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index d28bd69..4792a6f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -627,6 +627,10 @@ public class CmdFunctions extends CmdBase { protected Integer instanceIdOffset = 0; @Parameter(names = "--runtime", description = "either THREAD or PROCESS. Only applies for Java functions") protected String runtime; +@Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider") +protected String secretsProviderClassName; +@Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider") +protected String secretsProviderConfig; private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_stateStorageServiceUrl)) stateStorageServiceUrl = DEPRECATED_stateStorageServiceUrl; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 04bbf94..f6b1823 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -164,6 +164,11 @@ public class CmdSinks extends CmdBase { @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path") protected String tlsTrustCertFilePath; +@Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider") +protected String secretsProviderClassName; +@Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider") +protected String secretsProviderConfig; + private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index ea61202..9bd49be 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -165,6 +165,11 @@ public class CmdSources extends CmdBase { @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path") protected String tlsTrustCertFilePath; +@Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider") +protected String secretsProviderClassName; +@Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider") +protected String secretsProviderConfig; + private void mergeArgs() { if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalR
[pulsar] branch master updated (ad8abd5 -> ef2a4bd)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from ad8abd5 Set dryrun of KubernetesRuntime is null (#8064) add ef2a4bd Reduce log level to warning for authorization failures (#8088) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[pulsar] branch master updated: Upgrade Bouncy castle to newest version (#8047)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new be35d81 Upgrade Bouncy castle to newest version (#8047) be35d81 is described below commit be35d812e14d950b777cb9e03f155488b54fe485 Author: Sanjeev Kulkarni AuthorDate: Sat Sep 12 22:36:33 2020 -0700 Upgrade Bouncy castle to newest version (#8047) Co-authored-by: Sanjeev Kulkarni --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 80cb7b9..11c2d3b 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ flexible messaging model and an intuitive client API. 1.7.25 3.2.2 2.10.0 -1.60 +1.66 1.0.2 2.11.1 2.11.1
[pulsar] branch master updated (116df8c -> 55098a9)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 116df8c Rollback protobuf version to 3.5.1 (#8014) add 55098a9 Fix log level for auth failure and not found paths (#8018) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/service/ServerCnx.java| 4 +-- .../pulsar/broker/web/PulsarWebResource.java | 5 ++- .../functions/worker/rest/api/ComponentImpl.java | 42 +++--- .../functions/worker/rest/api/SinksImpl.java | 4 +-- .../functions/worker/rest/api/SourcesImpl.java | 4 +-- 5 files changed, 31 insertions(+), 28 deletions(-)
[pulsar] branch master updated (4a441db -> bd4a830)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 4a441db Support topic-level inactiveTopicPolicies (#7986) add bd4a830 Implement better error handling during close() of a batch source (#7984) No new revisions were added by this update. Summary of changes: .../source/batch/BatchSourceExecutor.java | 30 +++--- 1 file changed, 27 insertions(+), 3 deletions(-)
[pulsar] branch master updated (23d795c -> 809c6be)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 23d795c Fix pending batchIndexAcks bitSet batchSize in PersistentAcknowledgmentsGroupingTracker (#7828) add 809c6be Limit cpu count for proxy unit test cases (#7845) No new revisions were added by this update. Summary of changes: .../workflows/ci-integration-backwards-compatibility.yaml| 2 +- .github/workflows/ci-integration-cli.yaml| 2 +- .github/workflows/ci-integration-function-state.yaml | 2 +- .github/workflows/ci-integration-messaging.yaml | 2 +- .github/workflows/ci-integration-process.yaml| 2 +- .github/workflows/ci-integration-schema.yaml | 2 +- .github/workflows/ci-integration-sql.yaml| 2 +- .github/workflows/ci-integration-standalone.yaml | 2 +- .github/workflows/ci-integration-thread.yaml | 2 +- .github/workflows/ci-integration-tiered-filesystem.yaml | 2 +- .github/workflows/ci-integration-tiered-jcloud.yaml | 2 +- .github/workflows/ci-license.yaml| 2 +- .github/workflows/ci-unit-broker-broker-gp1.yaml | 6 +- .github/workflows/ci-unit-broker-client-api.yaml | 12 pulsar-client-cpp/python/setup.py| 4 ++-- .../apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java| 3 +++ 16 files changed, 30 insertions(+), 19 deletions(-)
[pulsar] branch master updated (23d795c -> 809c6be)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 23d795c Fix pending batchIndexAcks bitSet batchSize in PersistentAcknowledgmentsGroupingTracker (#7828) add 809c6be Limit cpu count for proxy unit test cases (#7845) No new revisions were added by this update. Summary of changes: .../workflows/ci-integration-backwards-compatibility.yaml| 2 +- .github/workflows/ci-integration-cli.yaml| 2 +- .github/workflows/ci-integration-function-state.yaml | 2 +- .github/workflows/ci-integration-messaging.yaml | 2 +- .github/workflows/ci-integration-process.yaml| 2 +- .github/workflows/ci-integration-schema.yaml | 2 +- .github/workflows/ci-integration-sql.yaml| 2 +- .github/workflows/ci-integration-standalone.yaml | 2 +- .github/workflows/ci-integration-thread.yaml | 2 +- .github/workflows/ci-integration-tiered-filesystem.yaml | 2 +- .github/workflows/ci-integration-tiered-jcloud.yaml | 2 +- .github/workflows/ci-license.yaml| 2 +- .github/workflows/ci-unit-broker-broker-gp1.yaml | 6 +- .github/workflows/ci-unit-broker-client-api.yaml | 12 pulsar-client-cpp/python/setup.py| 4 ++-- .../apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java| 3 +++ 16 files changed, 30 insertions(+), 19 deletions(-)
[pulsar] branch master updated: Added ability to specify producer config for functions and sources (#7721)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 22d7a6c Added ability to specify producer config for functions and sources (#7721) 22d7a6c is described below commit 22d7a6cbcc36c79be64d3f39f707139ab43889c6 Author: Sanjeev Kulkarni AuthorDate: Tue Aug 4 18:34:58 2020 -0700 Added ability to specify producer config for functions and sources (#7721) * Added ability to specify producer config for functions and sources * Fixed test * Fix test * Add generated function proto * Add header * Address comments Co-authored-by: Sanjeev Kulkarni --- .../pulsar/common/functions/FunctionConfig.java| 3 + .../pulsar/common/functions/ProducerConfig.java| 35 +- .../org/apache/pulsar/common/io/SourceConfig.java | 3 + .../pulsar/functions/instance/ContextImpl.java | 8 + .../functions/instance/JavaInstanceRunnable.java | 4 + .../apache/pulsar/functions/sink/PulsarSink.java | 8 + .../pulsar/functions/sink/PulsarSinkConfig.java| 2 + .../instance/src/main/python/Function_pb2.py | 610 - .../proto/src/main/proto/Function.proto| 6 + .../functions/utils/FunctionConfigUtils.java | 25 +- .../pulsar/functions/utils/SourceConfigUtils.java | 22 + .../functions/utils/FunctionConfigUtilsTest.java | 13 +- .../functions/utils/SourceConfigUtilsTest.java | 6 + 13 files changed, 589 insertions(+), 156 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 8c680e8..fa925c4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -77,6 +77,9 @@ public class FunctionConfig { private String output; +// Any configuration that need to be applied for producers +private ProducerConfig producerConfig; + /** * Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema * implementation. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java similarity index 59% copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java index 4e47812..b28370e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java @@ -16,24 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.sink; +package org.apache.pulsar.common.functions; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.apache.pulsar.common.functions.FunctionConfig; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; -import java.util.Map; - -@Getter -@Setter -@ToString -public class PulsarSinkConfig { -private FunctionConfig.ProcessingGuarantees processingGuarantees; -private String topic; -private String serdeClassName; -private String schemaType; -private Map schemaProperties; -private String typeClassName; -private boolean forwardSourceMessageProperty; +/** + * Configuration of the producer inside the function. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode +public class ProducerConfig { +private Integer maxPendingMessages; +private Integer maxPendingMessagesAcrossPartitions; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index b3a5634..31a8634 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; /** @@ -41,6 +42,8 @@ public class SourceConfig { private String topicName; +private ProducerConfig producerConfig; + private String serdeClassName; private String schemaType; diff
[pulsar] branch master updated: create producer before test to make sure the namespace onload finished. (#7748)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 739bd1e create producer before test to make sure the namespace onload finished. (#7748) 739bd1e is described below commit 739bd1ed0e292fa760acc6fda3b0a28871cd2bc0 Author: ran AuthorDate: Wed Aug 5 06:31:39 2020 +0800 create producer before test to make sure the namespace onload finished. (#7748) --- .../java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java index 1104ab6..c892eca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicBacklogQuotaTest.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BacklogQuotaManager; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; @@ -71,6 +72,8 @@ public class TopicBacklogQuotaTest extends MockedPulsarServiceBaseTest { admin.tenants().createTenant(this.testTenant, tenantInfo); admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test")); admin.topics().createPartitionedTopic(backlogQuotaTopic, 2); +Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "lookup-topic").create(); +producer.close(); } @AfterMethod
[pulsar] branch master updated: Fixed race condition on deleting topic with active readers (#7715)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 03131fb Fixed race condition on deleting topic with active readers (#7715) 03131fb is described below commit 03131fb7f4d7bc3bb551acbc4d5117465ad22b53 Author: Matteo Merli AuthorDate: Sun Aug 2 12:47:55 2020 -0700 Fixed race condition on deleting topic with active readers (#7715) --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index eaf06fc..02d8453 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -94,6 +94,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; @@ -2229,7 +2230,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { -log.warn("[{}] Failed to delete cursor {} : {}", name, cursor, exception); +if (exception instanceof CursorNotFoundException) { +// This could happen if a cursor is getting deleted while we are deleting the topic +// Treating this as a "success" case, since the cursor is gone in any case. +deleteCursorComplete(ctx); +return; +} + +log.warn("[{}] Failed to delete cursor {}: {}", name, cursor, exception.getMessage(), exception); cursorDeleteException.compareAndSet(null, exception); if (cursorsToDelete.decrementAndGet() == 0) { // Trigger callback only once
[pulsar] branch master updated: Call open before discover. This will avoid discover being called before open (#7703)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bbd5fa1 Call open before discover. This will avoid discover being called before open (#7703) bbd5fa1 is described below commit bbd5fa1d14f6f8f18a0d94aaae0ce2ff4413eb18 Author: Sanjeev Kulkarni AuthorDate: Thu Jul 30 15:00:58 2020 -0700 Call open before discover. This will avoid discover being called before open (#7703) Co-authored-by: Sanjeev Kulkarni --- .../src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java index e624db9..04d87fa 100644 --- a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java +++ b/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java @@ -122,12 +122,12 @@ public class BatchSourceExecutor implements Source { // This is the first thing to do to ensure that any tasks discovered during the discover // phase are not lost setupInstanceSubscription(); +batchSource.open(this.config, this.sourceContext); if (sourceContext.getInstanceId() == 0) { discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(), this.sourceContext); discoveryTriggerer.start(this::triggerDiscover); } -batchSource.open(this.config, this.sourceContext); } private void triggerDiscover(String discoveredEvent) {
[pulsar] branch master updated: Use available cores for io thread processing (#7689)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 89b2eff Use available cores for io thread processing (#7689) 89b2eff is described below commit 89b2effee264f93893cb76f773c78db68080850c Author: Sanjeev Kulkarni AuthorDate: Wed Jul 29 10:36:44 2020 -0700 Use available cores for io thread processing (#7689) Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index ce03a27..5a60f64 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -101,6 +101,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable()); clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath()); } + clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors()); return clientBuilder.build(); } return null;
[pulsar] branch master updated (e2bf486 -> c76fd37)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from e2bf486 Allow topic compaction to be disabled in Pulsar Functions (#7677) add c76fd37 Added ability to specify runtime for localrunner (#7681) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 2 ++ 1 file changed, 2 insertions(+)
[pulsar] branch master updated: fix: batch source able to be submitted (#7659)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ee39e40 fix: batch source able to be submitted (#7659) ee39e40 is described below commit ee39e40aa83c7d6779ac35509f0ae38ee167d23e Author: Boyang Jerry Peng AuthorDate: Fri Jul 24 21:42:25 2020 -0700 fix: batch source able to be submitted (#7659) * fix: batch source able to be submitted * fix logic Co-authored-by: Jerry Peng --- .../java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index fcddf5c..bdcab9d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.Exceptions; +import org.apache.pulsar.io.core.BatchSource; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -65,9 +66,9 @@ public class ConnectorUtils { try { // Try to load source class and check it implements Source interface Class sourceClass = ncl.loadClass(conf.getSourceClass()); -if (!(Source.class.isAssignableFrom(sourceClass))) { -throw new IOException("Class " + conf.getSourceClass() + " does not implement interface " -+ Source.class.getName()); +if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) { +throw new IOException(String.format("Class %s does not implement interface %s or %s", + conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName())); } } catch (Throwable t) { Exceptions.rethrowIOException(t);
[pulsar] branch master updated (ff4c3f6 -> e21db6e)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from ff4c3f6 Allow ability to specify retain key ordering in functions (#7647) add e21db6e Improve InvalidTopicName Error message (#7609) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-)
[pulsar] branch master updated: Allow ability to specify retain key ordering in functions (#7647)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ff4c3f6 Allow ability to specify retain key ordering in functions (#7647) ff4c3f6 is described below commit ff4c3f65367526d3bdbcd51a55c7be675ed10de3 Author: Sanjeev Kulkarni AuthorDate: Fri Jul 24 13:31:04 2020 -0700 Allow ability to specify retain key ordering in functions (#7647) * Allow ability to specify retain key ordering in functions * Address comments Co-authored-by: Sanjeev Kulkarni --- .../pulsar/common/functions/FunctionConfig.java| 4 ++ .../functions/instance/JavaInstanceRunnable.java | 3 ++ .../proto/src/main/proto/Function.proto| 3 ++ .../functions/utils/FunctionConfigUtils.java | 53 -- .../functions/utils/FunctionConfigUtilsTest.java | 10 5 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 97de6ad..8c680e8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -86,7 +86,11 @@ public class FunctionConfig { private String outputSerdeClassName; private String logTopic; private ProcessingGuarantees processingGuarantees; +// Do we want function instances to process data in the same order as in the input topics +// This essentially means that every partition of input topic is consumed by only one instance private Boolean retainOrdering; +// Do we want the same function instance to process all data keyed by the input topic's message key +private Boolean retainKeyOrdering; private Boolean forwardSourceMessageProperty; private Map userConfig; // This is a map of secretName(aka how the secret is going to be diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 753cc17..06dbf19 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -722,6 +722,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { case FAILOVER: pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover); break; +case KEY_SHARED: + pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared); +break; default: pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared); break; diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index db834a5..f39fdfd 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -32,6 +32,7 @@ enum ProcessingGuarantees { enum SubscriptionType { SHARED = 0; FAILOVER = 1; +KEY_SHARED = 2; } enum SubscriptionPosition { @@ -84,6 +85,8 @@ message FunctionDetails { /* If specified, this will refer to an archive that is * already present in the server */ string builtin = 20; +bool retainOrdering = 21; +bool retainKeyOrdering = 22; } message ConsumerSpec { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 169ffc1..342ba27 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -138,11 +138,16 @@ public class FunctionConfigUtils { }); } -// Set subscription type based on ordering and EFFECTIVELY_ONCE semantics -Function.SubscriptionType subType = ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering()) -|| FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) -? Function.SubscriptionType.FAILOVER -: Function.SubscriptionType.SHARED; +// Set subscription type +Function.SubscriptionType subType; +if ((functionConfig.getRetainOrdering()
[pulsar] branch master updated: Allow null consume in BatchPushSource (#7573)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4e1a677 Allow null consume in BatchPushSource (#7573) 4e1a677 is described below commit 4e1a677171ab40504bd6306dee832cf900ed4472 Author: Sanjeev Kulkarni AuthorDate: Fri Jul 17 15:19:36 2020 -0700 Allow null consume in BatchPushSource (#7573) * Added upgrade notes * Allow null message to be passed * More private impl * Fix unittest * Address comments Co-authored-by: Sanjeev Kulkarni --- .../pulsar/io/batch/BatchSourceExecutorTest.java | 145 +++-- .../org/apache/pulsar/io/core/BatchPushSource.java | 25 +++- 2 files changed, 151 insertions(+), 19 deletions(-) diff --git a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java index d9d4112..a9d9ef3 100644 --- a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java +++ b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.BatchPushSource; import org.apache.pulsar.io.core.BatchSource; import org.apache.pulsar.io.core.BatchSourceTriggerer; import org.apache.pulsar.io.core.SourceContext; @@ -90,6 +91,46 @@ public class BatchSourceExecutorTest { } } +public static class TestBatchPushSource extends BatchPushSource { +@Getter +private static int prepareCount; +@Getter +private static int discoverCount; +@Getter +private static int recordCount; +private Record record = Mockito.mock(Record.class); +public TestBatchPushSource() { } + +@Override +public void open(Map config, SourceContext context) throws Exception { +if (!config.containsKey("foo")) { +throw new IllegalArgumentException("Bad config passed to TestBatchPushSource"); +} +} + +@Override +public void discover(Consumer taskEater) throws Exception { +byte[] retval = new byte[10]; +discoverCount++; +taskEater.accept(retval); +} + +@Override +public void prepare(byte[] task) throws Exception { +prepareCount++; +for (int i = 0; i < 5; ++i) { +consume(record); +++recordCount; +} +consume(null); +} + +@Override +public void close() throws Exception { + +} +} + public static class TestDiscoveryTriggerer implements BatchSourceTriggerer { private Consumer trigger; private Thread thread; @@ -121,17 +162,21 @@ public class BatchSourceExecutorTest { @Override public void stop() { -thread.interrupt(); -try { -thread.join(); -} catch (Exception e) { +if (thread != null) { +thread.interrupt(); +try { +thread.join(); +} catch (Exception e) { +} } } } private TestBatchSource testBatchSource; +private TestBatchPushSource testBatchPushSource; private BatchSourceConfig testBatchConfig; private Map config; +private Map pushConfig; private BatchSourceExecutor batchSourceExecutor; private SourceContext context; private ConsumerBuilder consumerBuilder; @@ -140,20 +185,32 @@ public class BatchSourceExecutorTest { private CyclicBarrier discoveryBarrier; private Message discoveredTask; -@BeforeMethod -public void setUp() throws Exception { -testBatchSource = new TestBatchSource(); -batchSourceExecutor = new BatchSourceExecutor<>(); -context = Mockito.mock(SourceContext.class); -config = new HashMap<>(); +private static Map createConfig(String className, BatchSourceConfig batchConfig) { +Map config = new HashMap<>(); config.put("foo", "bar"); -testBatchConfig = new BatchSourceConfig(); +config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig)); +config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className); +return config; +} + +private static BatchSourceConfig createBatchSourceConfig() { +BatchSourceConfig testBatchConfig = new BatchSourceConfig(); testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryT
[pulsar] branch master updated (f8b2a23 -> d2b866b)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from f8b2a23 Handle NotAllowed Exception at the client side. (#7430) add d2b866b Update Jersey to 2.31 (#7515) No new revisions were added by this update. Summary of changes: distribution/server/licenses/LICENSE-EPL-2.0.txt | 277 +++ distribution/server/src/assemble/LICENSE.bin.txt | 44 ++-- pom.xml | 4 +- pulsar-client-admin-shaded/pom.xml | 2 +- pulsar-client-all/pom.xml| 2 +- pulsar-sql/presto-distribution/LICENSE | 6 +- 6 files changed, 310 insertions(+), 25 deletions(-) create mode 100644 distribution/server/licenses/LICENSE-EPL-2.0.txt
[pulsar] branch master updated (2a8c8c6 -> b8cba18)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 2a8c8c6 Add test group tags for functions and state integration tests (#7529) add b8cba18 Add limited module set profile to the pom files (#7541) No new revisions were added by this update. Summary of changes: README.md| 7 +- distribution/pom.xml | 28 ++-- pom.xml | 178 +-- pulsar-io/pom.xml| 86 - 4 files changed, 197 insertions(+), 102 deletions(-)
[pulsar] branch master updated (eef63de -> 2ab520b)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from eef63de Fix: function BC issue introduced in 2.6 (#7528) add 2ab520b Metadata compact config (#7532) No new revisions were added by this update. Summary of changes: conf/functions_worker.yml | 2 ++ 1 file changed, 2 insertions(+)
[pulsar] branch master updated (c94067d -> f8ee8de)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from c94067d Add BatchPushSource interface (#7493) add f8ee8de Rejigger contract between LeaderService and rest of components (#7520) No new revisions were added by this update. Summary of changes: .../worker/PulsarFunctionE2ESecurityTest.java | 4 +- .../worker/PulsarFunctionPublishTest.java | 4 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java| 6 +- .../functions/worker/FunctionMetaDataManager.java | 67 +++--- .../pulsar/functions/worker/LeaderService.java | 40 + .../pulsar/functions/worker/SchedulerManager.java | 4 -- .../functions/worker/rest/api/FunctionsImpl.java | 4 ++ 7 files changed, 59 insertions(+), 70 deletions(-)
[pulsar] branch master updated: Add BatchPushSource interface (#7493)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c94067d Add BatchPushSource interface (#7493) c94067d is described below commit c94067d10b0397182c466a34fe953f8e5f101731 Author: Sanjeev Kulkarni AuthorDate: Sun Jul 12 14:28:21 2020 -0700 Add BatchPushSource interface (#7493) * Added upgrade notes * Add a simple BatchPushSource akin to PushSource Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/io/core/BatchPushSource.java | 68 ++ 1 file changed, 68 insertions(+) diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java new file mode 100644 index 000..b056238 --- /dev/null +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.core; + +import org.apache.pulsar.functions.api.Record; + +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; + +/** + * Pulsar's Batch Push Source interface. Batch Push Sources have the same lifecycle + * as the regular BatchSource, aka discover, prepare. The reason its called Push is + * because BatchPushSource can emit a record using the consume method that they + * invoke whenever they have data to be published to Pulsar. + */ +public abstract class BatchPushSource implements BatchSource { + +private LinkedBlockingQueue> queue; +private static final int DEFAULT_QUEUE_LENGTH = 1000; + +public BatchPushSource() { +this.queue = new LinkedBlockingQueue<>(this.getQueueLength()); +} + +@Override +public Record readNext() throws Exception { +return queue.take(); +} + +/** + * Send this message to be written to Pulsar. + * + * @param record next message from source which should be sent to a Pulsar topic + */ +public void consume(Record record) { +try { +queue.put(record); +} catch (InterruptedException e) { +throw new RuntimeException(e); +} +} + +/** + * Get length of the queue that records are push onto + * Users can override this method to customize the queue length + * @return queue length + */ +public int getQueueLength() { +return DEFAULT_QUEUE_LENGTH; +} +} \ No newline at end of file
[pulsar] branch master updated: Differentiate authorization between source/sink/function operations (#7466)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 77dccd2 Differentiate authorization between source/sink/function operations (#7466) 77dccd2 is described below commit 77dccd2824699c9be96f518a8a1079df51612c8e Author: Sanjeev Kulkarni AuthorDate: Tue Jul 7 15:43:57 2020 -0700 Differentiate authorization between source/sink/function operations (#7466) * Differentiate between source/sink/function operations * Added release notes Co-authored-by: Sanjeev Kulkarni --- .../broker/authorization/AuthorizationProvider.java | 20 .../broker/authorization/AuthorizationService.java | 10 ++ .../authorization/PulsarAuthorizationProvider.java | 20 ++-- .../api/AuthorizationProducerConsumerTest.java | 10 ++ .../pulsar/common/policies/data/AuthAction.java | 6 ++ .../functions/worker/rest/api/ComponentImpl.java | 14 -- site2/website/release-notes.md | 6 ++ 7 files changed, 82 insertions(+), 4 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 4eb5d93..63ca4cd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -142,6 +142,26 @@ public interface AuthorizationProvider extends Closeable { AuthenticationDataSource authenticationData); /** + * Allow all source operations with in this namespace + * @param namespaceName The namespace that the sources operations can be executed in + * @param role The role to check + * @param authenticationData authentication data related to the role + * @return a boolean to determine whether authorized or not + */ +CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData); + +/** + * Allow all sink operations with in this namespace + * @param namespaceName The namespace that the sink operations can be executed in + * @param role The role to check + * @param authenticationData authentication data related to the role + * @return a boolean to determine whether authorized or not + */ +CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData); + +/** * * Grant authorization-action permission on a namespace to the given client * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 10b35ef..e964faa 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -331,6 +331,16 @@ public class AuthorizationService { return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData); } +public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { +return provider.allowSourceOpsAsync(namespaceName, role, authenticationData); +} + +public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { +return provider.allowSinkOpsAsync(namespaceName, role, authenticationData); +} + /** * Grant authorization-action permission on a tenant to the given client * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 1aa79bf..a394311 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -221,6 +221,22 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { @Override public
[pulsar] branch master updated: Functions metadata compaction (#7377)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3d94553 Functions metadata compaction (#7377) 3d94553 is described below commit 3d94553fe7f1f6ed17d3e115e1d08a02585c7a77 Author: Sanjeev Kulkarni AuthorDate: Tue Jun 30 21:27:25 2020 -0700 Functions metadata compaction (#7377) * Function workers re-direct call update requests to the leader * Fixed test * tests pass * Working version * Fix test * Short circuit update * Fix test * Fix test * Fix tests * Added one more catch * Added one more catch * Seperated internal and external errors * Fix test * Address feedback * Do not expose updateOnLeader to functions * hide api * hide api * removed duplicate comments * Do leadership changes in function metadata manager * make the function sync * Added more comments * Throw error * Changed name * address comments * Deleted unused classes * Rework metadata manager * Working * Fix test * A better way for test * Address feedback * Added an option to compact function metadata topic * Address feedback * Incorporate feedback Co-authored-by: Sanjeev Kulkarni --- .../pulsar/functions/worker/WorkerConfig.java | 5 + .../pulsar/functions/utils/FunctionCommon.java | 20 .../functions/worker/FunctionMetaDataManager.java | 109 +++-- .../worker/FunctionMetaDataTopicTailer.java| 10 +- .../pulsar/functions/worker/SchedulerManager.java | 21 .../worker/FunctionMetaDataManagerTest.java| 59 ++- 6 files changed, 186 insertions(+), 38 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index bb8d896..c056028 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -169,6 +169,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { ) private String functionMetadataTopicName; @FieldContext( +category = CATEGORY_FUNC_METADATA_MNG, +doc = "Should the metadata topic be compacted?" +) +private Boolean useCompactedMetadataTopic = false; +@FieldContext( category = CATEGORY_FUNC_METADATA_MNG, doc = "The web service url for function workers" ) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index b8ac299..5a68855 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -341,6 +341,26 @@ public class FunctionCommon { return String.format("%s/%s/%s", tenant, namespace, functionName); } +public static String extractTenantFromFullyQualifiedName(String fqfn) { +return extractFromFullyQualifiedName(fqfn, 0); +} + +public static String extractNamespaceFromFullyQualifiedName(String fqfn) { +return extractFromFullyQualifiedName(fqfn, 1); +} + +public static String extractNameFromFullyQualifiedName(String fqfn) { +return extractFromFullyQualifiedName(fqfn, 2); +} + +private static String extractFromFullyQualifiedName(String fqfn, int index) { +String[] parts = fqfn.split("/"); +if (parts.length >= 3) { +return parts[index]; +} +throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn); +} + public static Class getTypeArg(String className, Class funClass, ClassLoader classLoader) throws ClassNotFoundException { Class loadedClass = classLoader.loadClass(className); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 795a735..abe2b38 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -25,6 +2
[pulsar] branch master updated: Fix race condition in which exitFuture in FunctionAssignmentTailer never gets completed even though the tailer thread has exited (#7351)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 14ac116 Fix race condition in which exitFuture in FunctionAssignmentTailer never gets completed even though the tailer thread has exited (#7351) 14ac116 is described below commit 14ac1162d5e506fc0ff3625605f01e3b363c3972 Author: Boyang Jerry Peng AuthorDate: Wed Jun 24 12:28:54 2020 -0700 Fix race condition in which exitFuture in FunctionAssignmentTailer never gets completed even though the tailer thread has exited (#7351) * fix race condition in which exitFuture in FunctionAssignmentTailer never gets completed even though the tailer thread has exited * fix Co-authored-by: Jerry Peng --- .../apache/pulsar/functions/worker/FunctionAssignmentTailer.java | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index e1d9c1f..d2ee952 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -80,7 +80,6 @@ public class FunctionAssignmentTailer implements AutoCloseable { if (tailerThread == null || !tailerThread.isAlive()) { tailerThread = getTailerThread(); } -exitFuture = new CompletableFuture<>(); tailerThread.start(); } } @@ -113,15 +112,18 @@ public class FunctionAssignmentTailer implements AutoCloseable { } } tailerThread = null; + +// complete exit future to be safe +exitFuture.complete(null); +// reset the future +exitFuture = new CompletableFuture<>(); } if (reader != null) { reader.close(); reader = null; } -exitFuture = null; exitOnEndOfTopic = false; - } catch (IOException e) { log.error("Failed to stop function assignment tailer", e); }
[pulsar] branch master updated (51d2aa4 -> a9daf4c)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 51d2aa4 [doc] Refine link and heading issues in Pulsar metrics (#7331) add a9daf4c Set source spec's negativeacktimeout as well as timeout (#7337) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java | 2 ++ 1 file changed, 2 insertions(+)
[pulsar] branch master updated: Fix leader/scheduler assignment processing lag problem (#7237)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 68877f8 Fix leader/scheduler assignment processing lag problem (#7237) 68877f8 is described below commit 68877f8947ce5ed28f09152cbf91ea0b0ecafb87 Author: Boyang Jerry Peng AuthorDate: Fri Jun 19 14:40:26 2020 -0700 Fix leader/scheduler assignment processing lag problem (#7237) * Fix leader/scheduler assignment processing lag problem * add license header * adding more comments * improving impl * fixing bugs * improving impl * fixing tests * adding comments * add more testing * addressing comments * cleaning up * refactoring implementation * addressing comments Co-authored-by: Jerry Peng --- .../org/apache/pulsar/PulsarStandaloneStarter.java | 2 +- .../worker/ClusterServiceCoordinator.java | 9 +- .../functions/worker/FunctionAssignmentTailer.java | 170 ++--- .../functions/worker/FunctionMetaDataManager.java | 13 +- .../worker/FunctionMetaDataTopicTailer.java| 4 +- .../functions/worker/FunctionRuntimeManager.java | 52 ++- .../pulsar/functions/worker/LeaderService.java | 134 +++ .../pulsar/functions/worker/MembershipManager.java | 75 +--- .../pulsar/functions/worker/SchedulerManager.java | 232 +++ .../pulsar/functions/worker/WorkerService.java | 157 +--- .../pulsar/functions/worker/WorkerUtils.java | 16 + .../worker/ClusterServiceCoordinatorTest.java | 14 +- .../worker/FunctionAssignmentTailerTest.java | 422 + .../worker/FunctionRuntimeManagerTest.java | 128 --- .../pulsar/functions/worker/LeaderServiceTest.java | 152 .../functions/worker/MembershipManagerTest.java| 43 --- .../functions/worker/SchedulerManagerTest.java | 120 -- 17 files changed, 1256 insertions(+), 487 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index e5e9b45..9da388b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -102,7 +102,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone { bkEnsemble.stop(); } } catch (Exception e) { -log.error("Shutdown failed: {}", e.getMessage()); +log.error("Shutdown failed: {}", e.getMessage(), e); } } }); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java index 419f65a..c2bde9d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java @@ -48,11 +48,11 @@ public class ClusterServiceCoordinator implements AutoCloseable { private final String workerId; private final Map tasks = new HashMap<>(); private final ScheduledExecutorService executor; -private final MembershipManager membershipManager; +private final LeaderService leaderService; -public ClusterServiceCoordinator(String workerId, MembershipManager membershipManager) { +public ClusterServiceCoordinator(String workerId, LeaderService leaderService) { this.workerId = workerId; -this.membershipManager = membershipManager; +this.leaderService = leaderService; this.executor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build()); } @@ -62,11 +62,12 @@ public class ClusterServiceCoordinator implements AutoCloseable { } public void start() { +log.info("/** Starting cluster service coordinator **/"); for (Map.Entry entry : this.tasks.entrySet()) { TimerTaskInfo timerTaskInfo = entry.getValue(); String taskName = entry.getKey(); this.executor.scheduleAtFixedRate(() -> { -boolean isLeader = membershipManager.isLeader(); +boolean isLeader = leaderService.isLeader(); if (isLeader) { try { timerTaskInfo.getTask().run(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmen
[pulsar] branch master updated (3ff532d -> 5d02c70)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 3ff532d Add tests and documentation for subscribing to non-persistent with topic pattern (#7240) add 5d02c70 Upgrade Presto version to 332 (#7194) No new revisions were added by this update. Summary of changes: bin/pulsar | 2 +- conf/presto/log.properties | 4 +- pom.xml| 4 +- pulsar-sql/presto-distribution/LICENSE | 143 - pulsar-sql/presto-distribution/pom.xml | 8 +- pulsar-sql/presto-pulsar/pom.xml | 10 +- .../pulsar/sql/presto/JSONSchemaHandler.java | 2 +- .../pulsar/sql/presto/PulsarColumnHandle.java | 6 +- .../pulsar/sql/presto/PulsarColumnMetadata.java| 4 +- .../apache/pulsar/sql/presto/PulsarConnector.java | 19 +-- .../pulsar/sql/presto/PulsarConnectorFactory.java | 8 +- .../pulsar/sql/presto/PulsarConnectorModule.java | 8 +- .../pulsar/sql/presto/PulsarHandleResolver.java| 12 +- .../pulsar/sql/presto/PulsarInternalColumn.java| 10 +- .../apache/pulsar/sql/presto/PulsarMetadata.java | 79 ++-- .../org/apache/pulsar/sql/presto/PulsarPlugin.java | 4 +- .../pulsar/sql/presto/PulsarRecordCursor.java | 32 ++--- .../apache/pulsar/sql/presto/PulsarRecordSet.java | 6 +- .../pulsar/sql/presto/PulsarRecordSetProvider.java | 12 +- .../pulsar/sql/presto/PulsarSchemaHandlers.java| 7 +- .../org/apache/pulsar/sql/presto/PulsarSplit.java | 8 +- .../pulsar/sql/presto/PulsarSplitManager.java | 26 ++-- .../pulsar/sql/presto/PulsarTableHandle.java | 4 +- .../pulsar/sql/presto/PulsarTableLayoutHandle.java | 6 +- .../pulsar/sql/presto/PulsarTransactionHandle.java | 2 +- ...k.presto.spi.Plugin => io.prestosql.spi.Plugin} | 0 .../pulsar/sql/presto/TestAvroSchemaHandler.java | 4 +- .../pulsar/sql/presto/TestPulsarConnector.java | 22 ++-- .../presto/TestPulsarKeyValueSchemaHandler.java| 2 +- .../pulsar/sql/presto/TestPulsarMetadata.java | 31 ++--- .../presto/TestPulsarPrimitiveSchemaHandler.java | 4 +- .../pulsar/sql/presto/TestPulsarSplitManager.java | 20 +-- site2/docs/sql-deployment-configurations.md| 6 +- .../version-2.2.0/sql-deployment-configurations.md | 4 +- .../versioned_docs/version-2.2.0/sql-overview.md | 2 +- .../version-2.5.0/sql-deployment-configurations.md | 4 +- tests/integration/pom.xml | 2 +- 37 files changed, 276 insertions(+), 251 deletions(-) rename pulsar-sql/presto-pulsar/src/main/resources/META-INF/services/{com.facebook.presto.spi.Plugin => io.prestosql.spi.Plugin} (100%)
[pulsar] branch master updated: Check for null arguments in Namespaces Rest API (#7247)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 134a8a2 Check for null arguments in Namespaces Rest API (#7247) 134a8a2 is described below commit 134a8a2e2064a895e9a70bab2ba4c01510fb349e Author: Sanjeev Kulkarni AuthorDate: Thu Jun 11 20:57:20 2020 -0700 Check for null arguments in Namespaces Rest API (#7247) * Check for null arguments * Fix test Co-authored-by: Sanjeev Kulkarni --- .../apache/pulsar/broker/admin/AdminResource.java | 15 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 57 ++ .../broker/admin/impl/PersistentTopicsBase.java| 18 +++ .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 6 +-- 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1d765f5..7d4b6d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.util.Codec.decode; @@ -397,7 +396,7 @@ public abstract class AdminResource extends PulsarWebResource { if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress()) && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) { String[] parts = broker.split(":"); -checkArgument(parts.length == 2, "Invalid broker url %s", broker); +checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker)); String host = parts[0]; int port = Integer.parseInt(parts[1]); @@ -844,4 +843,16 @@ public abstract class AdminResource extends PulsarWebResource { asyncResponse.resume(new RestException(throwable)); } } + +protected void checkNotNull(Object o, String errorMessage) { +if (o == null) { +throw new RestException(Status.BAD_REQUEST, errorMessage); +} +} + +protected void checkArgument(boolean b, String errorMessage) { +if (!b) { +throw new RestException(Status.BAD_REQUEST, errorMessage); +} +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 42fd278..b117c9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.admin.impl; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @@ -110,6 +108,7 @@ public abstract class NamespacesBase extends AdminResource { private static final long MAX_BUNDLES = ((long) 1) << 32; protected List internalGetTenantNamespaces(String tenant) { +checkNotNull(tenant, "Tenant should not be null"); validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES); try { @@ -380,6 +379,8 @@ public abstract class NamespacesBase extends AdminResource { protected void internalGrantPermissionOnNamespace(String role, Set actions) { validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); +checkNotNull(role, "Role should not be null"); +checkNotNull(actions, "Actions should not be null"); try { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); @@ -411,6 +412,8 @@ public abstract class NamespacesBase extends AdminResource { protected void internalGrantPermissionOnSubscription(String subscription, Set roles) { validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); +checkNotNull(subscription, "Subscription should not be null"); +checkNotNull(roles, "Roles should not be null"); try { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); @@ -442,6 +445,7 @@ public abstract class N
[pulsar] branch master updated: Ensure that admin operations are gated by super user check (#7226)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new eda3526 Ensure that admin operations are gated by super user check (#7226) eda3526 is described below commit eda3526b335d58e7aa7ba4bb81d44ea03a2922a7 Author: Sanjeev Kulkarni AuthorDate: Thu Jun 11 16:43:35 2020 -0700 Ensure that admin operations are gated by super user check (#7226) * Ensure that admin operations are gated by super user check * keep /clusters open Co-authored-by: Sanjeev Kulkarni --- .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 8 1 file changed, 8 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 072e91c..57c88ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -153,9 +153,12 @@ public class BrokersBase extends AdminResource { @Path("/configuration/values") @ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config") @ApiResponses(value = { +@ApiResponse(code = 403, message = "You don't have admin permission to view configuration"), @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 500, message = "Internal server error")}) public Map getAllDynamicConfigurations() throws Exception { +validateSuperUserAccess(); + ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService() .getDynamicConfigurationCache(); Map configurationMap = null; @@ -175,7 +178,10 @@ public class BrokersBase extends AdminResource { @GET @Path("/configuration") @ApiOperation(value = "Get all updatable dynamic configurations's name") +@ApiResponses(value = { +@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) public List getDynamicConfigurationName() { +validateSuperUserAccess(); return BrokerService.getDynamicConfiguration(); } @@ -240,7 +246,9 @@ public class BrokersBase extends AdminResource { @GET @Path("/internal-configuration") @ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class) +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) public InternalConfigurationData getInternalConfigurationData() { +validateSuperUserAccess(); return pulsar().getInternalConfigurationData(); }
[pulsar] branch master updated (e64d951 -> 828d033)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from e64d951 Have metadata tailer use its own thread for processing (#7211) add 828d033 During Function update, cleanup should only happen for temp files that were generated (#7201) No new revisions were added by this update. Summary of changes: .../pulsar/functions/runtime/RuntimeSpawner.java | 2 - .../functions/worker/rest/api/FunctionsImpl.java | 15 ++--- .../functions/worker/rest/api/SinksImpl.java | 15 ++--- .../functions/worker/rest/api/SourcesImpl.java | 15 ++--- .../integration/functions/PulsarFunctionsTest.java | 68 +- .../functions/utils/CommandGenerator.java | 42 +++-- 6 files changed, 102 insertions(+), 55 deletions(-)
[pulsar] branch master updated: Have metadata tailer use its own thread for processing (#7211)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e64d951 Have metadata tailer use its own thread for processing (#7211) e64d951 is described below commit e64d951bb6002e07a64674a7373d945d3e224bb6 Author: Sanjeev Kulkarni AuthorDate: Tue Jun 9 16:58:32 2020 -0700 Have metadata tailer use its own thread for processing (#7211) * Have metadata tailer use its own thread for processing * Merged with master * Address comments * Address comments Co-authored-by: Sanjeev Kulkarni --- .../functions/worker/FunctionMetaDataManager.java | 18 ++--- .../worker/FunctionMetaDataTopicTailer.java| 90 +- .../pulsar/functions/worker/WorkerService.java | 2 +- .../worker/FunctionMetaDataManagerTest.java| 24 +++--- .../worker/FunctionMetaDataTopicTailerTest.java| 37 ++--- 5 files changed, 101 insertions(+), 70 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 2ab913b..80f577d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -60,6 +60,7 @@ public class FunctionMetaDataManager implements AutoCloseable { private final SchedulerManager schedulerManager; private final WorkerConfig workerConfig; private final PulsarClient pulsarClient; +private final ErrorNotifier errorNotifier; private FunctionMetaDataTopicTailer functionMetaDataTopicTailer; @@ -69,12 +70,14 @@ public class FunctionMetaDataManager implements AutoCloseable { public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, - PulsarClient pulsarClient) throws PulsarClientException { + PulsarClient pulsarClient, + ErrorNotifier errorNotifier) throws PulsarClientException { this.workerConfig = workerConfig; this.pulsarClient = pulsarClient; this.serviceRequestManager = getServiceRequestManager( this.pulsarClient, this.workerConfig.getFunctionMetadataTopic()); this.schedulerManager = schedulerManager; +this.errorNotifier = errorNotifier; } /** @@ -88,17 +91,12 @@ public class FunctionMetaDataManager implements AutoCloseable { public void initialize() { log.info("/** Initializing Function Metadata Manager **/"); try { -Reader reader = pulsarClient.newReader() -.topic(this.workerConfig.getFunctionMetadataTopic()) -.startMessageId(MessageId.earliest) -.readerName(workerConfig.getWorkerId() + "-function-metadata-manager") -.create(); - -this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader); +this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, +pulsarClient.newReader(), this.workerConfig, this.errorNotifier); // read all existing messages this.setInitializePhase(true); -while (reader.hasMessageAvailable()) { - this.functionMetaDataTopicTailer.processRequest(reader.readNext()); +while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) { + this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext()); } this.setInitializePhase(false); // schedule functions if necessary diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java index b7108e9..55a67a6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java @@ -19,78 +19,92 @@ package org.apache.pulsar.functions.worker; import java.io.IOException; -import java.util.function.Function; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client
[pulsar] branch master updated: Attach names for all producers/readers in worker service (#7165)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7add930 Attach names for all producers/readers in worker service (#7165) 7add930 is described below commit 7add93095a88c8acfeae214ce8a1d856a54b474d Author: Sanjeev Kulkarni AuthorDate: Thu Jun 4 19:52:38 2020 -0700 Attach names for all producers/readers in worker service (#7165) * Attach names for all producers/readers in worker service * Fix tests Co-authored-by: Sanjeev Kulkarni --- .../apache/pulsar/functions/worker/FunctionMetaDataManager.java | 6 +- .../apache/pulsar/functions/worker/FunctionRuntimeManager.java | 1 + .../org/apache/pulsar/functions/worker/SchedulerManager.java | 1 + .../apache/pulsar/functions/worker/rest/api/ComponentImpl.java | 9 - .../pulsar/functions/worker/FunctionMetaDataManagerTest.java | 1 + .../pulsar/functions/worker/FunctionRuntimeManagerTest.java | 1 + .../org/apache/pulsar/functions/worker/SchedulerManagerTest.java | 1 + 7 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index c3273fa..2ab913b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -91,6 +91,7 @@ public class FunctionMetaDataManager implements AutoCloseable { Reader reader = pulsarClient.newReader() .topic(this.workerConfig.getFunctionMetadataTopic()) .startMessageId(MessageId.earliest) +.readerName(workerConfig.getWorkerId() + "-function-metadata-manager") .create(); this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader); @@ -432,6 +433,9 @@ public class FunctionMetaDataManager implements AutoCloseable { } private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException { -return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create()); +return new ServiceRequestManager(pulsarClient.newProducer() +.topic(functionMetadataTopic) +.producerName(workerConfig.getWorkerId() + "-function-metadata-manager") +.create()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 075146e..90cab3f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -211,6 +211,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ log.info("/** Initializing Runtime Manager **/"); try { Reader reader = this.getWorkerService().getClient().newReader() +.readerName(workerConfig.getWorkerId() + "-function-runtime-manager") .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) .startMessageId(MessageId.earliest).create(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index bcbccda..9c93443 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -106,6 +106,7 @@ public class SchedulerManager implements AutoCloseable { .blockIfQueueFull(true) .compressionType(CompressionType.LZ4) .sendTimeout(0, TimeUnit.MILLISECONDS) +.producerName(config.getWorkerId() + "-scheduler-manager") .createAsync().get(10, TimeUnit.SECONDS); return Actions.ActionResult.builder().success(true).result(producer).build(); } catch (Exception e) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-function
[pulsar] branch master updated: Separate out FunctionMetadata related helper functions (#7146)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8638022 Separate out FunctionMetadata related helper functions (#7146) 8638022 is described below commit 86380223203f0b9faecf4f2fa4d2565d6f92b31c Author: Sanjeev Kulkarni AuthorDate: Wed Jun 3 16:49:55 2020 -0700 Separate out FunctionMetadata related helper functions (#7146) * Seperate out FunctionMetaData related functions into a utility class * Fixed bug Co-authored-by: Sanjeev Kulkarni --- .../functions/utils/FunctionMetaDataUtils.java | 79 ++ .../functions/utils/FunctionMetaDataUtilsTest.java | 95 ++ .../functions/worker/FunctionMetaDataManager.java | 71 +++- .../functions/worker/rest/api/ComponentImpl.java | 5 +- .../worker/FunctionMetaDataManagerTest.java| 9 +- 5 files changed, 193 insertions(+), 66 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java new file mode 100644 index 000..530b887 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.utils; + +import org.apache.pulsar.functions.proto.Function; + +public class FunctionMetaDataUtils { + +public static boolean canChangeState(Function.FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) { +if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) { +return false; +} +if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) { +// This means that all instances of the functions are running +return newState == Function.FunctionState.STOPPED; +} +if (instanceId >= 0) { +if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) { +return functionMetaData.getInstanceStatesMap().get(instanceId) != newState; +} else { +return false; +} +} else { +// want to change state for all instances +for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) { +if (state != newState) return true; +} +return false; +} +} + +public static Function.FunctionMetaData changeFunctionInstanceStatus(Function.FunctionMetaData functionMetaData, + Integer instanceId, boolean start) { +Function.FunctionMetaData.Builder builder = functionMetaData.toBuilder() +.setVersion(functionMetaData.getVersion() + 1); +if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) { +for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) { +builder.putInstanceStates(i, Function.FunctionState.RUNNING); +} +} +Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED; +if (instanceId < 0) { +for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) { +builder.putInstanceStates(i, state); +} +} else if (instanceId < builder.getFunctionDetails().getParallelism()){ +builder.putInstanceStates(instanceId, state); +} +return builder.build(); +} + +public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData, + Function.FunctionMetaData update
[pulsar] branch master updated (fcd9852 -> fb374e6)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from fcd9852 Correct tokenSecretKey base64 inline description. (#6959) add fb374e6 Extend PulsarIO to support Batch Sources. This pr implements PIP-65 (#7090) No new revisions were added by this update. Summary of changes: .../BatchSourceConfig.java}| 28 ++- .../org/apache/pulsar/common/io/SourceConfig.java | 3 + .../pulsar/functions/utils/SourceConfigUtils.java | 122 +- .../functions/utils/SourceConfigUtilsTest.java | 125 +-- .../pulsar/functions/worker/FunctionActioner.java | 125 ++- .../pom.xml| 30 ++- .../BatchDataGeneratorPrintSink.java} | 10 +- .../BatchDataGeneratorSource.java | 85 +++ .../pulsar/io/batchdatagenerator}/Person.java | 35 ++- .../resources/META-INF/services/pulsar-io.yaml | 8 +- .../batchdatagenerator/BatchDataGeneratorExec.java | 70 ++ .../batch-discovery-triggerers}/pom.xml| 26 ++- .../pulsar/io/batchdiscovery/CronTriggerer.java| 65 ++ .../java-examples => pulsar-io/batch}/pom.xml | 19 +- .../pulsar/io/batch/BatchSourceExecutor.java | 240 .../pulsar/io/batch/BatchSourceExecutorTest.java | 249 + .../org/apache/pulsar/io/core/BatchSource.java | 84 +++ .../pulsar/io/core/BatchSourceTriggerer.java | 71 ++ pulsar-io/pom.xml | 3 + 19 files changed, 1263 insertions(+), 135 deletions(-) copy pulsar-common/src/main/java/org/apache/pulsar/common/{functions/ConsumerConfig.java => io/BatchSourceConfig.java} (61%) copy pulsar-io/{data-generator => batch-data-generator}/pom.xml (67%) copy pulsar-io/{data-generator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java => batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPrintSink.java} (83%) create mode 100644 pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorSource.java copy pulsar-io/{data-generator/src/main/java/org/apache/pulsar/io/datagenerator => batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator}/Person.java (70%) copy pulsar-io/{aerospike => batch-data-generator}/src/main/resources/META-INF/services/pulsar-io.yaml (77%) create mode 100644 pulsar-io/batch-data-generator/src/test/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorExec.java copy {pulsar-functions/java-examples => pulsar-io/batch-discovery-triggerers}/pom.xml (75%) create mode 100644 pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java copy {pulsar-functions/java-examples => pulsar-io/batch}/pom.xml (78%) create mode 100644 pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java create mode 100644 pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java create mode 100644 pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSource.java create mode 100644 pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSourceTriggerer.java
[pulsar] branch master updated: Add a new api to get information about config definition of builtin sources/sinks (#7114)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f0b6934 Add a new api to get information about config definition of builtin sources/sinks (#7114) f0b6934 is described below commit f0b6934dd563886bc2f7926fb3724784edfe74f2 Author: Sanjeev Kulkarni AuthorDate: Mon Jun 1 18:19:01 2020 -0700 Add a new api to get information about config definition of builtin sources/sinks (#7114) Co-authored-by: Sanjeev Kulkarni --- .../apache/pulsar/broker/admin/impl/SinksBase.java | 30 + .../pulsar/broker/admin/impl/SourcesBase.java | 30 + .../pulsar/common/io/ConfigFieldDefinition.java| 33 +- .../org/apache/pulsar/common/util/Reflections.java | 15 +++ .../pulsar/functions/utils/io/ConnectorUtils.java | 46 +++- .../pulsar/functions/utils/io/Connectors.java | 3 ++ .../pulsar/functions/worker/ConnectorsManager.java | 9 .../functions/worker/rest/api/SinksImpl.java | 29 +++-- .../functions/worker/rest/api/SourcesImpl.java | 29 +++-- .../worker/rest/api/v3/SinksApiV3Resource.java | 50 -- .../worker/rest/api/v3/SourcesApiV3Resource.java | 50 -- pulsar-io/common/pom.xml | 5 +++ .../org/apache/pulsar/io/common/IOConfigUtils.java | 17 +--- 13 files changed, 271 insertions(+), 75 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index 61d3603..9f69e6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -22,6 +22,7 @@ import io.swagger.annotations.*; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.policies.data.SinkStatus; @@ -478,14 +479,27 @@ public class SinksBase extends AdminResource implements Supplier }) @Path("/builtinsinks") public List getSinkList() { -List connectorDefinitions = sink.getListOfConnectors(); -List retval = new ArrayList<>(); -for (ConnectorDefinition connectorDefinition : connectorDefinitions) { -if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) { -retval.add(connectorDefinition); -} -} -return retval; +return sink.getSinkList(); +} + +@GET +@ApiOperation( +value = "Fetches information about config fields associated with the specified builtin sink", +response = ConfigFieldDefinition.class, +responseContainer = "List" +) +@ApiResponses(value = { +@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), +@ApiResponse(code = 404, message = "builtin sink does not exist"), +@ApiResponse(code = 500, message = "Internal server error"), +@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") +}) +@Produces(MediaType.APPLICATION_JSON) +@Path("/builtinsinks/{name}/configdefinition") +public List getSinkConfigDefinition( +@ApiParam(value = "The name of the builtin sink") +final @PathParam("name") String name) throws IOException { +return sink.getSinkConfigDefinition(name); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index aba9069..c25bcdd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -22,6 +22,7 @@ import io.swagger.annotations.*; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.policies.data.SourceStatus; @@ -468,14 +469,27 @@ public class SourcesBase extends AdminResource implements Supplier getSou
[pulsar] branch master updated: Moved ClassLoading and Reflections Helper functions to common (#7103)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e451be3 Moved ClassLoading and Reflections Helper functions to common (#7103) e451be3 is described below commit e451be300e5a0e718c444dd3a5b3a80a16734974 Author: Sanjeev Kulkarni AuthorDate: Fri May 29 22:24:29 2020 -0700 Moved ClassLoading and Reflections Helper functions to common (#7103) * Moved ClassLoading and Reflections Helper functions to common * Fix tests * Fix test Co-authored-by: Sanjeev Kulkarni --- .../apache/pulsar/io/PulsarFunctionTlsTest.java| 4 +- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 2 +- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 4 +- .../apache/pulsar/admin/cli/TestCmdSources.java| 4 +- .../pulsar/common/util/ClassLoaderUtils.java | 80 ++ .../apache/pulsar/common/util}/Reflections.java| 16 +++-- .../pulsar/common/util}/ReflectionsTest.java | 5 +- .../pulsar/functions/instance/InstanceUtils.java | 2 +- .../functions/instance/JavaInstanceRunnable.java | 4 +- .../apache/pulsar/functions/sink/PulsarSink.java | 2 +- .../pulsar/functions/source/PulsarSource.java | 3 +- .../windowing/WindowFunctionExecutor.java | 2 +- .../functions/auth/FunctionAuthProvider.java | 2 +- .../auth/KubernetesFunctionAuthProvider.java | 4 +- .../functions/runtime/JavaInstanceStarter.java | 2 +- .../functions/runtime/RuntimeCustomizer.java | 2 +- .../pulsar/functions/runtime/RuntimeFactory.java | 2 +- .../runtime/thread/ThreadRuntimeFactory.java | 2 +- .../pulsar/functions/utils/FunctionCommon.java | 54 +-- .../functions/utils/FunctionConfigUtils.java | 2 +- .../pulsar/functions/utils/SinkConfigUtils.java| 5 +- .../pulsar/functions/utils/SourceConfigUtils.java | 4 +- .../pulsar/functions/utils/ValidatorUtils.java | 10 +-- .../functions/utils/FunctionConfigUtilsTest.java | 1 + .../functions/utils/SinkConfigUtilsTest.java | 1 + .../functions/utils/SourceConfigUtilsTest.java | 2 +- .../functions/worker/FunctionRuntimeManager.java | 2 +- .../pulsar/functions/worker/SchedulerManager.java | 2 +- .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 ++- .../rest/api/v3/SourceApiV3ResourceTest.java | 9 ++- 30 files changed, 145 insertions(+), 98 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index ed33614..e241f76 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -52,11 +52,11 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.sink.PulsarSink; -import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -238,7 +238,7 @@ public class PulsarFunctionTlsTest { File file = new File(jarFile); try { -FunctionCommon.loadJar(file); +ClassLoaderUtils.loadJar(file); } catch (MalformedURLException e) { throw new RuntimeException("Failed to load user jar " + file, e); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index c162c4b..9c2ddcf 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -57,7 +57,7 @@ import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.IdentityFunction; -import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.FunctionCommo
[pulsar] branch master updated (adf650e -> afd3296)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from adf650e fixes RESTAPIs and Cli button not working. (#7038) add afd3296 Fix the flaky function test. (#7057) No new revisions were added by this update. Summary of changes: .github/workflows/ci-unit-broker.yml | 2 +- .github/workflows/ci-unit-flaky.yaml | 2 +- .../functions/worker/PulsarFunctionE2ESecurityTest.java| 14 -- 3 files changed, 14 insertions(+), 4 deletions(-)
[pulsar] branch master updated: [docs] Fix spelling error (#7039)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b021871 [docs] Fix spelling error (#7039) b021871 is described below commit b021871c91381ff086d3303e60c9c6b2f034c444 Author: Raman Gupta AuthorDate: Tue May 26 02:03:48 2020 -0400 [docs] Fix spelling error (#7039) --- site2/docs/reference-pulsar-admin.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 1ea946b..259b242 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1138,7 +1138,7 @@ $ pulsar-admin namespaces set-persistence tenant/namespace options Options |Flag|Description|Default| ||---|---| -|`-a`, `--bookkeeper-ack-quorom`|The number of acks (guaranteed copies) to wait for each entry|0| +|`-a`, `--bookkeeper-ack-quorum`|The number of acks (guaranteed copies) to wait for each entry|0| |`-e`, `--bookkeeper-ensemble`|The number of bookies to use for a topic|0| |`-w`, `--bookkeeper-write-quorum`|How many writes to make of each entry|0| |`-r`, `--ml-mark-delete-max-rate`|Throttling rate of mark-delete operation (0 means no throttle)||
[pulsar] branch master updated: Secrets provider should be initialized for each new function (#6993)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6e6e4e2 Secrets provider should be initialized for each new function (#6993) 6e6e4e2 is described below commit 6e6e4e28b36717f2b11c69babd4b9d34140ab5b1 Author: Boyang Jerry Peng AuthorDate: Fri May 22 20:40:58 2020 -0700 Secrets provider should be initialized for each new function (#6993) * Secrets provider should be initialized for each new function * add comment Co-authored-by: Jerry Peng --- .../pulsar/functions/runtime/thread/ThreadRuntimeFactory.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index cb82129..423d1f0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -34,7 +34,6 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeUtils; -import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.Reflections; @@ -58,13 +57,17 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private FunctionCacheManager fnCache; private PulsarClient pulsarClient; private String storageServiceUrl; -private SecretsProvider secretsProvider; +private SecretsProvider defaultSecretsProvider; private CollectorRegistry collectorRegistry; private String narExtractionDirectory; private volatile boolean closed; private SecretsProviderConfigurator secretsProviderConfigurator; private ClassLoader rootClassLoader; +/** + * This constructor is used by other runtimes (e.g. ProcessRuntime and KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of the function. + * When used by other runtimes, the arguments such as secretsProvider and rootClassLoader will be provided. + */ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, AuthenticationConfig authConfig, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String narExtractionDirectory, @@ -112,7 +115,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { this.rootClassLoader = rootClassLoader; this.secretsProviderConfigurator = secretsProviderConfigurator; -this.secretsProvider = secretsProvider; +this.defaultSecretsProvider = secretsProvider; this.fnCache = new FunctionCacheManagerImpl(rootClassLoader); this.threadGroup = new ThreadGroup(threadGroupName); this.pulsarClient = pulsarClient; @@ -139,6 +142,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile, String originalCodeFileName, Long expectedHealthCheckInterval) { +SecretsProvider secretsProvider = defaultSecretsProvider; if (secretsProvider == null) { String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
[pulsar] branch master updated: Fix null pointer when getting function instance metrics. (#7010)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 20320c7 Fix null pointer when getting function instance metrics. (#7010) 20320c7 is described below commit 20320c7ca15498f5655668e99cce286be381c76d Author: Sanjeev Kulkarni AuthorDate: Fri May 22 11:32:17 2020 -0700 Fix null pointer when getting function instance metrics. (#7010) * Fix null pointer when getting function instance metrics. * Made more functions sync * Made the remaining public interface synchronized * Made stats class public method synchronized so they are thread safe. * Made setup synchronized so that it and close won't run together * Undo making stats sync until we resolve differences * Incorporated feedback Co-authored-by: Sanjeev Kulkarni --- .../functions/instance/JavaInstanceRunnable.java | 75 ++ .../functions/runtime/thread/ThreadRuntime.java| 2 +- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 0fa4fc6..892c7e5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,6 +24,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; + +import java.io.IOException; import java.util.concurrent.CompletableFuture; import lombok.AccessLevel; import lombok.Getter; @@ -102,9 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // provide tables for storing states private final String stateStorageServiceUrl; -@Getter(AccessLevel.PACKAGE) private StorageClient storageClient; -@Getter(AccessLevel.PACKAGE) private Table stateTable; private JavaInstance javaInstance; @@ -112,7 +112,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Throwable deathException; // function stats -@Getter private ComponentStatsManager stats; private Record currentRecord; @@ -178,7 +177,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { /** * NOTE: this method should be called in the instance thread, in order to make class loading work. */ -JavaInstance setupJavaInstance() throws Exception { +synchronized private void setup() throws Exception { + +this.instanceCache = InstanceCache.getInstanceCache(); + +if (this.collectorRegistry == null) { +this.collectorRegistry = new CollectorRegistry(); +} +this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, +this.instanceCache.getScheduledExecutorService(), +this.componentType); + // initialize the thread context ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())); ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); @@ -218,7 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // start any log topic handler setupLogHandler(); -return new JavaInstance(contextImpl, object, instanceConfig); +javaInstance = new JavaInstance(contextImpl, object, instanceConfig); } ContextImpl setupContext() { @@ -234,16 +243,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Override public void run() { try { -this.instanceCache = InstanceCache.getInstanceCache(); - -if (this.collectorRegistry == null) { -this.collectorRegistry = new CollectorRegistry(); -} -this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, -this.instanceCache.getScheduledExecutorService(), -this.componentType); - -javaInstance = setupJavaInstance(); +setup(); + while (true) { currentRecord = readInput(); @@ -546,13 +547,32 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } -public InstanceCommunication.MetricsData getAndResetMetrics() { -InstanceCommunication.MetricsData metricsData = getMetrics(); -stats.reset(); +synchronized public Str
[pulsar] branch master updated: Added ability to add annotations to Connector Configs (#6983)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c8170b7 Added ability to add annotations to Connector Configs (#6983) c8170b7 is described below commit c8170b7c9f0232c9a005b65b6b146e0417ec3190 Author: Sanjeev Kulkarni AuthorDate: Fri May 22 08:24:13 2020 -0700 Added ability to add annotations to Connector Configs (#6983) * Added sourceConfigClass and sinkConfigClass * Add Validator annotation helpers to validate class parameters * Fix build errors * Take feedback into account * Connected with validation * Fix bugs * Added tests * Fix class name * Address feedback Co-authored-by: Sanjeev Kulkarni --- .../pulsar/common/io/ConnectorDefinition.java | 22 ++ .../org/apache/pulsar/functions/LocalRunner.java | 8 ++-- .../pulsar/functions/worker/WorkerConfig.java | 5 +++ .../pulsar/functions/utils/SinkConfigUtils.java| 37 - .../pulsar/functions/utils/SourceConfigUtils.java | 34 ++- .../pulsar/functions/utils/io/ConnectorUtils.java | 16 .../functions/utils/SinkConfigUtilsTest.java | 47 - .../functions/utils/SourceConfigUtilsTest.java | 48 +- .../functions/worker/rest/api/SinksImpl.java | 3 +- .../functions/worker/rest/api/SourcesImpl.java | 3 +- .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 3 +- .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 2 + .../resources/META-INF/services/pulsar-io.yaml | 2 + .../resources/META-INF/services/pulsar-io.yaml | 2 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 4 +- .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + .../resources/META-INF/services/pulsar-io.yaml | 1 + 29 files changed, 229 insertions(+), 21 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java index d1bb334..99c3738 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java @@ -51,4 +51,26 @@ public class ConnectorDefinition { * If not defined, it will be assumed this connector cannot act as a data sink. */ private String sinkClass; + +/** + * The class name for the source config implementation. + * Most of the sources are using a config class for managing their config + * and directly convert the supplied Map object at open to this object. + * These connector can declare their config class in this variable that will allow + * the framework to check for config parameter checking at submission time. + * + * If not defined, the framework will not be able to do any submission time checks. + */ +private String sourceConfigClass; + +/** + * The class name for the sink config implementation. + * Most of the sink are using a config class for managing their config + * and directly convert the supplied Map object at open to this object. + * These connector can declare their config class in this variable that will allow + * the framework to check for config parameter checking at submission time. + * + * If not defined, the framework will not be able to do any submission time checks. + */ +private String sinkConfigClass; } diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 4f63bd2..3170e6e 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -257,14 +257,14 @@ public class LocalRunner { if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile
[pulsar] branch master updated (3633e24 -> adf920e)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 3633e24 Add Annotations for config validation checking (#6972) add adf920e Added ability to build consumers in functions and sources (#6954) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/pulsar/functions/api/Context.java| 11 +++ .../org/apache/pulsar/functions/instance/ContextImpl.java | 7 +++ .../java/org/apache/pulsar/io/common/IOConfigUtilsTest.java | 6 ++ .../main/java/org/apache/pulsar/io/core/SourceContext.java| 11 +++ .../pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 6 ++ 5 files changed, 41 insertions(+)
[pulsar] branch master updated (7396f26 -> 3633e24)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 7396f26 Fix bug that causes delivery of messages to stop on Shared subscription (#6966) add 3633e24 Add Annotations for config validation checking (#6972) No new revisions were added by this update. Summary of changes: .../pulsar/common/validator/ConfigValidation.java | 123 ++ .../validator/ConfigValidationAnnotations.java | 159 .../common/validator/ConfigValidationUtils.java| 180 + .../apache/pulsar/common/validator/Validator.java | 26 +- .../pulsar/common/validator/ValidatorImpls.java| 414 + .../pulsar/common/validator}/package-info.java | 4 +- .../common/validator/ConfigValidationTest.java | 148 .../common/validator/ValidatorImplsTest.java | 160 8 files changed, 1199 insertions(+), 15 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidation.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationAnnotations.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/validator/ConfigValidationUtils.java copy pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java => pulsar-common/src/main/java/org/apache/pulsar/common/validator/Validator.java (60%) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/validator/ValidatorImpls.java copy {pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol => pulsar-common/src/main/java/org/apache/pulsar/common/validator}/package-info.java (88%) create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/validator/ConfigValidationTest.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/validator/ValidatorImplsTest.java
[pulsar] branch master updated: Remove unneeded dependency (#6956)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 45f2d80 Remove unneeded dependency (#6956) 45f2d80 is described below commit 45f2d80b40523dfc8ae189ec45af44d351408d87 Author: Sanjeev Kulkarni AuthorDate: Thu May 14 22:28:17 2020 -0700 Remove unneeded dependency (#6956) Co-authored-by: Sanjeev Kulkarni --- pulsar-functions/localrun/pom.xml | 6 -- .../src/main/java/org/apache/pulsar/functions/LocalRunner.java | 1 - 2 files changed, 7 deletions(-) diff --git a/pulsar-functions/localrun/pom.xml b/pulsar-functions/localrun/pom.xml index 44912eb..b7eeb14 100644 --- a/pulsar-functions/localrun/pom.xml +++ b/pulsar-functions/localrun/pom.xml @@ -55,12 +55,6 @@ -${project.groupId} -pulsar-client-original -${project.parent.version} - - - io.grpc grpc-all diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 57209de..4f63bd2 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -61,7 +61,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.pulsar.common.functions.Utils.inferMissingArguments; -import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader; @Slf4j public class LocalRunner {
[pulsar] branch master updated: Added ability for sources to publish messages on their own just like their function counterparts (#6941)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7af5ebe Added ability for sources to publish messages on their own just like their function counterparts (#6941) 7af5ebe is described below commit 7af5ebe167f30cb94ce7fd141746e9f736154c6e Author: Sanjeev Kulkarni AuthorDate: Wed May 13 03:49:46 2020 -0700 Added ability for sources to publish messages on their own just like their function counterparts (#6941) Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/io/common/IOConfigUtilsTest.java | 8 .../main/java/org/apache/pulsar/io/core/SourceContext.java | 14 ++ .../pulsar/io/kafka/source/KafkaAbstractSourceTest.java| 8 3 files changed, 30 insertions(+) diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index 51f4e6e..e85f7be 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -20,6 +20,9 @@ package org.apache.pulsar.io.common; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -179,6 +182,11 @@ public class IOConfigUtilsTest { public CompletableFuture getStateAsync(String key) { return null; } + +@Override +public TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException { +return null; +} } @Test diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java index e87a4bc..78fe211 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.io.core; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.slf4j.Logger; import java.nio.ByteBuffer; @@ -160,4 +163,15 @@ public interface SourceContext { * @return the state value for the key. */ CompletableFuture getStateAsync(String key); + +/** + * New output message using schema for serializing to the topic + * + * @param topicName The name of the topic for output message + * @param schema provide a way to convert between serialized data and domain objects + * @param + * @return the message builder instance + * @throws PulsarClientException + */ + TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException; } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 3bfd358..c4dde22 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -22,6 +22,9 @@ package org.apache.pulsar.io.kafka.source; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; @@ -158,6 +161,11 @@ public class KafkaAbstractSourceTest { public CompletableFuture getStateAsync(String key) { return null; } + +@Override +public TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException { +return null; +} }; Map config = new HashMap<>(); ThrowingRunnable openAndClose = ()->{
[pulsar] branch master updated: Make Nar Extraction Directory configurable (#6933)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 54f8d13 Make Nar Extraction Directory configurable (#6933) 54f8d13 is described below commit 54f8d13ba2688797caf2f553062e97dfb1b66bec Author: Sanjeev Kulkarni AuthorDate: Mon May 11 12:14:43 2020 -0700 Make Nar Extraction Directory configurable (#6933) * Make Nar Extraction Directory configurable * Fixed unittests Co-authored-by: Sanjeev Kulkarni --- .../bookkeeper/mledger/offload/OffloaderUtils.java | 15 --- .../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++ .../java/org/apache/pulsar/broker/PulsarService.java | 2 +- .../pulsar/broker/protocol/ProtocolHandlerUtils.java | 14 -- .../pulsar/broker/protocol/ProtocolHandlers.java | 4 ++-- .../broker/protocol/ProtocolHandlerUtilsTest.java | 15 +-- .../org/apache/pulsar/common/nar/NarClassLoader.java | 16 +++- .../apache/pulsar/common/util/SearchBcNarUtils.java| 2 +- .../functions/instance/JavaInstanceRunnable.java | 7 +-- .../functions/instance/JavaInstanceRunnableTest.java | 2 +- .../java/org/apache/pulsar/functions/LocalRunner.java | 15 +-- .../pulsar/functions/runtime/JavaInstanceStarter.java | 6 +- .../apache/pulsar/functions/runtime/RuntimeUtils.java | 15 --- .../runtime/kubernetes/KubernetesRuntime.java | 6 +- .../runtime/kubernetes/KubernetesRuntimeFactory.java | 3 +++ .../kubernetes/KubernetesRuntimeFactoryConfig.java | 7 +++ .../functions/runtime/process/ProcessRuntime.java | 5 - .../runtime/process/ProcessRuntimeFactory.java | 8 +++- .../pulsar/functions/runtime/thread/ThreadRuntime.java | 11 --- .../functions/runtime/thread/ThreadRuntimeFactory.java | 18 +++--- .../apache/pulsar/functions/worker/WorkerConfig.java | 6 ++ .../runtime/kubernetes/KubernetesRuntimeTest.java | 8 +--- .../functions/runtime/process/ProcessRuntimeTest.java | 8 +--- .../apache/pulsar/functions/utils/FunctionCommon.java | 7 --- .../apache/pulsar/functions/utils/SinkConfigUtils.java | 4 ++-- .../pulsar/functions/utils/SourceConfigUtils.java | 5 +++-- .../utils/functioncache/FunctionCacheEntry.java| 6 -- .../utils/functioncache/FunctionCacheManager.java | 4 +++- .../utils/functioncache/FunctionCacheManagerImpl.java | 5 +++-- .../pulsar/functions/utils/io/ConnectorUtils.java | 8 .../pulsar/functions/worker/ConnectorsManager.java | 4 ++-- .../pulsar/functions/worker/FunctionActioner.java | 8 .../pulsar/functions/worker/rest/api/SinksImpl.java| 3 ++- .../pulsar/functions/worker/rest/api/SourcesImpl.java | 3 ++- .../pulsar/functions/worker/SchedulerManagerTest.java | 18 +- .../functions/worker/rest/api/FunctionsImplTest.java | 4 ++-- .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 6 +++--- .../worker/rest/api/v3/SourceApiV3ResourceTest.java| 6 +++--- .../apache/pulsar/sql/presto/PulsarConnectorCache.java | 13 - .../pulsar/sql/presto/PulsarConnectorConfig.java | 15 +++ .../apache/pulsar/sql/presto/PulsarRecordCursor.java | 3 ++- 41 files changed, 215 insertions(+), 107 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java index 845b53f..5243691 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java @@ -48,11 +48,12 @@ public class OffloaderUtils { * @return the offloader class name * @throws IOException when fail to retrieve the pulsar offloader class */ -static Pair getOffloaderFactory(String narPath) throws IOException { +static Pair getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException { // need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case // LedgerOffloaderFactory is loaded by a classloader that is not the default classloader // as is the case for the pulsar presto plugin -NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader()); +NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), +LedgerOffloaderFactory.class.getClassLoader(), narExtractionDirectory); String configStr = ncl.getServiceDefinition
[pulsar] branch master updated: Ensure that all dangling consumers are cleaned up during failures (#6778)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 98b818b Ensure that all dangling consumers are cleaned up during failures (#6778) 98b818b is described below commit 98b818b5fa63ee2e4a67887cf96330ae652dafa2 Author: Sanjeev Kulkarni AuthorDate: Tue Apr 21 17:21:40 2020 -0700 Ensure that all dangling consumers are cleaned up during failures (#6778) Co-authored-by: Sanjeev Kulkarni --- .../pulsar/functions/source/PulsarSource.java | 25 +++-- .../pulsar/functions/source/PulsarSourceTest.java | 113 - 2 files changed, 103 insertions(+), 35 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 0d23ce1..fa7146d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -47,7 +47,7 @@ public class PulsarSource extends PushSource implements MessageListener private final Map properties; private final ClassLoader functionClassLoader; private List inputTopics; -private List> inputConsumers = Collections.emptyList(); +private List> inputConsumers = new LinkedList<>(); private final TopicSchema topicSchema; public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map properties, @@ -65,7 +65,7 @@ public class PulsarSource extends PushSource implements MessageListener log.info("Opening pulsar source with config: {}", pulsarSourceConfig); Map> configs = setupConsumerConfigs(); -inputConsumers = configs.entrySet().stream().map(e -> { +for (Map.Entry> e : configs.entrySet()) { String topic = e.getKey(); ConsumerConfig conf = e.getValue(); log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}", @@ -80,17 +80,17 @@ public class PulsarSource extends PushSource implements MessageListener .messageListener(this); if (conf.isRegexPattern) { -cb.topicsPattern(topic); +cb = cb.topicsPattern(topic); } else { -cb.topic(topic); +cb = cb.topics(Collections.singletonList(topic)); } if (conf.getReceiverQueueSize() != null) { -cb.receiverQueueSize(conf.getReceiverQueueSize()); +cb = cb.receiverQueueSize(conf.getReceiverQueueSize()); } -cb.properties(properties); +cb = cb.properties(properties); if (pulsarSourceConfig.getTimeoutMs() != null) { -cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); +cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); } if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) { @@ -99,11 +99,12 @@ public class PulsarSource extends PushSource implements MessageListener if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) { deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic()); } -cb.deadLetterPolicy(deadLetterPolicyBuilder.build()); +cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build()); } -return cb.subscribeAsync(); - }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()); +Consumer consumer = cb.subscribeAsync().join(); +inputConsumers.add(consumer); +} inputTopics = inputConsumers.stream().flatMap(c -> { return (c instanceof MultiTopicsConsumerImpl) ? ((MultiTopicsConsumerImpl) c).getTopics().stream() @@ -176,6 +177,10 @@ public class PulsarSource extends PushSource implements MessageListener return inputTopics; } +public List> getInputConsumers() { +return inputConsumers; +} + @Data @Builder private static class ConsumerConfig { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index d6e03d3..c2e556c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-f
[pulsar] branch master updated: Make the FunctionRunTimeManager methods used by rest-api syncrhonized since the assignment tailer also is updating it (#6786)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fb930b7 Make the FunctionRunTimeManager methods used by rest-api syncrhonized since the assignment tailer also is updating it (#6786) fb930b7 is described below commit fb930b79a5f396ef5753fb0c6208be9dea6c1709 Author: Sanjeev Kulkarni AuthorDate: Tue Apr 21 12:04:21 2020 -0700 Make the FunctionRunTimeManager methods used by rest-api syncrhonized since the assignment tailer also is updating it (#6786) This fixes the flakiness in PulsarFunctionE2ESecurityTest Co-authored-by: Sanjeev Kulkarni --- .../org/apache/pulsar/functions/worker/FunctionRuntimeManager.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 9b681c8..c66be68 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -323,7 +323,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } -public void restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId, +public synchronized void restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId, URI uri) throws Exception { if (runtimeFactory.externallyManaged()) { throw new WebApplicationException(Response.serverError().status(Status.NOT_IMPLEMENTED) @@ -368,7 +368,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } -public void restartFunctionInstances(String tenant, String namespace, String functionName) +public synchronized void restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception { final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName); Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName);
[pulsar] branch master updated: Do not retry on authorization failure (#6577)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6cb0d25 Do not retry on authorization failure (#6577) 6cb0d25 is described below commit 6cb0d25aea861d6e4d927c483cf838330887be7a Author: Sanjeev Kulkarni AuthorDate: Sat Mar 21 17:23:25 2020 -0700 Do not retry on authorization failure (#6577) * Do not retry on authorization failure * Address feedback * Fix logic * Fix test * Fixed more tests * Fixed more test Co-authored-by: Sanjeev Kulkarni --- .../apache/pulsar/client/api/ClientErrorsTest.java| 10 +- .../websocket/proxy/ProxyPublishConsumeTest.java | 4 ++-- .../pulsar/client/api/PulsarClientException.java | 19 +++ .../apache/pulsar/client/impl/ConnectionHandler.java | 4 .../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- .../apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 7 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index cfb7c02..7751342 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -208,7 +208,7 @@ public class ClientErrorsTest { mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { // fail second producer -ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg")); +ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); @@ -436,7 +436,7 @@ public class ClientErrorsTest { mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { if (counter.incrementAndGet() == 2) { // fail second producer -ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg")); +ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); @@ -507,7 +507,7 @@ public class ClientErrorsTest { mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { System.err.println("subscribeCounter: " + subscribeCounter.get()); if (subscribeCounter.incrementAndGet() == 3) { -ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg")); +ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); @@ -520,8 +520,8 @@ public class ClientErrorsTest { try { client.newConsumer().topic("persistent://prop/use/ns/multi-part-t1").subscriptionName("sub1").subscribe(); -fail("Should have failed with an authentication error"); -} catch (PulsarClientException.AuthenticationException e) { +fail("Should have failed with an authorization error"); +} catch (PulsarClientException.AuthorizationException e) { } // should call close for 3 partitions diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 009596f..d33ed58 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -208,7 +208,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } } -@Test(timeOut = 1) +@Test(timeOut = 100) public void conflictingConsumerTest() throws Exception { final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive"; @@ -244,7 +244,
[pulsar] branch master updated: Add more logging during refresh credentials path (#6551)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f24b41c Add more logging during refresh credentials path (#6551) f24b41c is described below commit f24b41c891f1bc09fcbc437994a9ffaf93ce90ad Author: Sanjeev Kulkarni AuthorDate: Wed Mar 18 12:14:35 2020 -0700 Add more logging during refresh credentials path (#6551) Co-authored-by: Sanjeev Kulkarni --- .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 038cbab..d927365 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -522,7 +522,7 @@ public class ServerCnx extends PulsarHandler { log.warn("[{}] Principal cannot be changed during an authentication refresh", remoteAddress); ctx.close(); } else { -log.info("[{}] Refreshed authentication credentials", remoteAddress); +log.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, authRole); } } } @@ -559,7 +559,7 @@ public class ServerCnx extends PulsarHandler { } ctx.executor().execute(SafeRun.safeRun(() -> { -log.info("[{}] Refreshing authentication credentials", remoteAddress); +log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", remoteAddress, originalPrincipal, this.authRole); if (!supportsAuthenticationRefresh()) { log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress);
[pulsar] branch master updated: Instead of always using admin access for topic, use read/write/admin access for topic (#6504)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 36ea153 Instead of always using admin access for topic, use read/write/admin access for topic (#6504) 36ea153 is described below commit 36ea153c0ff4fc3e3f04de4a37b658daa9f116fa Author: Sanjeev Kulkarni AuthorDate: Sat Mar 7 18:10:03 2020 -0800 Instead of always using admin access for topic, use read/write/admin access for topic (#6504) Co-authored-by: Sanjeev Kulkarni --- .../broker/admin/impl/PersistentTopicsBase.java| 64 +- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d74017c..685d195 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -234,6 +234,44 @@ public class PersistentTopicsBase extends AdminResource { validateTopicOwnership(topicName, authoritative); } +public void validateReadOperationOnTopic(boolean authoritative) { +validateTopicOwnership(topicName, authoritative); +try { +validateAdminAccessForTenant(topicName.getTenant()); +} catch (Exception e) { +if (log.isDebugEnabled()) { +log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); +} +validateAdminAccessForSubscriber(""); +} +} + +public void validateWriteOperationOnTopic(boolean authoritative) { +validateTopicOwnership(topicName, authoritative); +try { +validateAdminAccessForTenant(topicName.getTenant()); +} catch (Exception e) { +if (log.isDebugEnabled()) { +log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); +} +try { +if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(), + clientAuthData())) { +log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId()); +throw new RestException(Status.UNAUTHORIZED, + String.format("Subscriber %s is not authorized to access this operation", clientAppId())); +} +} catch (RestException re) { +throw re; +} catch (Exception ex) { +// unknown error marked as internal server error +log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName, + clientAppId(), e.getMessage(), ex); +throw new RestException(ex); +} +} +} + protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) { validateTopicOwnership(topicName, authoritative); try { @@ -317,7 +355,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalDeleteTopicForcefully(boolean authoritative) { -validateAdminOperationOnTopic(authoritative); +validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); try { topic.deleteForcefully().get(); @@ -391,7 +429,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalCreateNonPartitionedTopic(boolean authoritative) { -validateAdminAccessForTenant(topicName.getTenant()); +validateWriteOperationOnTopic(authoritative); validateNonPartitionTopicName(topicName.getLocalName()); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); @@ -426,7 +464,7 @@ public class PersistentTopicsBase extends AdminResource { * @param numPartitions */ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) { -validateAdminAccessForTenant(topicName.getTenant()); +validateWriteOperationOnTopic(false); // Only do the validation if it's the first hop. if (!updateLocalTopicOnly) { validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions); @@ -540,7 +578,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) { try { -validateAdminAccessForTenant(topicName.getTenant(
[pulsar] branch master updated: Enhance Authorization by adding TenantAdmin interface (#6487)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f33567e Enhance Authorization by adding TenantAdmin interface (#6487) f33567e is described below commit f33567ec6b805a7ae704520ab93ff87b0abb02b6 Author: Sanjeev Kulkarni AuthorDate: Thu Mar 5 21:19:26 2020 -0800 Enhance Authorization by adding TenantAdmin interface (#6487) * Enhance Authorization by adding TenantAdmin interface * Remove debugging comment Co-authored-by: Sanjeev Kulkarni --- .../authorization/AuthorizationProvider.java | 13 +++ .../broker/authorization/AuthorizationService.java | 9 .../broker/admin/impl/PersistentTopicsBase.java| 2 +- .../pulsar/broker/web/PulsarWebResource.java | 26 ++ .../apache/pulsar/broker/admin/NamespacesTest.java | 2 ++ .../apache/pulsar/io/PulsarFunctionTlsTest.java| 5 + .../discovery/service/BrokerDiscoveryProvider.java | 2 +- .../functions/worker/rest/api/ComponentImpl.java | 4 ++-- .../worker/rest/api/FunctionsImplTest.java | 19 +++- .../proxy/server/BrokerDiscoveryProvider.java | 2 +- 10 files changed, 59 insertions(+), 25 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 9787eae..cb65416 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -29,6 +29,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; /** * Provider of authorization mechanism @@ -47,6 +48,18 @@ public interface AuthorizationProvider extends Closeable { } /** + * Check if specified role is an admin of the tenant + * @param tenant the tenant to check + * @param role the role to check + * @return a CompletableFuture containing a boolean in which true means the role is an admin user + * and false if it is not + */ +default CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, + AuthenticationDataSource authenticationData) { +return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false); +} + +/** * Perform initialization for the authorization provider * * @param conf diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 95ff764..381e8cf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,14 @@ public class AuthorizationService { return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); } +public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, +AuthenticationDataSource authenticationData) { +if (provider != null) { +return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData); +} +return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); +} + /** * * Grant authorization-action permission on a namespace to the given client diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 503b21b..e37f09d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/i
[pulsar] branch master updated: fix dependency issue with pulsar-io-common (#4398)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 347d4ab fix dependency issue with pulsar-io-common (#4398) 347d4ab is described below commit 347d4ab7ad6a74585a6a41cf98cda10904b41c00 Author: Boyang Jerry Peng AuthorDate: Wed May 29 15:52:25 2019 -0700 fix dependency issue with pulsar-io-common (#4398) --- pulsar-io/common/pom.xml | 5 ++--- .../src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml index 1d463b3..96afc51 100644 --- a/pulsar-io/common/pom.xml +++ b/pulsar-io/common/pom.xml @@ -40,9 +40,8 @@ ${project.version} -org.apache.pulsar -pulsar-common -${project.version} +com.fasterxml.jackson.core +jackson-databind diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java index f5b8870..95fd977 100644 --- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java +++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.io.common; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -65,6 +65,6 @@ public class IOConfigUtils { } } -return ObjectMapperFactory.getThreadLocal().convertValue(configs, clazz); +return new ObjectMapper().convertValue(configs, clazz); } }
[pulsar] branch master updated: Expose state to sources and sinks (#4364)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f8349e2 Expose state to sources and sinks (#4364) f8349e2 is described below commit f8349e2235a340a1ee9a416a07a4d2648dfe4f52 Author: Sanjeev Kulkarni AuthorDate: Fri May 24 17:37:43 2019 -0700 Expose state to sources and sinks (#4364) * Expose state to sources and sinks * Fix unittest * Fix unittest --- .../apache/pulsar/io/common/IOConfigUtilsTest.java | 79 ++ .../org/apache/pulsar/io/core/SinkContext.java | 69 +++ .../org/apache/pulsar/io/core/SourceContext.java | 70 +++ .../io/kafka/sink/KafkaAbstractSinkTest.java | 42 .../io/kafka/source/KafkaAbstractSourceTest.java | 42 5 files changed, 302 insertions(+) diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index af62c7f..2296dae 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -27,9 +27,11 @@ import org.slf4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; @Slf4j public class IOConfigUtilsTest { @@ -115,6 +117,44 @@ public class IOConfigUtilsTest { public String getSecret(String secretName) { return secretsMap.get(secretName); } + +@Override +public void incrCounter(String key, long amount) { } + +@Override +public CompletableFuture incrCounterAsync(String key, long amount) { +return null; +} + +@Override +public long getCounter(String key) { +return 0; +} + +@Override +public CompletableFuture getCounterAsync(String key) { +return null; +} + +@Override +public void putState(String key, ByteBuffer value) { + +} + +@Override +public CompletableFuture putStateAsync(String key, ByteBuffer value) { +return null; +} + +@Override +public ByteBuffer getState(String key) { +return null; +} + +@Override +public CompletableFuture getStateAsync(String key) { +return null; +} } @Test @@ -189,6 +229,45 @@ public class IOConfigUtilsTest { public String getSecret(String secretName) { return secretsMap.get(secretName); } + +@Override +public void incrCounter(String key, long amount) { +} + +@Override +public CompletableFuture incrCounterAsync(String key, long amount) { +return null; +} + +@Override +public long getCounter(String key) { +return 0; +} + +@Override +public CompletableFuture getCounterAsync(String key) { +return null; +} + +@Override +public void putState(String key, ByteBuffer value) { + +} + +@Override +public CompletableFuture putStateAsync(String key, ByteBuffer value) { +return null; +} + +@Override +public ByteBuffer getState(String key) { +return null; +} + +@Override +public CompletableFuture getStateAsync(String key) { +return null; +} } @Test diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index d30ff7b..1a8a859 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -20,7 +20,9 @@ package org.apache.pulsar.io.core; import org.slf4j.Logger; +import java.nio.ByteBuffer; import java.util.Collection; +import java.util.concurrent.CompletableFuture; public interface SinkContext { @@ -81,4 +83,71 @@ public interface SinkContext { * @return The secret if anything was found or null */ String getSecret(String secretName); + +/** + * Increment the builtin distributed counter referred by key. + * + * @param keyThe name of the key + * @param amount The amount to be incremented + */ +void incrCounter(String key, long amount); + + +/** + * Increment the builtin distributed counter referred by key + * but dont wait for the completion
[pulsar] branch master updated: Fixed the display of schema information (#4318)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5d688ee Fixed the display of schema information (#4318) 5d688ee is described below commit 5d688eec2596a784254e0580e865fbf287963915 Author: Sanjeev Kulkarni AuthorDate: Fri May 24 16:06:03 2019 -0700 Fixed the display of schema information (#4318) * Fixed the display of schema information * Fixed test * Took feedback into account * Use SchemaInfo.getSchemaDefinition to get information about schema * Reverted the rest api changes * Fixed integration test --- .../pulsar/admin/cli/PulsarAdminToolTest.java | 1 - .../org/apache/pulsar/admin/cli/CmdSchemas.java| 28 -- .../pulsar/tests/integration/cli/CLITest.java | 2 +- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 186a038..025a818 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -44,7 +44,6 @@ import org.apache.pulsar.client.admin.Lookup; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.NonPersistentTopics; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.ResourceQuotas; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.client.admin.Topics; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java index 554f290..0ac8e41 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java @@ -22,12 +22,20 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; +import java.lang.reflect.Type; import java.net.URL; import java.net.URLClassLoader; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; import org.apache.pulsar.admin.cli.utils.SchemaExtractor; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.schema.PostSchemaPayload; +import org.apache.pulsar.common.schema.SchemaInfo; @Parameters(commandDescription = "Operations about schemas") public class CmdSchemas extends CmdBase { @@ -52,11 +60,27 @@ public class CmdSchemas extends CmdBase { @Override void run() throws Exception { String topic = validateTopicName(params); +SchemaInfo schemaInfo; if (version == null) { -print(admin.schemas().getSchemaInfo(topic)); +schemaInfo = admin.schemas().getSchemaInfo(topic); } else { -print(admin.schemas().getSchemaInfo(topic, version)); +schemaInfo = admin.schemas().getSchemaInfo(topic, version); } +Gson customGson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, +new ByteArrayToStringAdapter(schemaInfo)).create(); +System.out.println(customGson.toJson(schemaInfo)); +} +} + +// Using Android's base64 libraries. This can be replaced with any base64 library. +private class ByteArrayToStringAdapter implements JsonSerializer { +private SchemaInfo schemaInfo; +public ByteArrayToStringAdapter(SchemaInfo schemaInfo) { +this.schemaInfo = schemaInfo; +} + +public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) { +return new JsonPrimitive(schemaInfo.getSchemaDefinition()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java index 3995ebf..2a400ae 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java @@ -159,7 +159,7 @@ public class CLITest extends PulsarTestSuite { "schemas", "get", topicName); -
[pulsar] branch master updated: [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bf06ef3 [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325) bf06ef3 is described below commit bf06ef3ebeccfe758a3db1724bac62dd99a060de Author: Sijie Guo AuthorDate: Thu May 23 00:23:37 2019 +0800 [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325) * [schema] AutoConsume should use the schema associated with messages for both writer and reader schema *Motivation* AutoConsume should use the schema associated with the messages for decoding the schemas. *Modifications* - provide a flag enable or disable using the provided schema as the reader schema - for AUTO_CONSUME schema, disable usnig the provided schema as the reader schema. so it can use the right schema version for decoding messages into right generic records - provide a few util methods for displaying schema data * Handle 64 bytes schema version * Addressed review comments --- .../apache/pulsar/common/schema/SchemaInfo.java| 18 +++ .../pulsar/client/impl/PulsarClientImpl.java | 11 +- .../pulsar/client/impl/schema/AvroSchema.java | 9 +- .../pulsar/client/impl/schema/JSONSchema.java | 1 - .../pulsar/client/impl/schema/ProtobufSchema.java | 4 +- .../pulsar/client/impl/schema/SchemaUtils.java | 22 +++- .../pulsar/client/impl/schema/StructSchema.java| 6 + .../impl/schema/generic/GenericAvroSchema.java | 23 +++- .../impl/schema/generic/GenericJsonSchema.java | 29 - .../impl/schema/generic/GenericSchemaImpl.java | 17 ++- .../integration/cli/SchemaUpdateStrategyTest.java | 140 ++--- 11 files changed, 245 insertions(+), 35 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java index 87d0e15..c5d1b72 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.common.schema; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Base64; import java.util.Collections; import java.util.Map; @@ -52,4 +55,19 @@ public class SchemaInfo { * Additional properties of the schema definition (implementation defined) */ private Map properties = Collections.emptyMap(); + +public String getSchemaDefinition() { +if (null == schema) { +return ""; +} + +switch (type) { +case AVRO: +case JSON: +case PROTOBUF: +return new String(schema, UTF_8); +default: +return Base64.getEncoder().encodeToString(schema); +} +} } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 73a4a3c..57d1956 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -730,13 +730,16 @@ public class PulsarClientImpl implements PulsarClient { if (schema instanceof AutoConsumeSchema) { SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema(); -if (schemaInfo.getType() != SchemaType.AVRO){ +if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){ throw new RuntimeException("Currently schema detection only works for topics with avro schemas"); - } -GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema()); + +// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader +// to decode the messages. +GenericSchema genericSchema = GenericSchemaImpl.of( +schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/); log.info("Auto detected schema for topic {} : {}", -topicName, new String(schemaInfo.getSchema(), UTF_8)); +topicName, schemaInfo.getSchemaDefinition()); ((AutoConsumeSchema) schema).setSchema(genericSchema); } schema.setSchemaInfoProvider(schemaInfoProvider); diff --git a/pulsar-client/
[pulsar] branch master updated: In Java we replace all "-" with "_" for state table namespace. (#4301)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 831476b In Java we replace all "-" with "_" for state table namespace. (#4301) 831476b is described below commit 831476b9bcf62af3d56c5245e83611b2842bfaca Author: Sanjeev Kulkarni AuthorDate: Fri May 17 18:04:04 2019 -0700 In Java we replace all "-" with "_" for state table namespace. (#4301) We need to do the same for python functions --- pulsar-functions/instance/src/main/python/python_instance.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 86f8644..d6ff4fb 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -328,6 +328,7 @@ class PythonInstance(object): def setup_state(self): table_ns = "%s_%s" % (str(self.instance_config.function_details.tenant), str(self.instance_config.function_details.namespace)) +table_ns = table_ns.replace("-", "_") table_name = str(self.instance_config.function_details.name) return state_context.create_state_context(self.state_storage_serviceurl, table_ns, table_name)
[pulsar] branch master updated: Misc fixes for state querying via pulsar admin (#4293)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3ea6edc Misc fixes for state querying via pulsar admin (#4293) 3ea6edc is described below commit 3ea6edc628461512c809026261a916de8b7cd0f9 Author: Sanjeev Kulkarni AuthorDate: Fri May 17 10:02:47 2019 -0700 Misc fixes for state querying via pulsar admin (#4293) --- .../java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java | 2 +- .../org/apache/pulsar/client/admin/internal/FunctionsImpl.java| 3 +-- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../java/org/apache/pulsar/common/functions/FunctionState.java| 8 ++-- .../apache/pulsar/functions/worker/rest/api/ComponentImpl.java| 4 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 86a0877..21ecd2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -260,7 +260,7 @@ public class FunctionsBase extends AdminResource implements Supplier() {}.getType()); +return response.readEntity(FunctionState.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 406c6c4..eab62d8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -815,7 +815,7 @@ public class CmdFunctions extends CmdBase { FunctionState functionState = admin.functions() .getFunctionState(tenant, namespace, functionName, key); Gson gson = new GsonBuilder().setPrettyPrinting().create(); -gson.toJson(functionState); +System.out.println(gson.toJson(functionState)); } catch (PulsarAdminException pae) { if (pae.getStatusCode() == 404 && watch) { System.err.println(pae.getMessage()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java index 5062247..15bcda0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java @@ -23,9 +23,13 @@ import com.fasterxml.jackson.annotation.JsonInclude; import lombok.*; @Getter -@AllArgsConstructor +@Setter +@Data +@EqualsAndHashCode @ToString -@JsonInclude(JsonInclude.Include.USE_DEFAULTS) +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class FunctionState { private String key; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index d9804e5..2dba1cd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1478,6 +1478,10 @@ public abstract class ComponentImpl { } } catch (RestException e) { throw e; +} catch (org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException e) { +log.error("Error while getFunctionState request @ /{}/{}/{}/{}", +tenant, namespace, functionName, key, e); +throw new RestException(Status.NOT_FOUND, e.getMessage()); } catch (Exception e) { log.error("Error while getFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, e);
[pulsar] branch master updated: add getting secrets utils method for sinks (#4291)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 39004c8 add getting secrets utils method for sinks (#4291) 39004c8 is described below commit 39004c83d5583237f34f75cd078c557421f0560a Author: Boyang Jerry Peng AuthorDate: Thu May 16 20:39:22 2019 -0700 add getting secrets utils method for sinks (#4291) --- .../org/apache/pulsar/io/common/IOConfigUtils.java | 12 +++- .../apache/pulsar/io/common/IOConfigUtilsTest.java | 84 -- 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java index f89e0e5..f5b8870 100644 --- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java +++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java @@ -20,6 +20,7 @@ package org.apache.pulsar.io.common; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -27,10 +28,19 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; @Slf4j public class IOConfigUtils { public static T loadWithSecrets(Map map, Class clazz, SourceContext sourceContext) { +return loadWithSecrets(map, clazz, secretName -> sourceContext.getSecret(secretName)); +} + +public static T loadWithSecrets(Map map, Class clazz, SinkContext sinkContext) { +return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName)); +} + +private static T loadWithSecrets(Map map, Class clazz, Function secretsGetter) { Map configs = new HashMap<>(map); for (Field field : clazz.getDeclaredFields()) { @@ -41,7 +51,7 @@ public class IOConfigUtils { if (((FieldDoc) annotation).sensitive()) { String secret = null; try { -secret = sourceContext.getSecret(field.getName()); +secret = secretsGetter.apply(field.getName()); } catch (Exception e) { log.warn("Failed to read secret {}", field.getName(), e); break; diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index 7584d8b..af62c7f 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -20,12 +20,14 @@ package org.apache.pulsar.io.common; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; import org.slf4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -66,7 +68,7 @@ public class IOConfigUtilsTest { static Map secretsMap = new HashMap<>(); static { -secretsMap.put("password", "my-password"); +secretsMap.put("password", "my-source-password"); } @Override @@ -116,14 +118,14 @@ public class IOConfigUtilsTest { } @Test -public void loadWithSecrets() { +public void testSourceLoadWithSecrets() { Map configMap = new HashMap<>(); configMap.put("notSensitive", "foo"); TestConfig testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSourceContext()); Assert.assertEquals(testConfig.notSensitive, "foo"); -Assert.assertEquals(testConfig.password, "my-password"); +Assert.assertEquals(testConfig.password, "my-source-password"); configMap = new HashMap<>(); configMap.put("notSensitive", "foo"); @@ -133,7 +135,81 @@ public class IOConfigUtilsTest { testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSourceContext()); Assert.assertEquals(testConfig.notSensitive, "foo"); -Assert.assertEquals(testConfig.password, "my-password"); +Assert.assertEquals(testConfig.password, "my-so
[pulsar] branch master updated: Fix website build with required Python dependency (#4287)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f51385e Fix website build with required Python dependency (#4287) f51385e is described below commit f51385e1ca2f91a16b2df8934c47ee27df2b17ae Author: Matteo Merli AuthorDate: Thu May 16 20:37:58 2019 -0700 Fix website build with required Python dependency (#4287) --- site2/tools/python-doc-gen.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/site2/tools/python-doc-gen.sh b/site2/tools/python-doc-gen.sh index 2d2bc30..640cab8 100755 --- a/site2/tools/python-doc-gen.sh +++ b/site2/tools/python-doc-gen.sh @@ -29,11 +29,12 @@ find $ROOT_DIR -name CMakeFiles | xargs rm -rf find $ROOT_DIR -name PulsarApi.pb.h | xargs rm -rf find $ROOT_DIR -name PulsarApi.pb.cc | xargs rm -rf cd $ROOT_DIR/pulsar-client-cpp -cmake . +cmake . make -j8 _pulsar pip install enum34 pip install six pip install fastavro +pip install certifi DESTINATION=$ROOT_DIR/generated-site/api/python rm -fr $DESTINATION/{index.html,functions,pulsar}
[pulsar] branch master updated: Upgrade to BookKeeper 4.9.2 (#4288)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7995b53 Upgrade to BookKeeper 4.9.2 (#4288) 7995b53 is described below commit 7995b538cdd61e7bf42c24963d5f839bd8f8c097 Author: Matteo Merli AuthorDate: Thu May 16 20:16:22 2019 -0700 Upgrade to BookKeeper 4.9.2 (#4288) --- distribution/server/src/assemble/LICENSE.bin.txt | 56 pom.xml | 2 +- pulsar-client-cpp/python/setup.py| 2 +- pulsar-sql/presto-distribution/LICENSE | 22 +- 4 files changed, 41 insertions(+), 41 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index bd93978..af125e2 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -365,32 +365,32 @@ The Apache Software License, Version 2.0 - org.apache.logging.log4j-log4j-web-2.10.0.jar * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar * BookKeeper -- org.apache.bookkeeper-bookkeeper-common-4.9.1.jar -- org.apache.bookkeeper-bookkeeper-common-allocator-4.9.1.jar -- org.apache.bookkeeper-bookkeeper-proto-4.9.1.jar -- org.apache.bookkeeper-bookkeeper-server-4.9.1.jar -- org.apache.bookkeeper-bookkeeper-tools-framework-4.9.1.jar -- org.apache.bookkeeper-circe-checksum-4.9.1.jar -- org.apache.bookkeeper-cpu-affinity-4.9.1.jar -- org.apache.bookkeeper-statelib-4.9.1.jar -- org.apache.bookkeeper-stream-storage-api-4.9.1.jar -- org.apache.bookkeeper-stream-storage-common-4.9.1.jar -- org.apache.bookkeeper-stream-storage-java-client-4.9.1.jar -- org.apache.bookkeeper-stream-storage-java-client-base-4.9.1.jar -- org.apache.bookkeeper-stream-storage-proto-4.9.1.jar -- org.apache.bookkeeper-stream-storage-server-4.9.1.jar -- org.apache.bookkeeper-stream-storage-service-api-4.9.1.jar -- org.apache.bookkeeper-stream-storage-service-impl-4.9.1.jar -- org.apache.bookkeeper.http-http-server-4.9.1.jar -- org.apache.bookkeeper.http-vertx-http-server-4.9.1.jar -- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.9.1.jar -- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.9.1.jar -- org.apache.bookkeeper.tests-stream-storage-tests-common-4.9.1.jar -- org.apache.distributedlog-distributedlog-common-4.9.1.jar -- org.apache.distributedlog-distributedlog-core-4.9.1-tests.jar -- org.apache.distributedlog-distributedlog-core-4.9.1.jar -- org.apache.distributedlog-distributedlog-protocol-4.9.1.jar -- org.apache.bookkeeper.stats-codahale-metrics-provider-4.9.1.jar +- org.apache.bookkeeper-bookkeeper-common-4.9.2.jar +- org.apache.bookkeeper-bookkeeper-common-allocator-4.9.2.jar +- org.apache.bookkeeper-bookkeeper-proto-4.9.2.jar +- org.apache.bookkeeper-bookkeeper-server-4.9.2.jar +- org.apache.bookkeeper-bookkeeper-tools-framework-4.9.2.jar +- org.apache.bookkeeper-circe-checksum-4.9.2.jar +- org.apache.bookkeeper-cpu-affinity-4.9.2.jar +- org.apache.bookkeeper-statelib-4.9.2.jar +- org.apache.bookkeeper-stream-storage-api-4.9.2.jar +- org.apache.bookkeeper-stream-storage-common-4.9.2.jar +- org.apache.bookkeeper-stream-storage-java-client-4.9.2.jar +- org.apache.bookkeeper-stream-storage-java-client-base-4.9.2.jar +- org.apache.bookkeeper-stream-storage-proto-4.9.2.jar +- org.apache.bookkeeper-stream-storage-server-4.9.2.jar +- org.apache.bookkeeper-stream-storage-service-api-4.9.2.jar +- org.apache.bookkeeper-stream-storage-service-impl-4.9.2.jar +- org.apache.bookkeeper.http-http-server-4.9.2.jar +- org.apache.bookkeeper.http-vertx-http-server-4.9.2.jar +- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.9.2.jar +- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.9.2.jar +- org.apache.bookkeeper.tests-stream-storage-tests-common-4.9.2.jar +- org.apache.distributedlog-distributedlog-common-4.9.2.jar +- org.apache.distributedlog-distributedlog-core-4.9.2-tests.jar +- org.apache.distributedlog-distributedlog-core-4.9.2.jar +- org.apache.distributedlog-distributedlog-protocol-4.9.2.jar +- org.apache.bookkeeper.stats-codahale-metrics-provider-4.9.2.jar * LZ4 -- org.lz4-lz4-java-1.5.0.jar * AsyncHttpClient - org.asynchttpclient-async-http-client-2.7.0.jar @@ -480,7 +480,7 @@ The Apache Software License, Version 2.0 * JCTools - Java Concurrency Tools for the JVM - org.jctools-jctools-core-2.1.2.jar * Vertx -- io.vertx-vertx-auth-common-3.4.1.jar +- io.vertx-vertx-auth-common-3.4.1.jar - io.vertx-vertx-core-3.4.1.jar - io.vertx-vertx-web-3.4.1.jar @@ -504,7 +504,7 @@ MIT License * Animal
[pulsar] branch master updated: fix bug with source local run (#4278)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ac8e4e3 fix bug with source local run (#4278) ac8e4e3 is described below commit ac8e4e38e8d5b811ed87177393553c9687f4 Author: Boyang Jerry Peng AuthorDate: Wed May 15 05:02:31 2019 -0700 fix bug with source local run (#4278) --- .../src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java index 581bbbf..1c180ab 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -133,7 +133,7 @@ public class LocalRunner { if (!file.exists()) { throw new RuntimeException("Source archive does not exist"); } -functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, null)); +functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file)); } } else { SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class);
[pulsar] branch master updated: Added an explicit field in the function details for componenttype (#4250)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ad4c9f3 Added an explicit field in the function details for componenttype (#4250) ad4c9f3 is described below commit ad4c9f3608cb228bbb59bade7e4e12ccd14e2fb3 Author: Sanjeev Kulkarni AuthorDate: Tue May 14 23:29:45 2019 -0700 Added an explicit field in the function details for componenttype (#4250) * Added an explicit field in the function details for componenttype * Fixed unittests * Updated the defn of python pb file * Added licence * Took feedback into account * Added unittest --- .../pulsar/functions/instance/ContextImpl.java | 6 +- .../pulsar/functions/instance/InstanceUtils.java | 19 +- .../functions/instance/JavaInstanceRunnable.java | 3 +- .../instance/stats/ComponentStatsManager.java | 5 +- .../instance/src/main/python/Function_pb2.py | 366 + .../pulsar/functions/instance/ContextImplTest.java | 5 +- .../functions/instance/InstanceUtilsTest.java | 77 + .../proto/src/main/proto/Function.proto| 7 + .../functions/runtime/KubernetesRuntime.java | 3 +- ...{ComponentType.java => ComponentTypeUtils.java} | 27 +- .../functions/utils/FunctionConfigUtils.java | 2 + .../pulsar/functions/utils/SinkConfigUtils.java| 2 + .../pulsar/functions/utils/SourceConfigUtils.java | 2 + .../functions/worker/rest/api/ComponentImpl.java | 237 +++-- .../functions/worker/rest/api/FunctionsImpl.java | 3 +- .../pulsar/functions/worker/rest/api/SinkImpl.java | 14 +- .../functions/worker/rest/api/SourceImpl.java | 14 +- .../worker/rest/api/FunctionsImplTest.java | 5 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 3 +- .../rest/api/v3/FunctionApiV3ResourceTest.java | 13 +- .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 +- .../rest/api/v3/SourceApiV3ResourceTest.java | 9 +- 22 files changed, 500 insertions(+), 331 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index f0d5ade..5bf8c59 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -35,10 +35,10 @@ import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.instance.stats.FunctionStatsManager; import org.apache.pulsar.functions.instance.stats.SinkStatsManager; import org.apache.pulsar.functions.instance.stats.SourceStatsManager; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.source.TopicSchema; -import org.apache.pulsar.functions.utils.ComponentType; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; @@ -91,11 +91,11 @@ class ContextImpl implements Context, SinkContext, SourceContext { userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric"; } -private final ComponentType componentType; +private final Function.FunctionDetails.ComponentType componentType; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, - ComponentType componentType, ComponentStatsManager statsManager) { + Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) { this.config = config; this.logger = logger; this.publishProducers = new HashMap<>(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 5cae6a8..e73f1ce 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -20,9 +20,6 @@ package org.apache.pulsar.functions.instance; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.pulsar.functions.utils.ComponentType.FUNCT
[pulsar] branch master updated: Allow configurble Request timeout for pulsar cli (#4235)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5a2b10b Allow configurble Request timeout for pulsar cli (#4235) 5a2b10b is described below commit 5a2b10bbb2c96b472bd2ebf4ebbb1eb7c88926e6 Author: Sanjeev Kulkarni AuthorDate: Wed May 8 19:59:55 2019 -0700 Allow configurble Request timeout for pulsar cli (#4235) --- .../org/apache/pulsar/client/admin/PulsarAdmin.java | 17 + .../apache/pulsar/client/admin/PulsarAdminBuilder.java | 8 .../client/admin/internal/PulsarAdminBuilderImpl.java | 12 +++- .../client/admin/internal/http/AsyncHttpConnector.java | 6 +- .../admin/internal/http/AsyncHttpConnectorProvider.java | 4 ++-- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index d8357b3..9ab3d6d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -69,6 +69,7 @@ public class PulsarAdmin implements Closeable { public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60; public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60; +public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300; private final Clusters clusters; private final Brokers brokers; @@ -96,6 +97,8 @@ public class PulsarAdmin implements Closeable { private final TimeUnit connectTimeoutUnit; private final int readTimeout; private final TimeUnit readTimeoutUnit; +private final int requestTimeout; +private final TimeUnit requestTimeoutUnit; static { /** @@ -125,7 +128,8 @@ public class PulsarAdmin implements Closeable { public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException { this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS, -DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); +DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, +DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); } @@ -134,11 +138,15 @@ public class PulsarAdmin implements Closeable { int connectTimeout, TimeUnit connectTimeoutUnit, int readTimeout, - TimeUnit readTimeoutUnit) throws PulsarClientException { + TimeUnit readTimeoutUnit, + int requestTimeout, + TimeUnit requestTimeoutUnit) throws PulsarClientException { this.connectTimeout = connectTimeout; this.connectTimeoutUnit = connectTimeoutUnit; this.readTimeout = readTimeout; this.readTimeoutUnit = readTimeoutUnit; +this.requestTimeout = requestTimeout; +this.requestTimeoutUnit = requestTimeoutUnit; this.clientConfigData = clientConfigData; this.auth = clientConfigData != null ? clientConfigData.getAuthentication() : new AuthenticationDisabled(); LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl, @@ -170,8 +178,9 @@ public class PulsarAdmin implements Closeable { root = client.target(serviceUrl); this.httpAsyncClient = asyncConnectorProvider.getConnector( - Math.toIntExact(TimeUnit.SECONDS.toMillis(this.connectTimeout)), - Math.toIntExact(TimeUnit.SECONDS.toMillis(this.readTimeout))).getHttpClient(); + Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)), +Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)), + Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient(); this.clusters = new ClustersImpl(root, auth); this.brokers = new BrokersImpl(root, auth); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java index a3826f9..e4e04af 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java @@ -187,4 +187,12 @@ public interface PulsarAdminBuilder { */ PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit); +/** + * This sets the server request time out for the pulsar admin client for any request. + * + * @param requestTimeout + * @param requestT
[pulsar] branch master updated: Check for existance of schemaInfo before accessing it (#4203)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 96bf9f6 Check for existance of schemaInfo before accessing it (#4203) 96bf9f6 is described below commit 96bf9f6156db388e2ad762ed6f471d1f8be3c8a2 Author: Sanjeev Kulkarni AuthorDate: Sat May 4 16:56:22 2019 -0700 Check for existance of schemaInfo before accessing it (#4203) --- .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java| 2 +- .../java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index d239dc1..d11d177 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -247,7 +247,7 @@ public class MessageImpl implements Message { byte [] schemaVersion = getSchemaVersion(); if (schema.supportSchemaVersioning() && schemaVersion != null) { return schema.decode(getData(), schemaVersion); -} else if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) { +} else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) { KeyValueSchema kvSchema = (KeyValueSchema) schema; if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { return schema.decode(getKeyBytes(), getData()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 22f235e..e7404ab 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -99,7 +99,7 @@ public class TypedMessageBuilderImpl implements TypedMessageBuilder { public TypedMessageBuilder value(T value) { checkArgument(value != null, "Need Non-Null content value"); -if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { +if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { KeyValueSchema kvSchema = (KeyValueSchema) schema; org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value; if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
[pulsar] branch master updated: Added missing unittests for primitive schema types (#4147)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d1d5aba Added missing unittests for primitive schema types (#4147) d1d5aba is described below commit d1d5aba085490ce26f0c19873b87381677596258 Author: Sanjeev Kulkarni AuthorDate: Sat Apr 27 09:00:09 2019 -0700 Added missing unittests for primitive schema types (#4147) * Added schema information and unittests for primitive schema types * Fixed unittest * For primitive types just check for schema type instead of the schema metadata * Address feedback * Revert changes to the schemas and only keep unittests --- .../client/impl/schema/BooleanSchemaTest.java | 65 ++ .../pulsar/client/impl/schema/DateSchemaTest.java | 75 + .../client/impl/schema/DoubleSchemaTest.java | 59 + .../pulsar/client/impl/schema/FloatSchemaTest.java | 53 +++ .../pulsar/client/impl/schema/IntSchemaTest.java | 69 +++ .../pulsar/client/impl/schema/LongSchemaTest.java | 77 ++ .../pulsar/client/impl/schema/ShortSchemaTest.java | 65 ++ .../pulsar/client/impl/schema/TimeSchemaTest.java | 75 + .../client/impl/schema/TimestampSchemaTest.java| 75 + 9 files changed, 613 insertions(+) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java new file mode 100644 index 000..9f62d1a --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class BooleanSchemaTest { + +@Test +public void testSchemaEncode() { +BooleanSchema schema = BooleanSchema.of(); +byte[] expectedTrue = new byte[] { +1 +}; +byte[] expectedFalse = new byte[] { +0 +}; +Assert.assertEquals(expectedTrue, schema.encode(true)); +Assert.assertEquals(expectedFalse, schema.encode(false)); +} + +@Test +public void testSchemaEncodeDecodeFidelity() { +BooleanSchema schema = BooleanSchema.of(); +Assert.assertEquals(new Boolean(true), schema.decode(schema.encode(true))); +Assert.assertEquals(new Boolean(false), schema.decode(schema.encode(false))); +} + +@Test +public void testSchemaDecode() { +byte[] trueBytes = new byte[] { +1 +}; +byte[] falseBytes = new byte[] { +0 +}; +BooleanSchema schema = BooleanSchema.of(); +Assert.assertEquals(new Boolean(true), schema.decode(trueBytes)); +Assert.assertEquals(new Boolean(false), schema.decode(falseBytes)); +} + +@Test +public void testNullEncodeDecode() { +Assert.assertNull(BooleanSchema.of().encode(null)); +Assert.assertNull(BooleanSchema.of().decode(null)); +} + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java new file mode 100644 index 000..0e707ed --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of
[pulsar] branch master updated: Add labels to function statefulsets and services (#4038)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new de6bc11 Add labels to function statefulsets and services (#4038) de6bc11 is described below commit de6bc1177a251066c7b148c3fa98a21c27f51417 Author: Sanjeev Kulkarni AuthorDate: Fri Apr 12 23:03:45 2019 -0700 Add labels to function statefulsets and services (#4038) * Add labels to function statefulsets and services. Also upgraded the kubernetes client version to the latest * Fix unittests * Reverted back the kubernertes library upgrade since shading was causing issues --- .../java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 3d11858..6dbb0d3 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -417,6 +417,7 @@ public class KubernetesRuntime implements Runtime { // setup stateful set metadata final V1ObjectMeta objectMeta = new V1ObjectMeta(); objectMeta.name(jobName); +objectMeta.setLabels(getLabels(instanceConfig.getFunctionDetails())); service.metadata(objectMeta); // create the stateful set spec @@ -798,6 +799,7 @@ public class KubernetesRuntime implements Runtime { // setup stateful set metadata final V1ObjectMeta objectMeta = new V1ObjectMeta(); objectMeta.name(jobName); +objectMeta.setLabels(getLabels(instanceConfig.getFunctionDetails())); statefulSet.metadata(objectMeta); // create the stateful set spec
[pulsar] branch master updated: Add Async State manipulation methods (#3978)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 13f870c Add Async State manipulation methods (#3978) 13f870c is described below commit 13f870c3bafedb7f73449a2a5b25f07b4f6667e9 Author: Sanjeev Kulkarni AuthorDate: Thu Apr 4 11:26:16 2019 -0700 Add Async State manipulation methods (#3978) * Add Async State manipulation methods * Fix build * Fixed unittest --- .../org/apache/pulsar/functions/api/Context.java | 34 ++ .../pulsar/functions/instance/ContextImpl.java | 33 +++--- .../functions/instance/state/StateContext.java | 9 ++--- .../functions/instance/state/StateContextImpl.java | 40 -- .../pulsar/functions/instance/ContextImplTest.java | 14 .../instance/state/StateContextImplTest.java | 8 ++--- 6 files changed, 101 insertions(+), 37 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 63cbc9e..17f989e 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -125,6 +125,15 @@ public interface Context { void incrCounter(String key, long amount); /** + * Increment the builtin distributed counter referred by key + * but dont wait for the completion of the increment operation + * + * @param key The name of the key + * @param amount The amount to be incremented + */ +CompletableFuture incrCounterAsync(String key, long amount); + +/** * Retrieve the counter value for the key. * * @param key name of the key @@ -133,6 +142,15 @@ public interface Context { long getCounter(String key); /** + * Retrieve the counter value for the key, but don't wait + * for the operation to be completed + * + * @param key name of the key + * @return the amount of the counter value for this key + */ +CompletableFuture getCounterAsync(String key); + +/** * Update the state value for the key. * * @param key name of the key @@ -141,6 +159,14 @@ public interface Context { void putState(String key, ByteBuffer value); /** + * Update the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @param value state value of the key + */ +CompletableFuture putStateAsync(String key, ByteBuffer value); + +/** * Retrieve the state value for the key. * * @param key name of the key @@ -149,6 +175,14 @@ public interface Context { ByteBuffer getState(String key); /** + * Retrieve the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @return the state value for the key. + */ +CompletableFuture getStateAsync(String key); + +/** * Get a map of all user-defined key/value configs for the function. * * @return The full map of user-defined config values diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index bb70b41..dc99f60 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; /** * This class implements the Context interface exposed to the user. @@ -263,40 +264,64 @@ class ContextImpl implements Context, SinkContext, SourceContext { } @Override +public CompletableFuture incrCounterAsync(String key, long amount) { +ensureStateEnabled(); +return stateContext.incrCounter(key, amount); +} + +@Override public void incrCounter(String key, long amount) { ensureStateEnabled(); try { -stateContext.incr(key, amount); +result(stateContext.incrCounter(key, amount)); } catch (Exception e) { throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e); } } @Override +public CompletableFut
[pulsar] branch master updated: fix submit function via url (#3934)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5ea4231 fix submit function via url (#3934) 5ea4231 is described below commit 5ea423150ba4d879207a22899cdd6e8154e6382f Author: Boyang Jerry Peng AuthorDate: Fri Mar 29 16:41:31 2019 -0500 fix submit function via url (#3934) * fix submit function via url * cleaning up * add test * make method private * add additional tests * cleaning up * improving tests --- .../apache/pulsar/io/PulsarFunctionE2ETest.java| 119 +++-- .../org/apache/pulsar/functions/utils/Utils.java | 44 ++-- 2 files changed, 119 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 042a952..abd9684 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -20,6 +20,8 @@ package org.apache.pulsar.io; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpServer; import lombok.ToString; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; @@ -61,11 +63,16 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.BufferedInputStream; import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.lang.reflect.Method; import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.net.URL; import java.util.Arrays; import java.util.Collections; @@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest { private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); +private Thread fileServerThread; +private static final int fileServerPort = PortManager.nextFreePort(); +private HttpServer fileServer; @DataProvider(name = "validRoleName") public Object[][] validRoleName() { @@ -213,12 +223,71 @@ public class PulsarFunctionE2ETest { System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, ""); -Thread.sleep(100); +// setting up simple web sever to test submitting function via URL +fileServerThread = new Thread(() -> { +try { +fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0); +fileServer.createContext("/pulsar-io-data-generator.nar", he -> { +try { + +Headers headers = he.getResponseHeaders(); +headers.add("Content-Type", "application/octet-stream"); + +File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile()); +byte[] bytes = new byte [(int)file.length()]; + +FileInputStream fileInputStream = new FileInputStream(file); +BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); +bufferedInputStream.read(bytes, 0, bytes.length); + +he.sendResponseHeaders(200, file.length()); +OutputStream outputStream = he.getResponseBody(); +outputStream.write(bytes, 0, bytes.length); +outputStream.close(); + +} catch (Exception e) { +log.error("Error when downloading: {}", e, e); +} +}); +fileServer.createContext("/pulsar-functions-api-examples.jar", he -> { +try { + +Headers headers = he.getResponseHeaders(); +headers.add("Content-Type", "application/octet-stream"); + +File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile()); +byte[] bytes = new byte [(int)file.length()]; + +FileInputStream fileInputStream = new FileInputStream(file); +BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
[pulsar] branch master updated: Classloader choice for validating Source/Sink (#3865)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f3095d8 Classloader choice for validating Source/Sink (#3865) f3095d8 is described below commit f3095d8697ccbe62e4676f94e671047a82bebe40 Author: Sanjeev Kulkarni AuthorDate: Thu Mar 28 21:22:37 2019 -0700 Classloader choice for validating Source/Sink (#3865) * Try both regular classloader as well as nar class loader for validating source/sinks * Fixed test * Fix unittest * Added more comments to the code * rename variables * Wait for the create to succeed before updating. Otherwise there might be some reamnant producers --- .../apache/pulsar/io/PulsarFunctionE2ETest.java| 8 +++ .../pulsar/functions/utils/SinkConfigUtils.java| 34 +-- .../pulsar/functions/utils/SourceConfigUtils.java | 32 -- .../functions/worker/rest/api/ComponentImpl.java | 3 + .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 2 +- .../rest/api/v3/SourceApiV3ResourceTest.java | 2 +- .../integration/functions/PulsarFunctionsTest.java | 68 ++ 7 files changed, 137 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 422e9ed..042a952 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -599,6 +599,14 @@ public class PulsarFunctionE2ETest { SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic); admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl); +retryStrategically((test) -> { +try { +return (admin.topics().getStats(sinkTopic).publishers.size() == 1); +} catch (PulsarAdminException e) { +return false; +} +}, 10, 150); + admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl); retryStrategically((test) -> { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 42538a2..81554c4 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; @@ -46,6 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSinkType; +@Slf4j public class SinkConfigUtils { @Getter @@ -296,14 +298,37 @@ public class SinkConfigUtils { } String sinkClassName; -ClassLoader classLoader; +final Class typeArg; +final ClassLoader classLoader; if (!isEmpty(sinkConfig.getClassName())) { sinkClassName = sinkConfig.getClassName(); +// We really don't know if we should use nar class loader or regular classloader +ClassLoader jarClassLoader = null; +ClassLoader narClassLoader = null; try { -classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); +jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); } catch (Exception e) { -throw new IllegalArgumentException("Invalid Sink Jar"); } +try { +narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); +} catch (Exception e) { +} +if (jarClassLoader == null && narClassLoader == null) { +throw new IllegalArgumentException("Invalid Sink Package"); +} +// We use typeArg and classLoader as arguments for lambda functions that require them to be final +// Thus we use these tmp vars +Class tmptypeArg; +ClassLoader tmpclassLoader; +try { +tmptypeArg = getSinkType(sinkClassName, narClassLoader
[pulsar] branch master updated: Cleanup logic in JavaInstanceRunnable close method (#3932)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5740699 Cleanup logic in JavaInstanceRunnable close method (#3932) 5740699 is described below commit 5740699b834439651dc57c87edac2a3002202921 Author: Sanjeev Kulkarni AuthorDate: Thu Mar 28 16:23:30 2019 -0700 Cleanup logic in JavaInstanceRunnable close method (#3932) * Cleanup logic in JavaInstanceRunnable close method * Added comments --- .../functions/instance/JavaInstanceRunnable.java | 25 -- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index e210eb6..8a60537 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -456,11 +456,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return record; } +/** + * NOTE: this method is be syncrhonized because it is potentially called by two different places + * one inside the run/finally clause and one inside the ThreadRuntime::stop + */ @Override -public void close() { +synchronized public void close() { if (stats != null) { stats.close(); +stats = null; } if (source != null) { @@ -468,8 +473,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { source.close(); } catch (Throwable e) { log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); - } +source = null; } if (sink != null) { @@ -478,10 +483,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } catch (Throwable e) { log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); } +sink = null; } if (null != javaInstance) { javaInstance.close(); +javaInstance = null; } // kill the state table @@ -495,13 +502,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { log.warn("Failed to close state storage client", cause); return null; }); +storageClient = null; } -// once the thread quits, clean up the instance -fnCache.unregisterFunctionInstance( -instanceConfig.getFunctionId(), -instanceConfig.getInstanceName()); -log.info("Unloading JAR files for function {}", instanceConfig); +if (instanceCache != null) { +// once the thread quits, clean up the instance +fnCache.unregisterFunctionInstance( +instanceConfig.getFunctionId(), +instanceConfig.getInstanceName()); +log.info("Unloading JAR files for function {}", instanceConfig); +instanceCache = null; +} } public InstanceCommunication.MetricsData getAndResetMetrics() {
[pulsar] branch master updated: Using Auto Consume consumer on a topic that doesn't have a schema doesn't work (#3909)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 70cc682 Using Auto Consume consumer on a topic that doesn't have a schema doesn't work (#3909) 70cc682 is described below commit 70cc68229de6c3e6620b60a20f2f4faa4b93ce6a Author: Sanjeev Kulkarni AuthorDate: Thu Mar 28 09:32:17 2019 -0700 Using Auto Consume consumer on a topic that doesn't have a schema doesn't work (#3909) --- .../tests/integration/functions/PulsarFunctionsTest.java| 13 - 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 9c44a05..d8e5f50 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -743,7 +743,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } // get function status -getFunctionStatus(functionName, numMessages); +getFunctionStatus(functionName, numMessages, true); // get function stats getFunctionStats(functionName, numMessages); @@ -964,7 +964,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } } -private static void getFunctionStatus(String functionName, int numMessages) throws Exception { +private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", @@ -985,7 +985,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages); assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), numMessages); - assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); +if (checkRestarts) { + assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); +} assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 0); assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); } @@ -1121,8 +1123,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // publish and consume result publishAndConsumeAvroMessages(inputTopicName, outputTopicName, numMessages); -// get function status -getFunctionStatus(functionName, numMessages); +// get function status. Note that this function might restart a few times until +// the producer above writes the messages. +getFunctionStatus(functionName, numMessages, false); // delete function deleteFunction(functionName);
[pulsar] branch master updated: Added sensitive annotations (#3866)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8db0f86 Added sensitive annotations (#3866) 8db0f86 is described below commit 8db0f8a5641ebd34db231326aaf4d843baf5 Author: Sanjeev Kulkarni AuthorDate: Mon Mar 25 20:39:09 2019 -0700 Added sensitive annotations (#3866) --- .../main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java| 2 ++ .../main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java | 7 +++ .../org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java| 2 ++ .../src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java| 2 ++ .../main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java | 1 + .../java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java | 4 6 files changed, 18 insertions(+) diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java index 1af70c1..90cf08b 100644 --- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java +++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java @@ -44,11 +44,13 @@ public class CanalSourceConfig implements Serializable{ @FieldDoc( required = true, defaultValue = "", +sensitive = true, help = "Username to connect to mysql database") private String username; @FieldDoc( required = true, defaultValue = "", +sensitive = true, help = "Password to connect to mysql database") private String password; @FieldDoc( diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java index 465da13..beda53d 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/annotations/FieldDoc.java @@ -45,6 +45,13 @@ public @interface FieldDoc { String defaultValue(); /** + * Return if the field is a sensitive type or not. + * usernames/password/accesstokensm etc are some example of sensitive fields + * @return true if the field is sensitive, otherwise false + */ +boolean sensitive() default false; + +/** * Return the description of this field. * * @return the help message of this field diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java index f929f97..1268f19 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java @@ -77,6 +77,7 @@ public class ElasticSearchConfig implements Serializable { @FieldDoc( required = false, defaultValue = "", +sensitive = true, help = "The username used by the connector to connect to the elastic search cluster. If username is set, a password should also be provided." ) private String username; @@ -84,6 +85,7 @@ public class ElasticSearchConfig implements Serializable { @FieldDoc( required = false, defaultValue = "", +sensitive = true, help = "The password used by the connector to connect to the elastic search cluster. If password is set, a username should also be provided" ) private String password; diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index b0508ba..3bfc72c 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -43,12 +43,14 @@ public class JdbcSinkConfig implements Serializable { @FieldDoc( required = false, defaultValue = "", +sensitive = true, help = "Username used to connect to the database specified by `jdbcUrl`" ) private String userName; @FieldDoc( required = false, defaultValue = "", +sensitive = true, help = "Password used to connect to the database specified by `jdbcUrl`" ) private String password; diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redi
[pulsar] branch master updated: Expand add env functionality to add variables if not present (#3827)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new da2bb66 Expand add env functionality to add variables if not present (#3827) da2bb66 is described below commit da2bb660461345f11446635298c2fdd35955ac32 Author: Sanjeev Kulkarni AuthorDate: Fri Mar 15 22:07:11 2019 -0700 Expand add env functionality to add variables if not present (#3827) * Add config variables if absent * Took feedback into account --- docker/pulsar/scripts/apply-config-from-env.py | 14 ++ 1 file changed, 14 insertions(+) diff --git a/docker/pulsar/scripts/apply-config-from-env.py b/docker/pulsar/scripts/apply-config-from-env.py index 0ad9b2b..94a8b4b 100755 --- a/docker/pulsar/scripts/apply-config-from-env.py +++ b/docker/pulsar/scripts/apply-config-from-env.py @@ -34,6 +34,9 @@ if len(sys.argv) < 2: # Always apply env config to env scripts as well conf_files = ['conf/pulsar_env.sh', 'conf/bkenv.sh'] + sys.argv[1:] +PF_ENV_PREFIX = 'PULSAR_' + + for conf_filename in conf_files: lines = [] # List of config file lines keys = {} # Map a key to its line number in the file @@ -56,6 +59,17 @@ for conf_filename in conf_files: idx = keys[k] lines[idx] = '%s=%s\n' % (k, v) + +# Add new keys from Env +for k in sorted(os.environ.keys()): +v = os.environ[k] +if not k.startswith(PF_ENV_PREFIX): +continue +k = k[len(PF_ENV_PREFIX):] +if k not in keys: +print('[%s] Adding config %s = %s' % (conf_filename, k, v)) +lines.append('%s=%s\n' % (k, v)) + # Store back the updated config in the same file f = open(conf_filename, 'w') for line in lines:
[pulsar] branch master updated: Allow users to update everything in inputspecs except for isregexpattern (#3770)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f7874cd Allow users to update everything in inputspecs except for isregexpattern (#3770) f7874cd is described below commit f7874cd90f42f1dac27391502dc1567a8d7b1e82 Author: Sanjeev Kulkarni AuthorDate: Thu Mar 7 09:25:39 2019 -0800 Allow users to update everything in inputspecs except for isregexpattern (#3770) * Allow users to update everything in inputspecs except for isregexpattern * Added more tests and fixed a bug caught by them --- .../pulsar/functions/utils/FunctionConfigUtils.java | 5 +++-- .../pulsar/functions/utils/SinkConfigUtils.java | 5 +++-- .../functions/utils/FunctionConfigUtilsTest.java | 19 +++ .../pulsar/functions/utils/SinkConfigUtilsTest.java | 19 +++ 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index aef096a..1e6a85c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -637,9 +637,10 @@ public class FunctionConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } -if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { -throw new IllegalArgumentException("Input Specs mismatch"); +if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { +throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } +mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index e7c53c5..ecbe487 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -416,9 +416,10 @@ public class SinkConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } -if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { -throw new IllegalArgumentException("Input Specs mismatch"); +if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { +throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } +mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index b758c72..0f3a5a3 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -157,6 +157,25 @@ public class FunctionConfigUtilsTest { FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); } +@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered") +public void testMergeDifferentInputSpecWithRegexChange() { +FunctionConfig functionConfig = createFunctionConfig(); +Map inputSpecs = new HashMap<>(); +inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassNa
[pulsar] branch retry_creation updated (bbfb1cb -> 024831f)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git. from bbfb1cb Added header new ccfb949 When the loadmanager leader is not available, fall through regular least loaded selection (#3688) new 0142247 Revert "Remove broker mode to handle persistent/non-persistent topics separately (#3348)" (#3713) new 7c7c3fb Use python 3.5 runtime and pulsar client (#3723) new bb9a4fb fix pulsar_message_set_replication_clusters (#3729) new 40aecc1 Fix Test localhost resolution errors (#3730) new bc8b380 Fail the source record if the write fails (#3706) new 5fd5730 [logging] make root log level configurable (#3661) new 65daec1 fix Jenkins pulsar-website-build error 571 (#3691) new 548c726 [Issue #3436][pulsar-broker] Creating REST Endpoint for non-partitioned topic creation (#3625) new 37a2fda [Issue 3275][pulsar-io]Support source and sink of flume (#3597) new d84b687 Update README.md with IDE setup for lombok (#3711) new e06a894 add cli client consumer for standalone (#3697) new d847c35 Exposing getSchemaVersion in the client by making it public. (#3744) new ffa2a68 Avoid debug noise by consumer ack-tracker (#2953) new 9dc3df6 Adjust the serving threads to have a minimum of threads (#3698) new 07cebb1 On publish failures, log error and count them as sys exceptions (#3704) new 33f1c55 fix function termination cleanup (#3751) new c39e7d1 Broker considers fail-over consumer priority-level (#2954) new 77332b0 [docs] Fixed typo remove-backlog-quotas to remove-backlog-quota in docs (#3757) new a662757 Multi version generic schema provider (#3756) new 024831f Merge branch 'master' into retry_creation The 2979 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: README.md | 28 ++ bin/pulsar | 1 + conf/broker.conf | 8 +- conf/log4j2.yaml | 4 +- conf/proxy.conf| 2 +- conf/standalone.conf | 9 + deployment/terraform-ansible/templates/broker.conf | 6 + docker/pulsar/Dockerfile | 8 +- docker/pulsar/pom.xml | 6 +- ...ar-client-37.sh => install-pulsar-client-35.sh} | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 14 +- .../org/apache/pulsar/broker/PulsarService.java| 15 +- .../broker/admin/impl/PersistentTopicsBase.java| 12 + .../pulsar/broker/admin/v2/PersistentTopics.java | 16 + .../broker/loadbalance/LeaderElectionService.java | 17 + .../broker/loadbalance/impl/LoadManagerShared.java | 34 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 43 +- .../loadbalance/impl/SimpleLoadManagerImpl.java| 27 +- .../pulsar/broker/namespace/NamespaceService.java | 11 +- .../AbstractDispatcherSingleActiveConsumer.java| 29 +- .../pulsar/broker/service/BrokerService.java | 16 + .../pulsar/broker/admin/AdminApiTlsAuthTest.java | 28 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 5 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 16 +- .../broker/loadbalance/LoadBalancerTest.java | 70 ++- .../loadbalance/ModularLoadManagerImplTest.java| 84 ++-- .../broker/service/ReplicatorGlobalNSTest.java | 15 +- .../pulsar/broker/service/ReplicatorTestBase.java | 4 +- .../broker/service/v1/V1_ReplicatorTest.java | 6 +- .../pulsar/client/api/NonPersistentTopicTest.java | 151 ++ .../client/api/SimpleProducerConsumerTest.java | 93 .../pulsar/client/impl/TopicFromMessageTest.java | 5 - .../worker/PulsarWorkerAssignmentTest.java | 78 ++- .../apache/pulsar/io/PulsarFunctionAdminTest.java | 2 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java| 13 +- .../stats/client/PulsarBrokerStatsClientTest.java | 22 +- .../org/apache/pulsar/client/admin/Topics.java | 19 + .../pulsar/client/admin/internal/TopicsImpl.java | 19 + .../apache/pulsar/client/api/ConsumerBuilder.java | 20 + .../java/org/apache/pulsar/client/api/Message.java | 7 + pulsar-client-cpp/include/pulsar/c/message.h | 4 +- pulsar-client-cpp/lib/c/c_Message.cc | 8 +- pulsar-client-go/pulsar/c_message.go | 2 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 + .../org/apache/pulsar/admin/cli/CmdTopics.java | 14 + .../client/impl/BinaryProtoLookupService.java | 11 +- .../pulsar/client/impl/
[pulsar] branch master updated: On publish failures, log error and count them as sys exceptions (#3704)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 07cebb1 On publish failures, log error and count them as sys exceptions (#3704) 07cebb1 is described below commit 07cebb17f5df0bc3de70acacc5981271396129a9 Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 22:12:56 2019 -0800 On publish failures, log error and count them as sys exceptions (#3704) * On publish failures, log error and count them as sys exceptions * Took feedback --- .../org/apache/pulsar/functions/instance/ContextImpl.java | 13 +++-- .../pulsar/functions/instance/JavaInstanceRunnable.java | 2 +- .../apache/pulsar/functions/instance/ContextImplTest.java | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 7271c87..60b8ec0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -89,6 +89,8 @@ class ContextImpl implements Context, SinkContext, SourceContext { private StateContextImpl stateContext; private Map userConfigs; +private ComponentStatsManager statsManager; + Map userMetricsLabels = new HashMap<>(); private final String[] metricsLabels; private final Summary userMetricsSummary; @@ -103,12 +105,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List inputTopics, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, - Utils.ComponentType componentType) { + Utils.ComponentType componentType, ComponentStatsManager statsManager) { this.config = config; this.logger = logger; this.publishProducers = new HashMap<>(); this.inputTopics = inputTopics; this.topicSchema = new TopicSchema(client); +this.statsManager = statsManager; this.producerBuilder = (ProducerBuilderImpl) client.newProducer().blockIfQueueFull(true).enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); @@ -359,7 +362,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { } } -return producer.sendAsync(object).thenApply(msgId -> null); +CompletableFuture future = producer.sendAsync(object).thenApply(msgId -> null); +future.exceptionally(e -> { +this.statsManager.incrSysExceptions(e); +logger.error("Failed to publish to topic {} with error {}", topicName, e); +return null; +}); +return future; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 19a0b2a..c9bf644 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -215,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider, -collectorRegistry, metricsLabels, this.componentType); +collectorRegistry, metricsLabels, this.componentType, this.stats); } /** diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 47c3539..7523fb1 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -79,7 +79,7 @@ public class ContextImplTest { client, new ArrayList<>(), new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0], -Utils.ComponentType.FUNCTION); +Utils.ComponentType.FUNCTION, null); } @Test(expectedExceptions = IllegalStateException.class)
[pulsar] branch master updated: Adjust the serving threads to have a minimum of threads (#3698)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9dc3df6 Adjust the serving threads to have a minimum of threads (#3698) 9dc3df6 is described below commit 9dc3df6139ca7d2dbee4bef4c4450416b5c5279d Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 15:46:20 2019 -0800 Adjust the serving threads to have a minimum of threads (#3698) --- conf/broker.conf | 2 +- conf/proxy.conf| 2 +- conf/standalone.conf | 3 +++ .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 2 +- .../apache/pulsar/websocket/service/WebSocketProxyConfiguration.java | 2 +- 6 files changed, 8 insertions(+), 5 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 262c44c..73e94cd 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -46,7 +46,7 @@ advertisedAddress= # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors() numIOThreads= -# Number of threads to use for HTTP requests processing. Default is set to Runtime.getRuntime().availableProcessors() +# Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors() numHttpServerThreads= # Flag to control features that are meant to be used when running in standalone mode diff --git a/conf/proxy.conf b/conf/proxy.conf index 4d1904f..7e1a553 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -164,7 +164,7 @@ httpReverseProxyConfigs= httpOutputBufferSize=32768 # Number of threads to use for HTTP requests processing. Default is -# Runtime.getRuntime().availableProcessors() +# 2 * Runtime.getRuntime().availableProcessors() httpNumThreads= ### --- Token Authentication Provider --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index a8e7181..2c401b7 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -39,6 +39,9 @@ advertisedAddress= # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors() numIOThreads= +# Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors() +numHttpServerThreads= + # Name of the cluster to which this broker belongs to clusterName=standalone diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index cb36df4..a1f031d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -146,7 +146,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Number of threads to use for HTTP requests processing" + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" ) -private int numHttpServerThreads = 2 * Runtime.getRuntime().availableProcessors(); +private int numHttpServerThreads = Math.max(4, 2 * Runtime.getRuntime().availableProcessors()); @FieldContext( category = CATEGORY_WEBSOCKET, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index ae54d6e..37be744 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -305,7 +305,7 @@ public class ProxyConfiguration implements PulsarConfiguration { category = CATEGORY_HTTP, doc = "Number of threads to use for HTTP requests processing" ) -private int httpNumThreads = 2 * Runtime.getRuntime().availableProcessors(); +private int httpNumThreads = Math.max(4, 2 * Runtime.getRuntime().availableProcessors()); @PropertiesContext( properties = { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 72dd160..8b212d8 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -93,7 +93,7 @@ public class WebSocketProxyConfiguration implements PulsarCo
[pulsar] branch retry_creation updated: Added header
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/retry_creation by this push: new bbfb1cb Added header bbfb1cb is described below commit bbfb1cb6562b7bd594f9ebdb6a76b872d6e6f5e2 Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 11:47:33 2019 -0800 Added header --- .../org/apache/pulsar/functions/utils/Actions.java | 18 ++ 1 file changed, 18 insertions(+) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java index 640a977..a451c08 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.functions.utils; import lombok.Builder;
[pulsar] branch retry_creation updated: Fix unittest
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/retry_creation by this push: new f62c945 Fix unittest f62c945 is described below commit f62c9452aef07de1eb6257bc6871eaff93a880ee Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 11:21:11 2019 -0800 Fix unittest --- .../src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java index d665178..309e466 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java @@ -155,7 +155,7 @@ public class ActionsTest { actions.run(); assertEquals(actions.numActions(), 2); -verify(supplier1, times(10)).get(); +verify(supplier1, times(11)).get(); verify(onFail, times(1)).accept(any()); verify(onSucess, times(0)).accept(any()); verify(supplier2, times(1)).get();
[pulsar] branch retry_creation updated: Catch interrupted exception
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/retry_creation by this push: new 2654b10 Catch interrupted exception 2654b10 is described below commit 2654b10608caab08b4c373eb17d25e5a7cc70ee3 Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 10:40:27 2019 -0800 Catch interrupted exception --- .../java/org/apache/pulsar/functions/worker/SchedulerManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index e19258c..c281b15 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -127,7 +127,9 @@ public class SchedulerManager implements AutoCloseable { .build()) .run(); } catch (InterruptedException e) { - +log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); +Thread.currentThread().interrupt(); +throw new RuntimeException(e); } if (producer.get() == null) { throw new RuntimeException("Can't create a producer on assignment topic "
[pulsar] branch retry_creation updated: Fix build
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/retry_creation by this push: new 5d5d13c Fix build 5d5d13c is described below commit 5d5d13c96e30b531b95e5c75e0b77040074ff78e Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 10:31:39 2019 -0800 Fix build --- .../apache/pulsar/functions/utils/ActionsTest.java | 25 +++--- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java index 2ada089..d665178 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java @@ -23,6 +23,7 @@ import org.testng.annotations.Test; import java.util.function.Supplier; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -42,8 +43,8 @@ public class ActionsTest { Supplier supplier2 = mock(Supplier.class); when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); -Runnable onFail = mock(Runnable.class); -Runnable onSucess = mock(Runnable.class); +java.util.function.Consumer onFail = mock(java.util.function.Consumer.class); +java.util.function.Consumer onSucess = mock(java.util.function.Consumer.class); Actions.Action action1 = spy( Actions.Action.builder() @@ -71,8 +72,8 @@ public class ActionsTest { assertEquals(actions.numActions(), 2); verify(supplier1, times(1)).get(); -verify(onFail, times(0)).run(); -verify(onSucess, times(1)).run(); +verify(onFail, times(0)).accept(any()); +verify(onSucess, times(1)).accept(any()); verify(supplier2, times(1)).get(); // test only run 1 action @@ -83,8 +84,8 @@ public class ActionsTest { supplier2 = mock(Supplier.class); when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); -onFail = mock(Runnable.class); -onSucess = mock(Runnable.class); +onFail = mock(java.util.function.Consumer.class); +onSucess = mock(java.util.function.Consumer.class); action1 = spy( Actions.Action.builder() @@ -114,8 +115,8 @@ public class ActionsTest { assertEquals(actions.numActions(), 2); verify(supplier1, times(1)).get(); -verify(onFail, times(0)).run(); -verify(onSucess, times(1)).run(); +verify(onFail, times(0)).accept(any()); +verify(onSucess, times(1)).accept(any()); verify(supplier2, times(0)).get(); // test retry @@ -126,8 +127,8 @@ public class ActionsTest { supplier2 = mock(Supplier.class); when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build()); -onFail = mock(Runnable.class); -onSucess = mock(Runnable.class); +onFail = mock(java.util.function.Consumer.class); +onSucess = mock(java.util.function.Consumer.class); action1 = spy( Actions.Action.builder() @@ -155,8 +156,8 @@ public class ActionsTest { assertEquals(actions.numActions(), 2); verify(supplier1, times(10)).get(); -verify(onFail, times(1)).run(); -verify(onSucess, times(0)).run(); +verify(onFail, times(1)).accept(any()); +verify(onSucess, times(0)).accept(any()); verify(supplier2, times(1)).get(); }
[pulsar] branch retry_creation updated: Use Action based retry mechanism
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/retry_creation by this push: new 8092542 Use Action based retry mechanism 8092542 is described below commit 809254240b3e0c03b7cd1cc18efc442d33dbf3d9 Author: Sanjeev Kulkarni AuthorDate: Mon Mar 4 10:15:50 2019 -0800 Use Action based retry mechanism --- .../functions/runtime/KubernetesRuntime.java | 87 +++ .../pulsar/functions/runtime/RuntimeUtils.java | 105 -- .../org/apache/pulsar/functions/utils/Actions.java | 121 + .../pulsar/functions/utils/ActionsTest.java} | 43 .../pulsar/functions/worker/FunctionActioner.java | 14 +-- .../pulsar/functions/worker/SchedulerManager.java | 56 ++ 6 files changed, 230 insertions(+), 196 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 3fc0c69..3c0468d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -60,6 +60,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Utils; @@ -360,7 +361,7 @@ public class KubernetesRuntime implements Runtime { String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); -RuntimeUtils.Actions.Action createService = RuntimeUtils.Actions.Action.builder() +Actions.Action createService = Actions.Action.builder() .actionName(String.format("Submitting service for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -372,25 +373,25 @@ public class KubernetesRuntime implements Runtime { // already exists if (e.getCode() == HTTP_CONFLICT) { log.warn("Service already present for function {}", fqfn); -return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); +return Actions.ActionResult.builder().success(true).build(); } String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); -return RuntimeUtils.Actions.ActionResult.builder() +return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); } -return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); +return Actions.ActionResult.builder().success(true).build(); }) .build(); AtomicBoolean success = new AtomicBoolean(false); -RuntimeUtils.Actions.newBuilder() +Actions.newBuilder() .addAction(createService.toBuilder() -.onSuccess(() -> success.set(true)) +.onSuccess((ignored) -> success.set(true)) .build()) .run(); @@ -432,7 +433,7 @@ public class KubernetesRuntime implements Runtime { String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); -RuntimeUtils.Actions.Action createStatefulSet = RuntimeUtils.Actions.Action.builder() +Actions.Action createStatefulSet = Actions.Action.builder() .actionName(String.format("Submitting statefulset for function %s", fqfn)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) @@ -444,25 +445,25 @@ public class KubernetesRuntime implements Runtime { // already exists if (e.getCode() == HTTP_CONFLICT) { log.warn("Statefulset already present for function {}", fqfn); -return RuntimeUtils.Actions.ActionResult.builder().success(true).build(); +return Acti
[pulsar] branch retry_creation created (now 9f62a19)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a change to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git. at 9f62a19 Retry creation of assignment topic a few times before giving up This branch includes the following new commits: new 9f62a19 Retry creation of assignment topic a few times before giving up The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[pulsar] 01/01: Retry creation of assignment topic a few times before giving up
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 9f62a190b577243c1044e718de7f99ea3792693e Author: Sanjeev Kulkarni AuthorDate: Thu Feb 28 16:05:33 2019 -0800 Retry creation of assignment topic a few times before giving up --- .../pulsar/functions/worker/SchedulerManager.java | 23 +- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 2a93494..f50acc3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -105,25 +105,12 @@ public class SchedulerManager implements AutoCloseable { .compressionType(CompressionType.LZ4) .sendTimeout(0, TimeUnit.MILLISECONDS) .createAsync().get(10, TimeUnit.SECONDS); +} catch (Exception e) { +log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); +} +try { +Thread.sleep(1); } catch (InterruptedException e) { -log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); -Thread.currentThread().interrupt(); -throw new RuntimeException(e); -} catch (ExecutionException e) { -log.error("Encountered exceptions at creating producer for topic {}", -config.getFunctionAssignmentTopic(), e); -throw new RuntimeException(e); -} catch (TimeoutException e) { -try { -log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...", -stopwatch.elapsed(TimeUnit.SECONDS)); -TimeUnit.SECONDS.sleep(10); -} catch (InterruptedException e1) { -log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); -Thread.currentThread().interrupt(); -throw new RuntimeException(e); -} -continue; } } throw new RuntimeException("Can't create a producer on assignment topic "
[pulsar] branch master updated: Simplified the workflow of functionruntime manager (#3551)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 878362c Simplified the workflow of functionruntime manager (#3551) 878362c is described below commit 878362c2f08055dedf11477fd434ad83b98a4faf Author: Sanjeev Kulkarni AuthorDate: Tue Feb 26 10:57:55 2019 -0800 Simplified the workflow of functionruntime manager (#3551) * Simplified the workflow of functionruntime manager * Fix unittest * Took feedback into account * added missing imports --- .../pulsar/functions/worker/FunctionAction.java| 40 - .../pulsar/functions/worker/FunctionActioner.java | 147 ++--- .../functions/worker/FunctionRuntimeManager.java | 125 +-- .../functions/worker/FunctionActionerTest.java | 27 ++-- .../worker/FunctionRuntimeManagerTest.java | 173 - 5 files changed, 199 insertions(+), 313 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java deleted file mode 100644 index ded8268..000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import lombok.*; -import lombok.experimental.Accessors; - -@Data -@Setter -@Getter -@EqualsAndHashCode -@ToString -@Accessors(chain = true) -public class FunctionAction { - -public enum Action { -START, -STOP, -TERMINATE -} - -private Action action; -private FunctionRuntimeInfo functionRuntimeInfo; -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 8033507..1d1014e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.functions.worker; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.MoreFiles; +import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -79,120 +79,74 @@ import static org.apache.pulsar.functions.utils.Utils.getSourceType; @EqualsAndHashCode @ToString @Slf4j -public class FunctionActioner implements AutoCloseable { +public class FunctionActioner { private final WorkerConfig workerConfig; private final RuntimeFactory runtimeFactory; private final Namespace dlogNamespace; -private LinkedBlockingQueue actionQueue; -private volatile boolean running; -private Thread actioner; private final ConnectorsManager connectorsManager; private final PulsarAdmin pulsarAdmin; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, -LinkedBlockingQueue actionQueue, ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; -this.actionQueue = actionQueue; this.connectorsManager = connectorsManager; this.pulsarAdmin = pulsarAdmin; -actioner = new Thread(() -> { -log.info("Starting Actioner Thread..."); -while(running) { -try { -FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS); -processAction(action); -
[pulsar] branch master updated: Consolidated all windowing code into its own module (#3583)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3aeafc1 Consolidated all windowing code into its own module (#3583) 3aeafc1 is described below commit 3aeafc185603a76515de1a904068806dd872a740 Author: Sanjeev Kulkarni AuthorDate: Tue Feb 12 15:56:56 2019 -0800 Consolidated all windowing code into its own module (#3583) * moved all windowing related code into its own submodule. * added pom --- pulsar-functions/pom.xml | 1 + pulsar-functions/{ => windowing}/pom.xml | 53 +++--- .../windowing/DefaultEvictionContext.java | 0 .../apache/pulsar/functions/windowing/Event.java | 0 .../pulsar/functions/windowing/EventImpl.java | 0 .../functions/windowing/EvictionContext.java | 0 .../pulsar/functions/windowing/EvictionPolicy.java | 0 .../functions/windowing/TimestampExtractor.java| 0 .../pulsar/functions/windowing/TriggerHandler.java | 0 .../pulsar/functions/windowing/TriggerPolicy.java | 0 .../pulsar/functions/windowing/WaterMarkEvent.java | 0 .../windowing/WaterMarkEventGenerator.java | 0 .../apache/pulsar/functions/windowing/Window.java | 0 .../functions/windowing/WindowContextImpl.java | 0 .../windowing/WindowFunctionExecutor.java | 0 .../pulsar/functions/windowing/WindowImpl.java | 0 .../windowing/WindowLifecycleListener.java | 0 .../pulsar/functions/windowing/WindowManager.java | 0 .../pulsar/functions/windowing/WindowUtils.java| 0 .../windowing/evictors/CountEvictionPolicy.java| 0 .../windowing/evictors/TimeEvictionPolicy.java | 0 .../evictors/WatermarkCountEvictionPolicy.java | 0 .../evictors/WatermarkTimeEvictionPolicy.java | 0 .../windowing/triggers/CountTriggerPolicy.java | 0 .../windowing/triggers/TimeTriggerPolicy.java | 0 .../triggers/WatermarkCountTriggerPolicy.java | 0 .../triggers/WatermarkTimeTriggerPolicy.java | 0 .../windowing/WaterMarkEventGeneratorTest.java | 0 .../windowing/WindowFunctionExecutorTest.java | 0 .../functions/windowing/WindowManagerTest.java | 0 30 files changed, 37 insertions(+), 17 deletions(-) diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml index a101a87..4b58a24 100644 --- a/pulsar-functions/pom.xml +++ b/pulsar-functions/pom.xml @@ -41,6 +41,7 @@ runtime-all worker secrets +windowing diff --git a/pulsar-functions/pom.xml b/pulsar-functions/windowing/pom.xml similarity index 50% copy from pulsar-functions/pom.xml copy to pulsar-functions/windowing/pom.xml index a101a87..a50fc09 100644 --- a/pulsar-functions/pom.xml +++ b/pulsar-functions/windowing/pom.xml @@ -19,28 +19,47 @@ --> http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> 4.0.0 - pom + org.apache.pulsar -pulsar +pulsar-functions 2.3.0-SNAPSHOT - pulsar-functions - Pulsar Functions :: Parent - - -proto -api-java -java-examples -utils -instance -runtime -runtime-all -worker -secrets - + pulsar-functions-windowing + Pulsar Functions :: Windowing + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + + ${project.groupId} + pulsar-functions-utils + ${project.version} + + + + ${project.groupId} + pulsar-functions-api + ${project.version} + + + + diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java similarity index 100% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java rename to pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java similarity index 100% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java rename to pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java diff --git a/pulsar-functions
[pulsar] branch master updated: Fixed the behavior of Function start/stop (#3477)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 020a1d5 Fixed the behavior of Function start/stop (#3477) 020a1d5 is described below commit 020a1d57e122582a5ad8bd043f278e4a92d4ffc1 Author: Sanjeev Kulkarni AuthorDate: Thu Jan 31 16:47:39 2019 -0800 Fixed the behavior of Function start/stop (#3477) * Added a state in the function metadata about what the state of the instances should be * Have start api for sources/sinks * Add missing pieces * more checks while handling request * Fixed bugs * Added unittests * Added unittest * Fix the all instances side logic --- .../pulsar/broker/admin/impl/FunctionsBase.java| 31 + .../apache/pulsar/broker/admin/impl/SinkBase.java | 31 + .../pulsar/broker/admin/impl/SourceBase.java | 29 + .../org/apache/pulsar/client/admin/Functions.java | 34 + .../java/org/apache/pulsar/client/admin/Sink.java | 34 + .../org/apache/pulsar/client/admin/Source.java | 34 + .../client/admin/internal/FunctionsImpl.java | 21 .../pulsar/client/admin/internal/SinkImpl.java | 21 .../pulsar/client/admin/internal/SourceImpl.java | 21 .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 29 + .../org/apache/pulsar/admin/cli/CmdFunctions.java | 33 - .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 - .../org/apache/pulsar/admin/cli/CmdSources.java| 28 - .../proto/src/main/proto/Function.proto| 5 + .../functions/worker/FunctionMetaDataManager.java | 59 + .../functions/worker/FunctionRuntimeManager.java | 56 ++--- .../functions/worker/rest/api/ComponentImpl.java | 139 + .../worker/rest/api/v3/FunctionApiV3Resource.java | 31 + .../worker/rest/api/v3/SinkApiV3Resource.java | 27 .../worker/rest/api/v3/SourceApiV3Resource.java| 27 .../worker/FunctionMetaDataManagerTest.java| 62 + .../worker/FunctionRuntimeManagerTest.java | 53 22 files changed, 781 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 73af2c5..9b88f29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -338,6 +338,37 @@ public class FunctionsBase extends AdminResource implements Supplier { sink.stopFunctionInstances(tenant, namespace, sinkName); } +@POST +@ApiOperation(value = "Start sink instance", response = Void.class) +@ApiResponses(value = { +@ApiResponse(code = 400, message = "Invalid request"), +@ApiResponse(code = 404, message = "The function does not exist"), +@ApiResponse(code = 500, message = "Internal server error") +}) +@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") +@Consumes(MediaType.APPLICATION_JSON) +public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { +sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); +} + +@POST +@ApiOperation(value = "Start all sink instances", response = Void.class) +@ApiResponses(value = { +@ApiResponse(code = 400, message = "Invalid request"), +@ApiResponse(code = 404, message = "The function does not exist"), +@ApiResponse(code = 500, message = "Internal server error") +}) +@Path("/{tenant}/{namespace}/{sinkName}/start") +@Consumes(MediaType.APPLICATION_JSON) +public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { +sink.startFunctionInstances(tenant, namespace, sinkName); +} + @GET @ApiOperation( value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.j
[pulsar] branch master updated: Add a component tag for every pulsar function (#3468)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bdd6019 Add a component tag for every pulsar function (#3468) bdd6019 is described below commit bdd601969fdb16a4936a6cb4cf5e5f8a781ed616 Author: Sanjeev Kulkarni AuthorDate: Tue Jan 29 14:48:49 2019 -0800 Add a component tag for every pulsar function (#3468) * Add a component tag for every pulsar function * Took feedback into account --- .../pulsar/functions/runtime/KubernetesRuntime.java | 19 +++ 1 file changed, 19 insertions(+) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index aba473b..9ba500e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -60,11 +60,13 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.Utils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -546,6 +548,23 @@ class KubernetesRuntime implements Runtime { private Map getLabels(Function.FunctionDetails functionDetails) { final Map labels = new HashMap<>(); +Utils.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails); +String component; +switch (componentType) { +case FUNCTION: +component = "function"; +break; +case SOURCE: +component = "source"; +break; +case SINK: +component = "sink"; +break; +default: +component = "function"; +break; +} +labels.put("component", component); labels.put("namespace", functionDetails.getNamespace()); labels.put("tenant", functionDetails.getTenant()); labels.put("name", functionDetails.getName());
[pulsar] branch master updated: Enable stats to be recovered by Kubernetes runtime (#3363)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 63a0491 Enable stats to be recovered by Kubernetes runtime (#3363) 63a0491 is described below commit 63a0491ec86a4123062eb2835d6f385bf79e2552 Author: Sanjeev Kulkarni AuthorDate: Mon Jan 14 09:55:02 2019 -0800 Enable stats to be recovered by Kubernetes runtime (#3363) --- .../pulsar/functions/runtime/JavaInstanceMain.java | 2 +- .../functions/runtime/KubernetesRuntime.java | 26 +-- .../pulsar/functions/runtime/ProcessRuntime.java | 2 +- .../apache/pulsar/functions/runtime/Runtime.java | 2 +- .../pulsar/functions/runtime/ThreadRuntime.java| 2 +- .../functions/worker/FunctionRuntimeManager.java | 2 +- .../org/apache/pulsar/functions/worker/Utils.java | 17 +++- .../functions/worker/rest/api/WorkerImpl.java | 30 -- 8 files changed, 55 insertions(+), 28 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 3a0a404..43467ae 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -317,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable { Runtime runtime = runtimeSpawner.getRuntime(); if (runtime != null) { try { -InstanceCommunication.MetricsData metrics = runtime.getMetrics().get(); +InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get(); responseObserver.onNext(metrics); responseObserver.onCompleted(); } catch (InterruptedException | ExecutionException e) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index a3a006a..aba473b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -295,9 +295,31 @@ class KubernetesRuntime implements Runtime { } @Override -public CompletableFuture getMetrics() { +public CompletableFuture getMetrics(int instanceId) { CompletableFuture retval = new CompletableFuture<>(); -retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest")); +if (instanceId < 0 || instanceId >= stub.length) { +if (stub == null) { +retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); +return retval; +} +} +if (stub == null) { +retval.completeExceptionally(new RuntimeException("Not alive")); +return retval; +} +ListenableFuture response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); +Futures.addCallback(response, new FutureCallback() { +@Override +public void onFailure(Throwable throwable) { +InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder(); +retval.complete(builder.build()); +} + +@Override +public void onSuccess(InstanceCommunication.MetricsData t) { +retval.complete(t); +} +}); return retval; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 14e68cc..87017a6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -288,7 +288,7 @@ class ProcessRuntime implements Runtime { } @Override -public CompletableFuture getMetrics() { +public CompletableFuture getMetrics(int instanceId) { CompletableFuture retval = new CompletableFuture<>(); if (stub == null) { retval.completeExceptionally(new RuntimeException("Not alive")); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/R
[pulsar] branch master updated: Add Windowfunction interface to functions api (#3324)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bf2a9c7 Add Windowfunction interface to functions api (#3324) bf2a9c7 is described below commit bf2a9c7b84f1c749bfabdc21a2926b017caa61ad Author: Sanjeev Kulkarni AuthorDate: Mon Jan 14 09:54:49 2019 -0800 Add Windowfunction interface to functions api (#3324) * Added WindowFunction interface and implementation * Fixed logic * Update comments * Took feedback into account --- .../pulsar/functions/api}/WindowContext.java | 2 +- .../pulsar/functions/api}/WindowFunction.java | 23 .../functions/windowing/WindowContextImpl.java | 1 + .../windowing/WindowFunctionExecutor.java | 64 -- .../windowing/WindowFunctionExecutorTest.java | 27 + ...{WindowFunction.java => AddWindowFunction.java} | 2 +- ...dowFunction.java => ContextWindowFunction.java} | 14 +++-- .../resources/example-window-function-config.yaml | 2 +- .../org/apache/pulsar/functions/utils/Utils.java | 34 9 files changed, 99 insertions(+), 70 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java similarity index 99% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java index 2f1f2e7..0abc87a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.windowing; +package org.apache.pulsar.functions.api; import org.slf4j.Logger; diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java similarity index 65% copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java copy to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java index ae01cec..6f2c421 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java @@ -16,20 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.api.examples; - -import lombok.extern.slf4j.Slf4j; +package org.apache.pulsar.functions.api; import java.util.Collection; -import java.util.function.Function; /** - * Example Function that acts on a window of tuples at a time rather than per tuple basis. + * This is the interface of the windowed function api. The process method is called + * for every triggered window. */ -@Slf4j -public class WindowFunction implements Function , Integer> { -@Override -public Integer apply(Collection integers) { -return integers.stream().reduce(0, (x, y) -> x + y); -} -} +@FunctionalInterface +public interface WindowFunction { +/** + * Process the input. + * @return the output + */ +O process(Collection> input, WindowContext context) throws Exception; +} \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java index 41e8ebe..de00f52 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.windowing; import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.WindowContext; import org.slf4j.Logger; import java.nio.ByteBuffer; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index e288261..1945949 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/instance/sr
[pulsar] branch master updated: Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 034f6ba Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299) 034f6ba is described below commit 034f6ba04e9c48abec1517668cb4fa46efdf02bc Author: Boyang Jerry Peng AuthorDate: Mon Jan 14 09:54:23 2019 -0800 Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299) * Cleanup consumer subscriptions and fix graceful shutdown for functions * cleaning up * removing testing files * add unit tests * adding integration testing * refactoring * refactoring and adding tests * cleaning up --- .../worker/PulsarWorkerAssignmentTest.java | 2 + .../apache/pulsar/io/PulsarFunctionE2ETest.java| 168 - .../pulsar/common/functions/FunctionConfig.java| 2 + .../org/apache/pulsar/common/io/SinkConfig.java| 3 +- .../pulsar/functions/instance/InstanceUtils.java | 12 ++ .../functions/instance/JavaInstanceRunnable.java | 4 +- .../functions/source/PulsarSourceConfig.java | 1 + .../instance/src/main/python/Function_pb2.py | 47 +++--- .../instance/src/main/python/function_stats.py | 2 +- .../instance/src/main/python/python_instance.py| 22 ++- .../src/main/python/python_instance_main.py| 4 +- pulsar-functions/instance/src/main/python/util.py | 4 +- .../proto/src/main/proto/Function.proto| 1 + .../pulsar/functions/runtime/ProcessRuntime.java | 29 +++- .../pulsar/functions/runtime/ThreadRuntime.java| 2 + .../functions/utils/FunctionConfigUtils.java | 5 + .../pulsar/functions/utils/SinkConfigUtils.java| 6 + .../pulsar/functions/worker/FunctionAction.java| 3 +- .../pulsar/functions/worker/FunctionActioner.java | 85 +-- .../functions/worker/FunctionRuntimeManager.java | 34 - .../pulsar/functions/worker/WorkerService.java | 8 +- .../functions/worker/FunctionActionerTest.java | 5 +- .../worker/FunctionRuntimeManagerTest.java | 25 +-- .../functions/worker/MembershipManagerTest.java| 32 ++-- .../integration/functions/PulsarFunctionsTest.java | 20 +++ 25 files changed, 439 insertions(+), 87 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 5cdf5a4..4b4597e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import com.google.gson.Gson; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -213,6 +214,7 @@ public class PulsarWorkerAssignmentTest { } }, 5, 150); // validate pulsar sink consumer has started on the topic +log.info("admin.topics().getStats(sinkTopic): {}", new Gson().toJson(admin.topics().getStats(sinkTopic))); assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 1); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 94e7bfa..f04fede 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -20,11 +20,13 @@ package org.apache.pulsar.io; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.gson.Gson; import lombok.ToString; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.client.admin.BrokerStats; @@ -42,11 +44,13 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.naming.TopicNam
[pulsar] branch master updated: Added HTTP Support (#3336)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7ec205a Added HTTP Support (#3336) 7ec205a is described below commit 7ec205ad5733bbbc9d99da16ed4721221bb5b315 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Fri Jan 11 13:27:36 2019 -0800 Added HTTP Support (#3336) * Added HTTP Support * Updated documentation and removed duplicate code * Added unit test for NettyHttpChannelInitializer --- .../org/apache/pulsar/io/netty/NettySource.java| 7 +- .../apache/pulsar/io/netty/NettySourceConfig.java | 19 ++- .../NettyHttpChannelInitializer.java} | 19 ++- .../io/netty/http/NettyHttpServerHandler.java | 144 + .../package-info.java} | 26 +--- ...tyChannelInitializer.java => package-info.java} | 26 +--- .../io/netty/server/NettyChannelInitializer.java | 2 +- .../apache/pulsar/io/netty/server/NettyServer.java | 70 ++ .../pulsar/io/netty/server/NettyServerHandler.java | 13 +- ...tyChannelInitializer.java => package-info.java} | 26 +--- .../http/NettyHttpChannelInitializerTest.java} | 32 ++--- site2/docs/io-connectors.md| 2 +- site2/docs/io-netty.md | 4 +- 13 files changed, 250 insertions(+), 140 deletions(-) diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java index 215bd34..1e799d8 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java @@ -18,20 +18,21 @@ */ package org.apache.pulsar.io.netty; +import java.util.Map; + import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; import org.apache.pulsar.io.netty.server.NettyServer; -import java.util.Map; /** - * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic + * A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic. */ @Connector( name = "netty", type = IOType.SOURCE, -help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic", +help = "A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic", configClass = NettySourceConfig.class) public class NettySource extends PushSource { diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java index f5d40e9..1ef4c35 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java @@ -20,17 +20,22 @@ package org.apache.pulsar.io.netty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.*; -import lombok.experimental.Accessors; -import org.apache.pulsar.io.core.annotations.FieldDoc; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; +import org.apache.pulsar.io.core.annotations.FieldDoc; + /** - * Netty Tcp or Udp Source Connector Config. + * Netty Source Connector Config. */ @Data @Setter @@ -45,7 +50,7 @@ public class NettySourceConfig implements Serializable { @FieldDoc( required = true, defaultValue = "tcp", -help = "The tcp or udp network protocols") +help = "The network protocol to use, supported values are 'tcp', 'udp', and 'http'") private String type = "tcp"; @FieldDoc( @@ -63,8 +68,8 @@ public class NettySourceConfig implements Serializable { @FieldDoc( required = true, defaultValue = "1", -help = "The number of threads of Netty Tcp Server to accept incoming connections and " + -"handle the traffic of the accepted connections") +help = "The number of threads of Netty Tcp Server to accept incoming connections and " ++ "handle the traffic
[pulsar] branch master updated: Add BC Testing sample smoke test (#3355)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4b8bd04 Add BC Testing sample smoke test (#3355) 4b8bd04 is described below commit 4b8bd04f3571f4c6dc40d0b1a1e79975d9ad5408 Author: Ali Ahmed AuthorDate: Thu Jan 10 17:29:55 2019 -0800 Add BC Testing sample smoke test (#3355) --- .../PulsarStandaloneTestSuite2_2.java} | 8 .../SmokeTest2_2.java} | 7 +++ .../integration/containers/PulsarContainer.java| 24 -- .../containers/StandaloneContainer.java| 11 ++ .../tests/integration/standalone/SmokeTest.java| 1 - .../suites/PulsarStandaloneTestSuite.java | 4 ++-- .../integration/topologies/PulsarCluster.java | 6 +++--- .../integration/topologies/PulsarClusterSpec.java | 14 ++--- .../topologies/PulsarStandaloneTestBase.java | 4 ++-- 9 files changed, 58 insertions(+), 21 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java similarity index 81% copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java index 15e82e3..db993f3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java @@ -16,18 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.tests.integration.suites; +package org.apache.pulsar.tests.integration.backwardscompatibility; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; import org.testng.ITest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; -public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implements ITest { +public class PulsarStandaloneTestSuite2_2 extends PulsarStandaloneTestBase implements ITest { @BeforeSuite public void setUpCluster() throws Exception { -super.startCluster(); +super.startCluster(PulsarContainer.PULSAR_2_2_IMAGE_NAME); } @AfterSuite @@ -35,7 +36,6 @@ public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implemen super.stopCluster(); } - @Override public String getTestName() { return "pulsar-standalone-suite"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java similarity index 84% copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java index 2b658fa..d9c446d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.tests.integration.standalone; -import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -import org.testng.annotations.Test; +package org.apache.pulsar.tests.integration.backwardscompatibility; +import org.testng.annotations.Test; -public class SmokeTest extends PulsarStandaloneTestSuite { +public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java index 7650d8f..59e8628 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java @@ -40,7 +40,10 @@ public abstract class PulsarContainer> exte