To my understanding, the Pulsar IO Connectors (i.e. Sources/Sinks) are
quite self-contained. They move data around.

If we want to enable the functionality described in the PIP (process ->
write to another place), can we approach it another way: allow flexible
configuration of a Pulsar Function?

Currently, the Pulsar Function pipeline is:
PulsarSource -> func() -> PulsarSink()

Can we look into allowing users to change a source/sink in the
PulsarFunction pipeline instead of tweaking the Sink?

Syntax could be:
```
pulsar-admin functions create --sink ... --source ...
```

This would be more flexible and would open up a lot of possibilities for
further development.
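
To illustrate the idea, here is a minimal sketch of a pipeline whose source
and sink can be swapped independently of the function. All names in it
(PipelineSketch, run, demo) are hypothetical and only stand in for the
PulsarSource -> func() -> PulsarSink chain; this is not the actual Pulsar
runtime API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: none of these names exist in Pulsar.
public class PipelineSketch {

    // Reads items from the source until it returns null, applies the
    // function to each one, and writes every non-null result to the sink.
    static <I, O> void run(Supplier<I> source, Function<I, O> function, Consumer<O> sink) {
        I item;
        while ((item = source.get()) != null) {
            O result = function.apply(item);
            if (result != null) {
                sink.accept(result);
            }
        }
    }

    // Wires an in-memory source and sink around an upper-casing function.
    static List<String> demo() {
        Iterator<String> input = List.of("a", "b").iterator();
        List<String> written = new ArrayList<>();
        PipelineSketch.<String, String>run(
                () -> input.hasNext() ? input.next() : null, // stands in for any source
                String::toUpperCase,                         // the user function
                written::add);                               // stands in for any sink
        return written;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the function stays the same whichever source or sink is plugged
in, which is the flexibility I have in mind.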



On Tue, Jul 26, 2022 at 2:56 AM Christophe Bornet <bornet.ch...@gmail.com>
wrote:

> Thanks for the feedback Jerry.
> We don't modify the way sources, sinks and functions are detected when it's
> based on their fields. The proposal is just to modify the classname of the
> function applied in the instance so the same detection rules apply. The
> only difference is when detecting if the sink or function is built-in. For
> this we add some code to do this detection also based on the ComponentType
> (either detected or explicit). You can check the implementation PR about
> it: https://github.com/apache/pulsar/pull/16740
>
> IMO, making it a separate implementation of what currently exists would make
> things more complex and thus more error-prone for no good reason. The
> proposal is "just" to replace the name of the already existing function
> (IdentityFunction) by another one and to provide the location of the
> function JAR.
>
> Best regards
> Christophe
>
> Le lun. 25 juil. 2022 à 23:31, Jerry Peng <jerry.boyang.p...@gmail.com> a
> écrit :
>
> > My feedback is to make this change as self-contained as possible.  Can we
> > just have a special implementation of a sink that will run the logic of the
> > "preprocess" function?  There are many places in the code where we figure
> > out if it is a source, sink or a function based on the fields in the
> > Function metadata.  Changing that may have unintended consequences.
> >
> > On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi <baodi....@icloud.com.invalid>
> > wrote:
> >
> > > > Can you explain more what you mean ?
> > > This PIP doesn't change the API of a Function and it's already possible to
> > > write a Function<?, Record<?>>.
> > > And when declaring a Sink with a Function we'll check that it's the case.
> > >
> > > I mean: we should constrain the function interface, otherwise, the user
> > > may return a structure that is not a record.
> > >
> > > Thanks,
> > > Baodi Shi
> > >
> > > > On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com>
> > > > wrote:
> > > >
> > > > Thanks for the feedback Asaf
> > > >
> > > >
> > > >>>   - preprocess-function: the preprocess function applied before the
> > > >>>   Sink. Starts by builtin:// for built-in functions, function://
> for
> > > >>>   package function, http:// or file://
> > > >>>
> > > > >> 1. Why is this function applied only before the sink? I thought it replaces
> > > > >> the identity function, so why can't a source have a function that reads
> > > > >> from the source (say S3), runs the function and only then writes to a
> > > > >> Pulsar topic?
> > > >>
> > > >
> > > > > Yes that's totally possible to implement and will be done in future work,
> > > > > as written in the PIP.
> > > >
> > > >
> > > > >> 2. Can you clarify more about built-in functions and package functions?
> > > > >> Is this existing functionality?
> > > >>
> > > > Yes those are existing functionalities.
> > > > > Built-in functions are not documented (and we should do something about
> > > > > that).
> > > > > Package management of functions is described in
> > > > > https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> > > >
> > > >
> > > > >> 3. Regarding http - Are you loading a class through that URL? Aren't we
> > > > >> exposed to the same problem the Log4Shell security issue had? If so, what
> > > > >> measures are you taking to protect against it?
> > > >>
> > > > > Yes we are loading code via URL. This feature already exists for
> > > > > Sources/Sinks/Functions.
> > > > > I guess you need to have a lot of trust in the source you download from.
> > > > > This PIP has the same security level as what already exists for this
> > > > > functionality.
> > > >
> > > >
> > > >>
> > > >> The field extraFunctionPackageLocation to the protobuf structure
> > > >>> FunctionMetaData will be added. This field will be filled with the
> > > >>> location of the extra function to apply when registering a sink and
> > > used
> > > >> in
> > > >>> the Runtime to load the function code.
> > > >>
> > > > >> Can you please expand on that? Do you mean the location of the JAR in which
> > > > >> you will look up the class name and function specified in the 3 fields
> > > > >> you've added to the config?
> > > >>
> > > > Not exactly. It's the location of where the JAR is stored. It can be
> > > > BookKeeper, package management, built-in NAR, etc...
> > > > > In KubernetesRuntime, there are cases where the builtin or package function
> > > > > you provide in the preprocess-function param could be copied to BK.
> > > > > That's the same as for a regular Sink/Source, and if we need to copy to BK,
> > > > > we append `__sink-function` to the storage path to prevent a conflict with
> > > > > the sink code.
> > > > The class name is indeed looked up in this JAR.
> > > >
> > > >
> > > > >>> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> > > > >>> added to RuntimeFactory::createContainer
> > > > >>
> > > > >> 1. What do File and fileName contain? How does this relate to
> > > > >> extraFunctionPackageLocation?
> > > >>
> > > > > That part mimics what is already done for the main code of the Source/Sink
> > > > > (with respectively codeFile, originalCodeFileName and packageLocation).
> > > > > Before starting the ThreadRuntime, we download the JAR locally from the
> > > > > extraFunctionPackageLocation into the extraFunctionFile so we can load the
> > > > > code from it.
> > > >
> > > >
> > > >>
> > > > >> Here you use the terminology "Extra Function", while in the fields of the
> > > > >> config and admin you used the term "Pre-Process Function". I would pick
> > > > >> "Pre-Process Function" and stick with it all over.
> > > >>
> > > > > This terminology needs to be applicable to a Function that would be applied
> > > > > after a Source so we can't use "preprocess" here.
> > > > > I haven't found anything better than "extra function". Don't hesitate to
> > > > > propose something!
> > > >
> > > >
> > > >>
> > > >>
> > > >>> The following parameters will be added to JavaInstanceStarter:
> > > >>>
> > > >>>   - --extra_function_jar: the path to the extra function jar
> > > >>>
> > > >>>
> > > >>>   - --extra_function_id: the extra function UUID cache key
> > > >>>
> > > > >>> These parameters are then used by the ThreadRuntime to load the function
> > > > >>> from the FunctionCacheManager or create it there if needed.
> > > >>
> > > >>
> > > > >> Can you elaborate on that? Is JavaInstanceStarter used to start a single
> > > > >> Function? Is it used from the command line?
> > > >
> > > > > The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> > > > > The JavaInstance is the process that will execute the code to read from a
> > > > > Source, execute a Function and write to a Sink.
> > > > > Generally Pulsar users don't use the JavaInstanceStarter directly. The
> > > > > command line is built by the ProcessRuntime and KubernetesRuntime.
> > > >
> > > >>
> > > >>
> > > > >> In general, you will essentially have two class loaders - one for the sink
> > > > >> and one for the pre-process function?
> > > >>
> > > > Yes, exactly.
> > > > 3 to be more accurate since there's also the instance class loader.
> > > >
> > > >
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > > >> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com>
> > > > >> wrote:
> > > >>
> > > >>> Dear Pulsar dev community,
> > > >>>
> > > > >>> I would like to open a discussion here about PIP 193: Sink preprocessing
> > > > >>> Function <https://github.com/apache/pulsar/issues/16739>.
> > > >>>
> > > >>> Best regards
> > > >>>
> > > >>> Christophe
> > > >>>
> > > >>> ## Motivation
> > > >>>
> > > > >>> Pulsar IO connectors make it possible to connect Pulsar to an external
> > > > >>> system:
> > > > >>> * A Source reads continuously from an external system and writes to a
> > > > >>> Pulsar topic.
> > > > >>> * A Sink reads continuously from a Pulsar topic and writes to an external
> > > > >>> system.
> > > > >>> Sources and Sinks are written in Java.
> > > >>>
> > > > >>> Pulsar also has a lightweight computing system named Pulsar Functions. A
> > > > >>> Pulsar Function reads from one or more topics, applies user logic written
> > > > >>> in Java, Python or Go and writes to an output topic.
> > > >>>
> > > > >>> When using Pulsar IO connectors, the format of what is read/written from/to
> > > > >>> the source/sink is defined by the connector code. But there are a lot of
> > > > >>> situations where a user wants to transform this data before using it.
> > > > >>> Currently the solution is to either:
> > > > >>> * write a custom connector that transforms the data the way we want, but
> > > > >>> that means writing a lot of code without reuse, packaging and managing
> > > > >>> custom connectors, and so on.
> > > > >>> * write a Function to transform the data after it was written to a topic by
> > > > >>> a Source or before it is read from a topic by a Sink. This is not very
> > > > >>> efficient as we have to use an intermediate topic, which means additional
> > > > >>> storage, IO, and latency.
> > > >>>
> > > > >>> Considering all this, it would be handy to be able to apply a Function
> > > > >>> on-the-fly to a connector without going through an intermediary topic.
> > > >>>
> > > >>> ## Goal
> > > >>>
> > > > >>> This PIP defines the changes needed to be able to apply a preprocessing
> > > > >>> Function on-the-fly to a Sink.
> > > > >>> The preprocessing function can be a built-in function, a package function,
> > > > >>> or loaded through an http URL or a file path.
> > > > >>> Sources, Sinks and Functions are based on the same runtime process that:
> > > > >>> * reads from a Source. For Sinks and Functions this Source is a
> > > > >>> PulsarSource consuming from a Pulsar topic.
> > > > >>> * applies a Function. For Sources and Sinks, this Function is
> > > > >>> IdentityFunction which returns the data it gets without modification.
> > > > >>> * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> > > > >>> writing to a Pulsar topic.
> > > > >>>
> > > > >>> This PIP reuses this and allows configuring a Function different from
> > > > >>> IdentityFunction for Sinks.
> > > > >>> Only Functions returning a Record will be authorized to ensure that the
> > > > >>> Function sets the Schema explicitly.
> > > >>>
> > > >>> Out of the scope of this PIP, for future work:
> > > >>> * Applying a post-processing Function to a Source
> > > >>> * Loading the Function jar through the Sink CLI
> > > >>>
> > > >>> ## API Changes
> > > >>>
> > > >>> ### Admin CLI
> > > >>>
> > > >>> The following options will be added to the `pulsar-admin sinks` CLI
> > > >>> `create`, `update` and `localrun`:
> > > > >>> * `preprocess-function`: the preprocess function applied before the Sink.
> > > > >>> Starts with `builtin://` for built-in functions, `function://` for package
> > > > >>> functions, `http://` or `file://`
> > > > >>> * `preprocess-function-classname`: the preprocess function class name
> > > > >>> (optional if the function is a NAR)
> > > >>> * `preprocess-function-config`: the configuration of the preprocess
> > > >>> function in the same format as the `user-config` parameter of the
> > > >>> `functions create` CLI command.
> > > >>>
> > > >>> The corresponding fields will be added to `SinkConfig`:
> > > >>>
> > > >>> ```java
> > > >>>    private String preprocessFunction;
> > > >>>    private String preprocessFunctionClassName;
> > > >>>    private String preprocessFunctionConfig;
> > > >>> ```
> > > >>>
> > > >>> ### Function definition
> > > >>>
> > > > >>> The field `extraFunctionPackageLocation` will be added to the protobuf
> > > > >>> structure `FunctionMetaData`. This field will be filled with the location
> > > > >>> of the extra function to apply when registering a sink and used in the
> > > > >>> Runtime to load the function code.
> > > >>>
> > > >>> ```protobuf
> > > >>> message FunctionMetaData {
> > > >>>    ...
> > > >>>    PackageLocationMetaData extraFunctionPackageLocation = 7;
> > > >>> }
> > > >>> ```
> > > >>>
> > > >>> ### Runtime
> > > >>>
> > > > >>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
> > > > >>> be added to `RuntimeFactory::createContainer`.
> > > >>>
> > > >>>
> > > >>> ```java
> > > > >>>   Runtime createContainer(
> > > > >>>            InstanceConfig instanceConfig, String codeFile,
> > > > >>>            String originalCodeFileName,
> > > > >>>            String extraFunctionFile, String originalExtraFunctionFileName,
> > > > >>>            Long expectedHealthCheckInterval) throws Exception;
> > > >>> ```
> > > >>>
> > > >>> ### Instance function cache
> > > >>>
> > > > >>> A field `extraFunctionId` that will hold the UUID cache key of the extra
> > > > >>> function will be added to `InstanceConfig`.
> > > >>>
> > > >>> ```java
> > > >>> public class InstanceConfig {
> > > >>>    private int instanceId;
> > > >>>    private String functionId;
> > > >>>    private String extraFunctionId;
> > > >>> ```
> > > >>>
> > > >>> ### JavaInstanceStarter
> > > >>>
> > > >>>
> > > >>> The following parameters will be added to JavaInstanceStarter:
> > > >>> * `--extra_function_jar`: the path to the extra function jar
> > > >>> * `--extra_function_id`: the extra function UUID cache key
> > > >>>
> > > > >>> These parameters are then used by the `ThreadRuntime` to load the function
> > > > >>> from the `FunctionCacheManager` or create it there if needed.
> > > >>>
> > > >>> ### Download the extra function
> > > >>>
> > > > >>> The statefulset spawned in `KubernetesRuntime` needs to be able to download
> > > > >>> the extra function's code via the API.
> > > > >>> An `extra-function` query param will be added to the download function HTTP
> > > > >>> endpoint:
> > > >>>
> > > >>> ```java
> > > >>>   @Path("/{tenant}/{namespace}/{functionName}/download")
> > > >>>    public StreamingOutput downloadFunction(
> > > >>>            @ApiParam(value = "The tenant of functions")
> > > >>>            final @PathParam("tenant") String tenant,
> > > >>>            @ApiParam(value = "The namespace of functions")
> > > >>>            final @PathParam("namespace") String namespace,
> > > >>>            @ApiParam(value = "The name of functions")
> > > > >>>            final @PathParam("functionName") String functionName,
> > > > >>>            @ApiParam(value = "Whether to download the extra-function")
> > > > >>>            final @QueryParam("extra-function") boolean extraFunction) {
> > > >>> ```
> > > >>>
> > > > >>> If `extraFunction` is `true` then the extra function will be returned
> > > > >>> instead of the sink.
> > > >>>
> > > >>> The Java admin SDK will have the following methods added:
> > > >>>
> > > >>>
> > > >>> ```java
> > > >>>   /**
> > > >>>     * Download Function Code.
> > > >>>     *
> > > >>>     * @param destinationFile
> > > >>>     *           file where data should be downloaded to
> > > >>>     * @param tenant
> > > >>>     *            Tenant name
> > > >>>     * @param namespace
> > > >>>     *            Namespace name
> > > >>>     * @param function
> > > >>>     *            Function name
> > > > >>>     * @param extraFunction
> > > > >>>     *            Whether to download the extra-function (for sources and
> > > > >>>     *            sinks)
> > > > >>>     * @throws PulsarAdminException
> > > > >>>     */
> > > > >>>    void downloadFunction(String destinationFile, String tenant,
> > > > >>>                          String namespace, String function,
> > > > >>>                          boolean extraFunction) throws PulsarAdminException;
> > > >>>
> > > >>>    /**
> > > >>>     * Download Function Code asynchronously.
> > > >>>     *
> > > >>>     * @param destinationFile
> > > >>>     *           file where data should be downloaded to
> > > >>>     * @param tenant
> > > >>>     *            Tenant name
> > > >>>     * @param namespace
> > > >>>     *            Namespace name
> > > >>>     * @param function
> > > >>>     *            Function name
> > > > >>>     * @param extraFunction
> > > > >>>     *            Whether to download the extra-function (for sources and
> > > > >>>     *            sinks)
> > > > >>>     */
> > > > >>>    CompletableFuture<Void> downloadFunctionAsync(
> > > > >>>            String destinationFile, String tenant, String namespace,
> > > > >>>            String function, boolean extraFunction);
> > > >>> ```
> > > >>>
> > > > >>> The parameter `--extra-function` will be added to the admin CLI command
> > > > >>> `functions download`.
> > > >>>
> > > >>> ## Implementation
> > > >>>
> > > >>> ### Pulsar-admin
> > > >>>
> > > > >>> * Add the admin CLI options when creating/updating/localrunning the sink
> > > > >>> (see API changes)
> > > >>>
> > > >>> ### Pulsar broker
> > > >>>
> > > >>> * On the broker API, in registerSink/updateSink, if a preprocessing
> > > >>> function is present in the Sink config, we:
> > > >>>  * validate the function
> > > > >>>    * get the function classloader (from builtin or download a package
> > > > >>> file)
> > > > >>>    * load the function
> > > > >>>    * inspect the function types and set the first arg as Sink type. Also
> > > > >>> verify that the second arg is of type Record.
> > > > >>>  * use the function classloader instead of the sink classloader to verify
> > > > >>> that custom schemas, serdes, crypto key readers can be loaded and are
> > > > >>> conformant.
> > > > >>>  * get the function package location and fill the protobuf
> > > > >>> extraFunctionPackageLocation field with it. A name for this preprocessing
> > > > >>> function is generated from the sink name so it can be referenced when
> > > > >>> stored in BookKeeper or in package management. The name of the
> > > > >>> preprocessing function is `{sink name}__sink-function`.
> > > > >>>  * set the `functionDetails` with the preprocessing function config
> > > > >>> (function class name and function userConfig)
> > > >>>
> > > >>> * The `--extra-function` query parameter is added to the `functions
> > > >>> download` CLI command, admin SDK and HTTP API (see API changes).
> > > >>>
> > > >>> ### Function worker
> > > >>>
> > > > >>> * When the `InstanceConfig` is created, a UUID is set in the
> > > > >>> `extraFunctionId` field. This field will serve as a cache key for the extra
> > > > >>> function (see API changes).
> > > > >>> * When the `FunctionActioner` starts the function, if
> > > > >>> `extraFunctionPackageLocation` is present, the same is done for the extra
> > > > >>> function as what is done for the connector:
> > > > >>>  * if the runtime is not externally managed, the extra function code is
> > > > >>> downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> > > > >>> created with the extra package file path and original name (see API changes
> > > > >>> to `RuntimeFactory::createContainer`)
> > > > >>>  * if the runtime is externally managed, the `Runtime` is created with the
> > > > >>> `extraFunctionPackageLocation` and original name.
> > > >>>
> > > > >>> * Depending on the configured runtime, if there’s an extra function file:
> > > > >>>  * For the `ThreadRuntime`, the extra function classloader is obtained
> > > > >>> with the instance `extraFunctionId` cache key, then this classloader is
> > > > >>> passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> > > > >>> switches between the connector classloader and the extra function
> > > > >>> classloader accordingly.
> > > > >>>  * For the `ProcessRuntime`, the path to the extra function jar is added
> > > > >>> to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> > > > >>> command. The `JavaInstanceStarter` then uses it when creating its
> > > > >>> `ThreadRuntime`.
> > > > >>>  * For the `KubernetesRuntime`, a command is added in the statefulset exec
> > > > >>> command to download the extra function using the `--extra-function` flag of
> > > > >>> the `functions download` command. The path to this downloaded jar is
> > > > >>> added to the `--extra_function_jar` parameter of the `JavaInstanceStarter`
> > > > >>> command.
> > > >>>
> > > >>> ### LocalRunner
> > > >>>
> > > > >>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
> > > > >>> same methods as in the broker to get the function file and
> > > > >>> `functionDetails` and use them when spawning the `Runtime`.
> > > >>>
> > > > >>> ## Rejected Alternatives
> > > >>>
> > > >>> N/A
> > > >>>
> > > >>
> > >
> > >
> >
>


-- 
Best Regards,
Neng
