This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push: new e2a0866 [Functions] reorganize the context hierarchy for functions (#10631) (#12117) e2a0866 is described below commit e2a08669da4d7c1a59bd10895949387ea6e1d1d8 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Fri Sep 24 17:59:01 2021 +0800 [Functions] reorganize the context hierarchy for functions (#10631) (#12117) ### Motivation Currently, the context relationship among functions, sources, and sinks is not well defined. This prevents common features from being added once for all of them, and it creates confusion and code duplication in the current repo. As demonstrated in the following graph, this PR changes the hierarchy from left to right. By introducing a common base context, it helps solve some of the issues we are seeing. The base context provides common access to the Pulsar cluster, state, metrics, and meta-data to make sure all [...] ![context hierarchy](https://user-images.githubusercontent.com/16407807/118730483-8ebf5200-b7ec-11eb-9220-d41261f148bb.png) ### Modifications - Remove `ConnectorContext` interface. - Introduce a `BaseContext` interface. - Update existing `Context`, `SourceContext`, `SinkContext` interfaces to extend the new common interface. 
Co-authored-by: Neng Lu <n...@streamnative.io> --- .../org/apache/pulsar/client/api/Producer.java | 2 +- .../apache/pulsar/functions/api/BaseContext.java | 140 ++++++++-------- .../org/apache/pulsar/functions/api/Context.java | 177 ++------------------- .../org/apache/pulsar/io/core/SinkContext.java | 23 +-- .../org/apache/pulsar/io/core/SourceContext.java | 16 +- 5 files changed, 110 insertions(+), 248 deletions(-) diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java index 4d69668..6147d33 100644 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java +++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java @@ -84,7 +84,7 @@ public interface Producer extends Closeable { void flush() throws PulsarClientException; /** - * Flush all the messages buffered in the client and wait until all messages have been successfully persisted. + * Flush all the messages buffered in the client asynchronously. * * @return a future that can be used to track when all the messages have been safely persisted. * @since 2.1.0 diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java similarity index 74% rename from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java index 5e4de64..5105df7 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java @@ -16,73 +16,67 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.io.core; +package org.apache.pulsar.functions.api; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; -import org.apache.pulsar.functions.api.StateStore; import org.slf4j.Logger; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + /** - * Interface for a connector providing information about environment where it is running. - * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment. + * BaseContext provides base contextual information to the executing function/source/sink. + * It allows to propagate information, such as pulsar environment, logs, metrics, states etc. */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface ConnectorContext { - +public interface BaseContext { /** - * The id of the instance that invokes this source. + * The tenant this component belongs to. * - * @return the instance id + * @return the tenant this component belongs to */ - int getInstanceId(); + String getTenant(); /** - * Get the number of instances that invoke this source. + * The namespace this component belongs to. * - * @return the number of instances that invoke this source. - */ - int getNumInstances(); - - /** - * Record a user defined metric - * @param metricName The name of the metric - * @param value The value of the metric + * @return the namespace this component belongs to */ - void recordMetric(String metricName, double value); - + String getNamespace(); + /** - * The tenant this source belongs to. + * The id of the instance that invokes this component. * - * @return the tenant this source belongs to + * @return the instance id */ - String getTenant(); + int getInstanceId(); /** - * The namespace this source belongs to. + * Get the number of instances that invoke this component. 
* - * @return the namespace this source belongs to + * @return the number of instances that invoke this component. */ - String getNamespace(); + int getNumInstances(); /** - * The logger object that can be used to log in a sink + * The logger object that can be used to log in a component. + * * @return the logger object */ Logger getLogger(); /** - * Get the secret associated with this key + * Get the secret associated with this key. + * * @param secretName The name of the secret * @return The secret if anything was found or null */ String getSecret(String secretName); /** - * Get the state store with the provided store name. + * Get the state store with the provided store name in the tenant & namespace. * * @param name the state store name * @param <S> the type of interface of the store to return @@ -92,43 +86,24 @@ public interface ConnectorContext { * or interface of the actual returned store. */ default <S extends StateStore> S getStateStore(String name) { - throw new UnsupportedOperationException("Not implemented"); + throw new UnsupportedOperationException("Component cannot get state store"); } /** - * Increment the builtin distributed counter referred by key. - * - * @param key The name of the key - * @param amount The amount to be incremented - */ - void incrCounter(String key, long amount); - - - /** - * Increment the builtin distributed counter referred by key - * but dont wait for the completion of the increment operation - * - * @param key The name of the key - * @param amount The amount to be incremented - */ - CompletableFuture<Void> incrCounterAsync(String key, long amount); - - /** - * Retrieve the counter value for the key. + * Get the state store with the provided store name. 
* - * @param key name of the key - * @return the amount of the counter value for this key - */ - long getCounter(String key); - - /** - * Retrieve the counter value for the key, but don't wait - * for the operation to be completed + * @param tenant the state tenant name + * @param ns the state namespace name + * @param name the state store name + * @param <S> the type of interface of the store to return + * @return the state store instance. * - * @param key name of the key - * @return the amount of the counter value for this key + * @throws ClassCastException if the return type isn't a type + * or interface of the actual returned store. */ - CompletableFuture<Long> getCounterAsync(String key); + default <S extends StateStore> S getStateStore(String tenant, String ns, String name) { + throw new UnsupportedOperationException("Component cannot get state store"); + } /** * Update the state value for the key. @@ -175,4 +150,45 @@ public interface ConnectorContext { * @param key name of the key */ CompletableFuture<Void> deleteStateAsync(String key); + + /** + * Increment the builtin distributed counter referred by key. + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + void incrCounter(String key, long amount); + + /** + * Increment the builtin distributed counter referred by key + * but dont wait for the completion of the increment operation + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + CompletableFuture<Void> incrCounterAsync(String key, long amount); + + /** + * Retrieve the counter value for the key. 
+ * + * @param key name of the key + * @return the amount of the counter value for this key + */ + long getCounter(String key); + + /** + * Retrieve the counter value for the key, but don't wait + * for the operation to be completed + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + CompletableFuture<Long> getCounterAsync(String key); + + /** + * Record a user defined metric + * @param metricName The name of the metric + * @param value The value of the metric + */ + void recordMetric(String metricName, double value); } diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 5b5b5f1..1a2175e 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.api; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -31,7 +30,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; -import org.slf4j.Logger; /** * Context provides contextual information to the executing function. @@ -41,14 +39,7 @@ import org.slf4j.Logger; */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface Context { - /** - * Access the record associated with the current input value. - * - * @return - */ - Record<?> getCurrentRecord(); - +public interface Context extends BaseContext { /** * Get a list of all input topics. * @@ -57,32 +48,25 @@ public interface Context { Collection<String> getInputTopics(); /** - * Get the output topic of the function. + * Get the output topic of the source. 
* * @return output topic name */ String getOutputTopic(); /** - * Get output schema builtin type or custom class name. - * - * @return output schema builtin type or custom class name - */ - String getOutputSchemaType(); - - /** - * The tenant this function belongs to. + * Access the record associated with the current input value. * - * @return the tenant this function belongs to + * @return */ - String getTenant(); + Record<?> getCurrentRecord(); /** - * The namespace this function belongs to. + * Get output schema builtin type or custom class name. * - * @return the namespace this function belongs to + * @return output schema builtin type or custom class name */ - String getNamespace(); + String getOutputSchemaType(); /** * The name of the function that we are executing. @@ -99,20 +83,6 @@ public interface Context { String getFunctionId(); /** - * The id of the instance that invokes this function. - * - * @return the instance id - */ - int getInstanceId(); - - /** - * Get the number of instances that invoke this function. - * - * @return the number of instances that invoke this function. - */ - int getNumInstances(); - - /** * The version of the function that we are executing. * * @return The version id @@ -120,119 +90,6 @@ public interface Context { String getFunctionVersion(); /** - * The logger object that can be used to log in a function. - * - * @return the logger object - */ - Logger getLogger(); - - /** - * Get the state store with the provided store name in the function tenant & namespace. - * - * @param name the state store name - * @param <S> the type of interface of the store to return - * @return the state store instance. - * - * @throws ClassCastException if the return type isn't a type - * or interface of the actual returned store. - */ - <S extends StateStore> S getStateStore(String name); - - /** - * Get the state store with the provided store name. 
- * - * @param tenant the state tenant name - * @param ns the state namespace name - * @param name the state store name - * @param <S> the type of interface of the store to return - * @return the state store instance. - * - * @throws ClassCastException if the return type isn't a type - * or interface of the actual returned store. - */ - <S extends StateStore> S getStateStore(String tenant, String ns, String name); - - /** - * Increment the builtin distributed counter referred by key. - * - * @param key The name of the key - * @param amount The amount to be incremented - */ - void incrCounter(String key, long amount); - - /** - * Increment the builtin distributed counter referred by key - * but dont wait for the completion of the increment operation - * - * @param key The name of the key - * @param amount The amount to be incremented - */ - CompletableFuture<Void> incrCounterAsync(String key, long amount); - - /** - * Retrieve the counter value for the key. - * - * @param key name of the key - * @return the amount of the counter value for this key - */ - long getCounter(String key); - - /** - * Retrieve the counter value for the key, but don't wait - * for the operation to be completed - * - * @param key name of the key - * @return the amount of the counter value for this key - */ - CompletableFuture<Long> getCounterAsync(String key); - - /** - * Update the state value for the key. - * - * @param key name of the key - * @param value state value of the key - */ - void putState(String key, ByteBuffer value); - - /** - * Update the state value for the key, but don't wait for the operation to be completed - * - * @param key name of the key - * @param value state value of the key - */ - CompletableFuture<Void> putStateAsync(String key, ByteBuffer value); - - /** - * Delete the state value for the key. 
- * - * @param key name of the key - */ - void deleteState(String key); - - /** - * Delete the state value for the key, but don't wait for the operation to be completed - * - * @param key name of the key - */ - CompletableFuture<Void> deleteStateAsync(String key); - - /** - * Retrieve the state value for the key. - * - * @param key name of the key - * @return the state value for the key. - */ - ByteBuffer getState(String key); - - /** - * Retrieve the state value for the key, but don't wait for the operation to be completed - * - * @param key name of the key - * @return the state value for the key. - */ - CompletableFuture<ByteBuffer> getStateAsync(String key); - - /** * Get a map of all user-defined key/value configs for the function. * * @return The full map of user-defined config values @@ -257,14 +114,6 @@ public interface Context { Object getUserConfigValueOrDefault(String key, Object defaultValue); /** - * Get the secret associated with this key. - * - * @param secretName The name of the secret - * @return The secret if anything was found or null - */ - String getSecret(String secretName); - - /** * Get the pulsar admin client. * * @return The instance of pulsar admin client @@ -272,14 +121,6 @@ public interface Context { PulsarAdmin getPulsarAdmin(); /** - * Record a user defined metric. - * - * @param metricName The name of the metric - * @param value The value of the metric - */ - void recordMetric(String metricName, double value); - - /** * Publish an object using serDe or schema class for serializing to the topic. 
* * @param topicName The name of the topic for publishing @@ -321,4 +162,4 @@ public interface Context { * @throws PulsarClientException */ <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException; -} \ No newline at end of file +} diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index b283a09..427978c 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.functions.api.BaseContext; /** * Interface for a sink connector providing information about environment where it is running. @@ -31,14 +32,7 @@ import org.apache.pulsar.common.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface SinkContext extends ConnectorContext { - - /** - * Get a list of all input topics - * @return a list of all input topics - */ - Collection<String> getInputTopics(); - +public interface SinkContext extends BaseContext { /** * The name of the sink that we are executing * @return The Sink name @@ -46,7 +40,15 @@ public interface SinkContext extends ConnectorContext { String getSinkName(); /** + * Get a list of all input topics. 
+ * + * @return a list of all input topics + */ + Collection<String> getInputTopics(); + + /** * Get subscription type used by the source providing data for the sink + * * @return subscription type */ default SubscriptionType getSubscriptionType() { @@ -55,6 +57,7 @@ public interface SinkContext extends ConnectorContext { /** * Reset the subscription associated with this topic and partition to a specific message id. + * * @param topic - topic name * @param partition - partition id (0 for non-partitioned topics) * @param messageId to reset to @@ -65,7 +68,9 @@ public interface SinkContext extends ConnectorContext { } /** - * Stop requesting new messages for given topic and partition until {@link #resume(String topic)} is called. + * Stop requesting new messages for given topic and partition until {@link #resume(String topic, int partition)} + * is called. + * * @param topic - topic name * @param partition - partition id (0 for non-partitioned topics) */ diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java index 9740189..934db0c 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java @@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.functions.api.BaseContext; /** * Interface for a source connector providing information about environment where it is running. 
@@ -31,21 +32,20 @@ import org.apache.pulsar.common.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface SourceContext extends ConnectorContext { - +public interface SourceContext extends BaseContext { /** - * Get the output topic of the source. + * The name of the source that we are executing. * - * @return output topic name + * @return The Source name */ - String getOutputTopic(); + String getSourceName(); /** - * The name of the source that we are executing. + * Get the output topic of the source. * - * @return The Source name + * @return output topic name */ - String getSourceName(); + String getOutputTopic(); /** * New output message using schema for serializing to the topic