Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-08-10 Thread Christophe Bornet
This may be a matter of personal opinion, but I don't see connectors as more
self-contained than functions.
The problem I see in looking at this from a Function point of view is that
a Function could have both its Sink and Source not connected to Pulsar,
which feels awkward. I understand that functions were not designed to be
general-purpose functions but only to deal with Pulsar messages.


On Tue, Aug 9, 2022 at 19:04, Neng Lu wrote:

> To my understanding, the Pulsar IO Connectors (i.e. Sources/Sinks) are
> quite self-contained. They move data around.
>
> If we want to enable the functionality described in the PIP (process ->
> write to another place), can we think of it another way: allow flexible
> configuration of a Pulsar Function?
>
> Originally, the Pulsar Function pipeline is:
> PulsarSource -> func() -> PulsarSink()
>
> Can we look into allowing users to change the source/sink in the
> PulsarFunction pipeline instead of tweaking the Sink?
>
> The syntax could be:
> ```
> pulsar-admin functions create --sink ... --source ...
> ```
>
> This would be more flexible and opens up a lot of possibilities for further
> development.

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-08-09 Thread Neng Lu
To my understanding, the Pulsar IO Connectors (i.e. Sources/Sinks) are
quite self-contained. They move data around.

If we want to enable the functionality described in the PIP (process ->
write to another place), can we think of it another way: allow flexible
configuration of a Pulsar Function?

Originally, the Pulsar Function pipeline is:
PulsarSource -> func() -> PulsarSink()

Can we look into allowing users to change the source/sink in the
PulsarFunction pipeline instead of tweaking the Sink?

The syntax could be:
```
pulsar-admin functions create --sink ... --source ...
```

This would be more flexible and opens up a lot of possibilities for further
development.
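
The pipeline shape described above can be sketched as follows. This is only an illustration of a function running between a pluggable source and sink, not Pulsar's runtime: the class `PluggablePipeline` and its `demo` method are hypothetical, and plain JDK types (`Iterator`, `Function`, `Consumer`) stand in for a source, a function and a sink.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only: none of these types are Pulsar APIs.
final class PluggablePipeline<I, O> {
    private final Iterator<I> source;      // stands in for any source (Pulsar or external)
    private final Function<I, O> function; // the user function
    private final Consumer<O> sink;        // stands in for any sink (Pulsar or external)

    PluggablePipeline(Iterator<I> source, Function<I, O> function, Consumer<O> sink) {
        this.source = source;
        this.function = function;
        this.sink = sink;
    }

    // Drains the source, applies the function, and hands each result to the sink.
    void run() {
        while (source.hasNext()) {
            sink.accept(function.apply(source.next()));
        }
    }

    // Small demo: an in-memory source and sink around a mapping function.
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        new PluggablePipeline<Integer, String>(
                List.of(1, 2, 3).iterator(), i -> "v" + i, out::add).run();
        return out;
    }
}
```

Swapping the source or the sink here does not touch the function, which is the flexibility the suggestion is after.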



On Tue, Jul 26, 2022 at 2:56 AM Christophe Bornet 
wrote:

> Thanks for the feedback Jerry.
> We don't modify the way sources, sinks and functions are detected when it's
> based on their fields. The proposal is just to modify the classname of the
> function applied in the instance so the same detection rules apply. The
> only difference is when detecting if the sink or function is built-in. For
> this we add some code to do this detection also based on the ComponentType
> (either detected or explicit). You can check the implementation PR about
> it: https://github.com/apache/pulsar/pull/16740
>
> IMO, making it a separate implementation of what currently exists would make
> things more complex and thus more error-prone for no good reason. The
> proposal is "just" to replace the name of the already existing function
> (IdentityFunction) with another one and to provide the location of the
> function JAR.
>
> Best regards
> Christophe

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-26 Thread Christophe Bornet
Thanks for the feedback Jerry.
We don't modify the way sources, sinks and functions are detected when it's
based on their fields. The proposal is just to modify the classname of the
function applied in the instance so the same detection rules apply. The
only difference is when detecting if the sink or function is built-in. For
this we add some code to do this detection also based on the ComponentType
(either detected or explicit). You can check the implementation PR about
it: https://github.com/apache/pulsar/pull/16740

IMO, making it a separate implementation of what currently exists would make
things more complex and thus more error-prone for no good reason. The
proposal is "just" to replace the name of the already existing function
(IdentityFunction) with another one and to provide the location of the
function JAR.
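
As a rough sketch of the "replace the class name" idea above: the instance keeps running exactly one function, and only the name of that function changes when a preprocess function is configured. The class `SinkFunctionResolver` and its method are hypothetical and not taken from the implementation PR; only the name IdentityFunction comes from the thread.

```java
import java.util.Optional;

// Hypothetical helper, not Pulsar code: picks the function class the instance should run.
final class SinkFunctionResolver {
    // Simple name as used in the thread; the real fully qualified name is not assumed here.
    static final String IDENTITY = "IdentityFunction";

    // Returns the configured preprocess function class name, or the identity
    // function when none (or a blank one) is configured for the sink.
    static String functionClassName(Optional<String> preprocessClassName) {
        return preprocessClassName.filter(name -> !name.isBlank()).orElse(IDENTITY);
    }
}
```

With no preprocess function configured, the result is the same as today's behaviour for a plain sink.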

Best regards
Christophe

On Mon, Jul 25, 2022 at 23:31, Jerry Peng wrote:

> My feedback is to make this change as self contained as possible.  Can we
> just have a special implementation of a sink that will run the logic of the
> "preprocess" function?  There are many places in the code where we figure
> out if it is a source, sink or a function based on the fields in the
> Function metadata.  Changing that may have unintended consequences.

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-26 Thread Christophe Bornet
When a user registers a Sink, the validation code will check that the
return type of the Function is a Record.
The Function can then be used either on its own or together with a Sink.
IMO, adding a new API would only add complexity.
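
A check like the one described above could be done with reflection on the function's generic interface. The sketch below is an assumption about how such a validation might look, not the code from the PR: `Fn` and `RecordLike` are hypothetical stand-ins for Pulsar's Function and Record interfaces, and `returnsRecord` is an invented helper.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical sketch: Fn and RecordLike stand in for Pulsar's Function and Record.
final class ReturnTypeCheck {
    interface RecordLike<T> { T getValue(); }
    interface Fn<I, O> { O process(I input); }

    // A function whose declared output type is a record: accepted in front of a sink.
    static final class ToRecord implements Fn<String, RecordLike<String>> {
        public RecordLike<String> process(String input) { return () -> input; }
    }

    // A function whose declared output type is not a record: rejected.
    static final class ToLength implements Fn<String, Integer> {
        public Integer process(String input) { return input.length(); }
    }

    // Returns true when the class directly implements Fn<I, O> with O being RecordLike.
    static boolean returnsRecord(Class<?> fnClass) {
        for (Type t : fnClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (Fn.class.equals(p.getRawType())) {
                    Type out = p.getActualTypeArguments()[1];
                    Type raw = (out instanceof ParameterizedType)
                            ? ((ParameterizedType) out).getRawType() : out;
                    return RecordLike.class.equals(raw);
                }
            }
        }
        return false;
    }
}
```

This only inspects the declared type at registration time, so the Function interface itself stays unchanged.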

On Mon, Jul 25, 2022 at 14:55, Baodi Shi wrote:

> > Can you explain more what you mean ?
> This PIP doesn't change the API of a Function and it's already possible to
> write a Function that returns a Record.
> And when declaring a Sink with a Function we'll check that it's the case.
>
> I mean: we should constrain the function interface, otherwise, the user
> may return a structure that is not a record.
>
> Thanks,
> Baodi Shi

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-25 Thread Jerry Peng
My feedback is to make this change as self contained as possible.  Can we
just have a special implementation of a sink that will run the logic of the
"preprocess" function?  There are many places in the code where we figure
out if it is a source, sink or a function based on the fields in the
Function metadata.  Changing that may have unintended consequences.

On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi 
wrote:

> > Can you explain more what you mean ?
> This PIP doesn't change the API of a Function and it's already possible to
> write a Function that returns a Record.
> And when declaring a Sink with a Function we'll check that it's the case.
>
> I mean: we should constrain the function interface, otherwise, the user
> may return a structure that is not a record.
>
> Thanks,
> Baodi Shi

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-25 Thread Baodi Shi
> Can you explain more what you mean ?
This PIP doesn't change the API of a Function and it's already possible to
write a Function that returns a Record.
And when declaring a Sink with a Function we'll check that it's the case.

I mean: we should constrain the function interface, otherwise, the user may 
return a structure that is not a record.

Thanks,
Baodi Shi

> On Jul 25, 2022, at 01:0233, Christophe Bornet  wrote:
> 
> Thanks for the feedback Asaf
> 
> 
>>>   - preprocess-function: the preprocess function applied before the
>>>   Sink. Starts with builtin:// for built-in functions, function:// for
>>>   package functions, http:// or file://
>>>
>> 1. Why is this function applied only before the sink? I thought it replaces
>> the identity function, so why can't a source have a function that reads
>> from the source (say S3), runs the function and only then writes to a
>> pulsar topic?
>>
>
> Yes, that's entirely possible to implement and will be done in future work,
> as written in the PIP.
> 
> 
>> 2. Can you clarify more about built in and function for package function?
>> Is this an existing functionality ?
>> 
> Yes those are existing functionalities.
> Built-in functions are not documented (and we should do something about
> that).
> Package management of functions is described in
> https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> 
> 
>> 3. Regarding http - Are you loading a class through that URL? Aren't we
>> exposed to the same problem as the Log4Shell security issue? If so, what
>> measures are you taking to protect against it?
>>
> Yes, we are loading code via URL. This feature already exists for
> Sources/Sinks/Functions.
> I guess you need to fully trust the source you download from.
> This PIP has the same security level as what already exists for this
> functionality.
> 
> 
>> 
>>> The field extraFunctionPackageLocation will be added to the protobuf
>>> structure FunctionMetaData. This field will be filled with the
>>> location of the extra function to apply when registering a sink and used
>>> in the Runtime to load the function code.
>>
>> Can you please expand on that? You mean the JAR location, in which you will
>> search for the class name and function specified in the 3 fields you've added
>> to the config?
>>
> Not exactly. It's the location where the JAR is stored. It can be
> BookKeeper, package management, a built-in NAR, etc.
> In KubernetesRuntime, there are cases where the built-in or package function
> you provide in the preprocess-function param could be copied to BK.
> That's the same as for a regular Sink/Source, and if we need to copy to BK,
> we append `__sink-function` to the storage path to prevent a conflict with
> the sink code.
> The class name is indeed looked up in this JAR.
> 
> 
>>> The parameters extraFunctionFile and originalExtraFunctionFileName will be
>>> added to RuntimeFactory::createContainer
>>
>> 1. File and fileName containing what? How does this relate to
>> extraFunctionPackageLocation?
>>
> That part mirrors what is already done for the main code of the Source/Sink
> (with codeFile, originalCodeFileName and packageLocation respectively).
> Before starting the ThreadRuntime, we download the JAR from the
> extraFunctionPackageLocation to the local extraFunctionFile so we can load
> the code from it.
> 
> 
>> 
>> Here you use the term "Extra Function", while in the config fields and
>> admin you use the term Pre-Process Function. I would pick Pre-Process
>> Function and stick with it everywhere.
>>
> This terminology needs to be applicable to a Function that would be applied
> after a Source, so we can't use "preprocess" here.
> I haven't found anything better than "extra function". Don't hesitate to
> propose something!
> 
> 
>> 
>> 
>>> The following parameters will be added to JavaInstanceStarter:
>>> 
>>>   - --extra_function_jar: the path to the extra function jar
>>> 
>>> 
>>>   - --extra_function_id: the extra function UUID cache key
>>> 
>>> These parameters are then used by the ThreadRuntime to load the function
>>> from the FunctionCacheManager or create it there if needed.
>> 
>> 
>> Can you elaborate on that? JavaInstanceStarter is used to start a single
>> Function? It's used from command line?
> 
> The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> The JavaInstance is the process that will execute the code to read from a
> Source, execute a Function and write to a Sink.
> Generally, Pulsar users don't use the JavaInstanceStarter directly. The
> command line is built by the ProcessRuntime and KubernetesRuntime.
> 
>> 
>> 
>> In general, you will essentially have two class loaders - one for the sink
>> and one for the pre-process function?
>> 
> Yes, exactly.
> 3 to be more accurate since there's also the instance class loader.
> 
> 
>> 
>> 
>> 
>> 
>> 
>> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet >> 
>> wrote:
>> 
>>> Dear Pulsar dev community,
>>> 
>>> I would like to open a discussion here 

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Christophe Bornet
Thanks for the feedback Asaf


> >   - preprocess-function: the preprocess function applied before the
> >   Sink. Starts with builtin:// for built-in functions, function:// for
> >   package functions, http:// or file://
> >
> 1. Why is this function applied only before the sink? I thought it replaces
> the identity function, so why can't a source have a function that reads
> from the source (say S3), runs the function and only then writes to a
> pulsar topic?
>

Yes, that's entirely possible to implement and will be done in future work,
as written in the PIP.


>  2. Can you clarify more about built in and function for package function?
> Is this an existing functionality ?
>
Yes those are existing functionalities.
Built-in functions are not documented (and we should do something about
that).
Package management of functions is described in
https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
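
To make the four prefixes listed for preprocess-function concrete, here is a small sketch that classifies a reference by its scheme. It is an illustration only: the class `PreprocessFunctionRef`, the `Kind` enum and the example references in the comments are hypothetical, and the actual parsing in Pulsar may differ.

```java
// Hypothetical sketch, not Pulsar code: classify a preprocess-function reference.
final class PreprocessFunctionRef {
    enum Kind { BUILTIN, PACKAGE, HTTP, FILE, UNKNOWN }

    // Maps the prefixes named in the PIP to a kind; anything else is UNKNOWN.
    static Kind classify(String ref) {
        if (ref == null) return Kind.UNKNOWN;
        if (ref.startsWith("builtin://")) return Kind.BUILTIN;   // e.g. builtin://my-transform (made-up name)
        if (ref.startsWith("function://")) return Kind.PACKAGE;  // package management service
        if (ref.startsWith("http://")) return Kind.HTTP;         // code downloaded from a URL
        if (ref.startsWith("file://")) return Kind.FILE;         // local file
        return Kind.UNKNOWN;
    }
}
```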


> 3. Regarding http - Are you loading a class through that URL? Aren't we
> exposed to the same problem as the Log4Shell security issue? If so, what
> measures are you taking to protect against it?
>
Yes, we are loading code via URL. This feature already exists for
Sources/Sinks/Functions.
I guess you need to fully trust the source you download from.
This PIP has the same security level as what already exists for this
functionality.


>
> The field extraFunctionPackageLocation to the protobuf structure
> > FunctionMetaData will be added. This field will be filled with the
> > location of the extra function to apply when registering a sink and used
> in
> > the Runtime to load the function code.
>
> Can you please expand on that? You mean the JAR location, which you will
> search that class name and function specified in the 3 fields you've added
> to the config?
>
Not exactly. It's the location where the JAR is stored. It can be
BookKeeper, package management, a built-in NAR, etc.
In KubernetesRuntime, there are cases where the builtin or package function
you provide in the preprocess-function param could be copied to BK.
That's the same as for a regular Sink/Source and if we need to copy to BK,
we append `__sink-function` to the storage path to prevent conflict with
the sink code.
The class name is indeed looked up in this JAR.


> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> > added to RuntimeFactory::createContainer
>
> 1. File and fileName containing what? How does this related to
> extraFunctionPackageLocation?
>
That part mimics what is already done for the main code of the Source/Sink
(with codeFile, originalCodeFileName and packageLocation respectively).
Before starting the ThreadRuntime, we download the JAR from the
extraFunctionPackageLocation to the local extraFunctionFile so we can load
the code from it.


>
> In here you use the terminology Extra Function" and in fields of config and
> admin you used the term Pre-Process Function. I would stick to Pro-Process
> Function and stick with it all over.
>
This terminology needs to be applicable to a Function that would be applied
after a Source, so we can't use "preprocess" here.
I haven't found anything better than "extra function". Don't hesitate to
propose something!


>
>
> > The following parameters will be added to JavaInstanceStarter:
> >
> >- --extra_function_jar: the path to the extra function jar
> >
> >
> >- --extra_function_id: the extra function UUID cache key
> >
> > These parameters are then used by the ThreadRuntime to load the function
> > from the FunctionCacheManager or create it there if needed.
>
>
> Can you elaborate on that? JavaInstanceStarter is used to start a single
> Function? It's used from command line?

The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
The JavaInstance is the process that executes the code to read from a
Source, execute a Function and write to a Sink.
Pulsar users generally don't use the JavaInstanceStarter directly. The
command line is built by the ProcessRuntime and KubernetesRuntime.

>
>
> In general, you will essentially have two class loaders - one for the sink
> and one for the pre-process function?
>
Yes, exactly.
Three, to be more accurate, since there's also the instance class loader.



Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Christophe Bornet
Thanks for your comments Baodi


> This proposal looks good to me.
>
> > Only Functions returning a Record will be authorized to ensure that the
> Function sets the Schema explicitly.
>
> Does this mean that the function return type is fixed Record?


Yes


> Can the interface declaration of the function be displayed in the API
> changes?
>
>
Can you explain a bit more what you mean?
This PIP doesn't change the API of a Function, and it's already possible to
write a Function whose return type is a Record.
And when declaring a Sink with a Function we'll check that it's the case.
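
To make the idea of such a check concrete, here is a minimal sketch in plain Java. It is only an illustration under stated assumptions: `RecordLike` and `FunctionLike` are hypothetical stand-ins for the Pulsar interfaces, `returnsRecord` is a made-up helper, and the sketch only handles classes that implement the function interface directly. The actual check in the implementation PR may work differently.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical sketch; not the check implemented in Pulsar.
final class ReturnTypeCheckSketch {

    // Stand-ins for the Pulsar Record and Function interfaces (assumed shapes).
    interface RecordLike<T> { T getValue(); }
    interface FunctionLike<I, O> { O process(I input); }

    // Returns true when the class directly implements FunctionLike and its
    // declared output type is RecordLike (or a subtype of it).
    static boolean returnsRecord(Class<?> functionClass) {
        for (Type type : functionClass.getGenericInterfaces()) {
            if (!(type instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType parameterized = (ParameterizedType) type;
            if (parameterized.getRawType() != FunctionLike.class) {
                continue;
            }
            Type output = parameterized.getActualTypeArguments()[1];
            Type raw = output instanceof ParameterizedType
                    ? ((ParameterizedType) output).getRawType()
                    : output;
            return raw instanceof Class && RecordLike.class.isAssignableFrom((Class<?>) raw);
        }
        return false;
    }

    // A function whose output is a record: accepted by the check.
    static final class Wrapping implements FunctionLike<String, RecordLike<String>> {
        public RecordLike<String> process(String input) { return () -> input; }
    }

    // A function whose output is a bare value: rejected by the check.
    static final class Plain implements FunctionLike<String, String> {
        public String process(String input) { return input; }
    }

    public static void main(String[] args) {
        System.out.println(returnsRecord(Wrapping.class)); // true
        System.out.println(returnsRecord(Plain.class));    // false
    }
}
```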




Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Asaf Mesika
Hi,

A few questions:


>- preprocess-function: the preprocess function applied before the
>Sink. Starts by builtin:// for built-in functions, function:// for
>package function, http:// or file://
>
1. Why is this function applied only before the sink? I thought it replaces
the identity function, so why can't a source have a function that reads
from the source (say S3), runs the function and only then writes to a
Pulsar topic?
2. Can you clarify what built-in functions and package functions are?
Is this existing functionality?
3. Regarding http: are you loading a class through that URL? Aren't we
exposed to the same kind of problem as the Log4Shell security issue? If so,
what measures are you taking to protect against it?

> The field extraFunctionPackageLocation to the protobuf structure
> FunctionMetaData will be added. This field will be filled with the
> location of the extra function to apply when registering a sink and used in
> the Runtime to load the function code.

Can you please expand on that? Do you mean the JAR location, in which you
will look up the class name and function specified in the 3 fields you've
added to the config?

> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> added to RuntimeFactory::createContainer

What do File and fileName contain? How do they relate to
extraFunctionPackageLocation?

Here you use the term "Extra Function", while in the config fields and the
admin CLI you use the term "Pre-Process Function". I would pick Pre-Process
Function and stick with it throughout.


> The following parameters will be added to JavaInstanceStarter:
>
>- --extra_function_jar: the path to the extra function jar
>
>
>- --extra_function_id: the extra function UUID cache key
>
> These parameters are then used by the ThreadRuntime to load the function
> from the FunctionCacheManager or create it there if needed.


Can you elaborate on that? Is JavaInstanceStarter used to start a single
Function? Is it used from the command line?


In general, you will essentially have two class loaders - one for the sink
and one for the pre-process function?






Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-22 Thread Baodi Shi
This proposal looks good to me. 

> Only Functions returning a Record will be authorized to ensure that the 
> Function sets the Schema explicitly.

Does this mean that the function return type is fixed to Record? Can the
interface declaration of the function be shown in the API changes?


Thanks,
Baodi Shi


[DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-22 Thread Christophe Bornet
Dear Pulsar dev community,

I would like to open a discussion here about PIP 193: Sink preprocessing
Function.

Best regards

Christophe

## Motivation

Pulsar IO connectors make it possible to connect Pulsar to an external
system:
* A Source reads continuously from an external system and writes to a
Pulsar topic
* A Sink reads continuously from a Pulsar topic and writes to an external
system.
Sources and Sinks are written in Java.

Pulsar also has a lightweight computing system named Pulsar Functions. A
Pulsar Function reads from one or more topics, applies user logic written
in Java, Python or Go and writes to an output topic.

When using Pulsar IO connectors, the format of what is read/written from/to
the source/sink is defined by the connector code. But there are a lot of
situations where a user wants to transform this data before using it.
Currently the solution is to either:
* write a custom connector that transforms the data the way we want, but
that means writing a lot of code without reuse, packaging and managing
custom connectors, and so on.
* write a Function to transform the data after it was written to a topic by
a Source or before it is read from a topic by a Sink. This is not very
efficient as we have to use an intermediate topic, which means additional
storage, IO, and latency.

Considering all this, it would be handy to be able to apply a Function
on-the-fly to a connector without going through an intermediary topic.

## Goal

This PIP defines the changes needed to be able to apply a preprocessing
Function on-the-fly to a Sink.
The preprocessing function can be a built-in function, a package function,
or loaded through an http URL or a file path.
Sources, Sinks and Functions are based on the same runtime process that:
* reads from a Source. For Sinks and Functions this Source is a
PulsarSource consuming from a Pulsar topic
* applies a Function. For Sources and Sinks, this Function is
IdentityFunction which returns the data it gets without modification.
* writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
writing to a Pulsar topic.
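
As a rough illustration of the shared read/apply/write shape described above, here is a minimal sketch in plain Java. The names `PipelineSketch`, `run` and `sourceOf`, and the use of `java.util.function` types, are assumptions made for this example; they are not the Pulsar instance classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the shared runtime loop; not Pulsar's actual code.
final class PipelineSketch {

    // Reads from the source until it returns null, applies the function,
    // and hands each result to the sink.
    static <I, O> void run(Supplier<I> source, Function<I, O> function, Consumer<O> sink) {
        for (I input = source.get(); input != null; input = source.get()) {
            sink.accept(function.apply(input));
        }
    }

    // Demo helper: a source backed by a fixed list, returning null when exhausted.
    static <T> Supplier<T> sourceOf(List<T> items) {
        Iterator<T> iterator = items.iterator();
        return () -> iterator.hasNext() ? iterator.next() : null;
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        Consumer<String> sink = written::add;

        // A plain Sink behaves as if an identity function were applied.
        Function<String, String> identity = Function.identity();
        run(sourceOf(List.of("a", "b")), identity, sink);

        // With a preprocessing function, only the middle step changes.
        Function<String, String> upperCase = s -> s.toUpperCase();
        run(sourceOf(List.of("a", "b")), upperCase, sink);

        System.out.println(written); // [a, b, A, B]
    }
}
```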

This PIP reuses this design and allows configuring a Function other than
IdentityFunction on Sinks.
Only Functions returning a Record will be allowed, to ensure that the
Function sets the Schema explicitly.

Out of the scope of this PIP, for future work:
* Applying a post-processing Function to a Source
* Loading the Function jar through the Sink CLI

## API Changes

### Admin CLI

The following options will be added to the `pulsar-admin sinks` CLI commands
`create`, `update` and `localrun`:
* `preprocess-function`: the preprocess function applied before the Sink.
Starts with `builtin://` for built-in functions, `function://` for package
functions, `http://` or `file://`
* `preprocess-function-classname`: the preprocess function class name
(optional if the function is a NAR)
* `preprocess-function-config`: the configuration of the preprocess
function in the same format as the `user-config` parameter of the
`functions create` CLI command.
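
To illustrate how the value of `preprocess-function` could be told apart by its prefix, here is a small sketch. The enum `PreprocessFunctionKind`, its `classify` method and the sample values in `main` are hypothetical; only the four prefixes come from the list above, and the real CLI validation may differ.

```java
// Hypothetical helper; only the four prefixes are taken from the proposal.
enum PreprocessFunctionKind {
    BUILTIN("builtin://"),
    PACKAGE("function://"),
    HTTP("http://"),
    FILE("file://");

    private final String prefix;

    PreprocessFunctionKind(String prefix) {
        this.prefix = prefix;
    }

    // Returns the kind whose prefix the value starts with, or fails loudly.
    static PreprocessFunctionKind classify(String value) {
        for (PreprocessFunctionKind kind : values()) {
            if (value.startsWith(kind.prefix)) {
                return kind;
            }
        }
        throw new IllegalArgumentException("Unsupported preprocess-function value: " + value);
    }

    public static void main(String[] args) {
        // Sample values are made up for the demo.
        System.out.println(classify("builtin://my-function"));         // BUILTIN
        System.out.println(classify("file:///tmp/my-function.jar"));   // FILE
    }
}
```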

The corresponding fields will be added to `SinkConfig`:

```java
private String preprocessFunction;
private String preprocessFunctionClassName;
private String preprocessFunctionConfig;
```

### Function definition

The field `extraFunctionPackageLocation` will be added to the protobuf
structure `FunctionMetaData`. This field will be filled with the location of
the extra function to apply when registering a sink, and will be used in the
Runtime to load the function code.

```protobuf
message FunctionMetaData {
    ...
    PackageLocationMetaData extraFunctionPackageLocation = 7;
}
```

### Runtime

The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
be added to `RuntimeFactory::createContainer`


```java
Runtime createContainer(
        InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
        String extraFunctionFile, String originalExtraFunctionFileName,
        Long expectedHealthCheckInterval) throws Exception;
```

### Instance function cache

A field `extraFunctionId` will be added to `InstanceConfig`. It will hold the
UUID cache key of the extra function.

```java
public class InstanceConfig {
    private int instanceId;
    private String functionId;
    private String extraFunctionId;
    // ... other existing fields omitted
}
```

### JavaInstanceStarter


The following parameters will be added to JavaInstanceStarter:
* `--extra_function_jar`: the path to the extra function jar
* `--extra_function_id`: the extra function UUID cache key

These parameters are then used by the `ThreadRuntime` to load the function
from the `FunctionCacheManager` or create it there if needed.
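
The "load it from the cache or create it there if needed" behaviour can be sketched as follows. This is a minimal illustration, not the `FunctionCacheManager` API: the class `ClassLoaderCacheSketch`, the method `getOrCreate` and the key used in `main` are assumptions made for the example.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache of class loaders keyed by a function id; the real
// FunctionCacheManager may expose a different interface.
final class ClassLoaderCacheSketch {

    private final Map<String, ClassLoader> loaders = new ConcurrentHashMap<>();

    // Returns the loader registered under cacheKey, creating it on first use.
    ClassLoader getOrCreate(String cacheKey, URL[] jars, ClassLoader parent) {
        return loaders.computeIfAbsent(cacheKey, key -> new URLClassLoader(jars, parent));
    }

    public static void main(String[] args) {
        ClassLoaderCacheSketch cache = new ClassLoaderCacheSketch();
        ClassLoader parent = ClassLoaderCacheSketch.class.getClassLoader();

        // The key stands in for the value passed as --extra_function_id.
        ClassLoader first = cache.getOrCreate("extra-function-id", new URL[0], parent);
        ClassLoader second = cache.getOrCreate("extra-function-id", new URL[0], parent);

        System.out.println(first == second); // true: same key, same loader
    }
}
```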

### Download the extra function

The StatefulSet spawned in `KubernetesRuntime` needs to be able to download
the extra function's code via the API.
An `extra-function` query param will be added to the download function HTTP
endpoint:

```java