Thanks for the feedback. I see your concerns.

I've been thinking about some ways to mitigate these concerns without expanding
the scope of this PIP too much. First, it could be a good idea to limit access
to other functions' states to functions within the same namespace. This would at
least avoid the issues that can arise when different namespaces have different
state storage implementations.
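
A minimal sketch of that restriction, assuming the runtime knows the calling
function's tenant and namespace at the point where another store is requested
(`is_access_allowed` is a hypothetical helper, not an existing Pulsar API):

```python
def is_access_allowed(caller_tenant, caller_ns, target_tenant, target_ns):
    """Hypothetical check: allow access to another function's state only when
    both functions live in the same tenant and namespace."""
    return caller_tenant == target_tenant and caller_ns == target_ns


# A function in public/default may use a sibling's state...
print(is_access_allowed("public", "default", "public", "default"))  # True
# ...but not the state of a function in another namespace.
print(is_access_allowed("public", "default", "public", "other"))  # False
```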

Another thing we could consider is making states read-only to other functions.
This would clearly define the owner of the data and avoid unexpected issues
with multiple functions trying to change the same state. It would rule out some
potentially desirable functionality, such as two functions incrementing the
same counter, but keeping data ownership clearly defined may be more important
for now.
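
As a rough sketch of owner-only writes, assuming each store records the fully
qualified name of the function that owns it (`OwnedStateStore` is a
hypothetical in-memory stand-in, not Pulsar's StateStore):

```python
class OwnedStateStore(object):
    """Hypothetical store: any function may read, only the owner may write."""

    def __init__(self, owner):
        self.owner = owner  # e.g. "tenant/namespace/function"
        self._data = {}

    def get_state(self, key):
        return self._data.get(key)

    def put_state(self, caller, key, value):
        # Reject writes from anyone other than the owning function
        if caller != self.owner:
            raise PermissionError(
                "%s may not write state owned by %s" % (caller, self.owner))
        self._data[key] = value
```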

Another option would be to add a way to distinguish public and private states,
or to define which other functions are allowed to access a given state, which
would ease the security concerns. It would require more development time,
though, since it would complicate the current implementation somewhat: we'd
have to decide how and where state access is defined.
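
One way to express such an access definition is a per-store policy. This is
only a sketch: `StatePolicy` and its fields are hypothetical, and where the
policy would live (for example in the owner's function config) is still the
open question above.

```python
from dataclasses import dataclass, field


@dataclass
class StatePolicy:
    """Hypothetical access definition attached to one state store."""
    owner: str                                   # function that owns the store
    public: bool = False                         # readable by any permitted caller
    readers: set = field(default_factory=set)    # extra functions allowed to read
    writers: set = field(default_factory=set)    # extra functions allowed to write

    def can_read(self, caller):
        return (caller == self.owner or self.public
                or caller in self.readers or caller in self.writers)

    def can_write(self, caller):
        return caller == self.owner or caller in self.writers
```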

We could also consider a separate public state store that all functions in a
namespace can access. It would be simple to implement and would at least
separate a function's private states from the states it wants other functions
to access. Data ownership of the public states is still a bit messy with this
solution, but it would at least provide a way to share data that needs to be
shared.
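
A sketch of how calls might be routed under that scheme, assuming the public
store is simply a state store with a reserved name in each namespace
(`PUBLIC_STORE_NAME` and `resolve_store` are hypothetical):

```python
PUBLIC_STORE_NAME = "__public__"  # hypothetical reserved store name


def resolve_store(tenant, ns, function_name, public=False):
    """Return the (tenant, namespace, store name) a state call should use.

    Private state stays keyed by the calling function's own name, while
    shared state goes to one store per namespace."""
    return (tenant, ns, PUBLIC_STORE_NAME if public else function_name)
```

Since every function in the namespace resolves to the same public store, the
reserved name would also have to be disallowed as a function name to avoid
colliding with a function's private store.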

Let me know if you have any thoughts on any of these changes.

________________________________
From: Enrico Olivelli <eolive...@gmail.com>
Sent: Tuesday, January 11, 2022 1:45 PM
To: Dev <dev@pulsar.apache.org>
Subject: Re: [DISCUSSION] PIP-133 Pulsar Functions Add API For Accessing Other Function States

Thank you for posting your PIP!

I share some of Neng's concerns.
We should clearly define how security works.

Also, currently each function defines a "namespace" for its state
storage, and we recently added support for custom state storage
implementations. With this change, a function could end up accessing
other state storage namespaces (think about using a database per
tenant).

We should also say something about the guarantees when accessing
multiple storages and/or about transactional (atomic?) access.


Enrico

On Tue, Jan 11, 2022 at 21:38, Neng Lu <nl...@apache.org> wrote:
>
> Before we advance further, we first need to get on the same page about the
> pros and cons of allowing this feature.
>
> If functions can access other functions' state (especially with write
> access), data ownership will be a mess, isolation will be broken, and data
> security might be compromised.
>
> On Wed, Jan 5, 2022 at 3:45 PM Ethan Merrill <ethan.merr...@legrand.us> wrote:
>
> > Original PIP: https://github.com/apache/pulsar/issues/13633
> >
> > Pasted below for quoting convenience.
> >
> > -----
> >
> > ## Motivation
> >
> > Certain uses of Pulsar functions could benefit from the ability to access
> > the states of other functions. Currently, functions can only access their
> > own states, and so sharing information between functions requires other
> > solutions such as writing to a separate database.
> >
> > ## Goal
> >
> > The goal is to enable a function to access another function's state.
> > Given another function's tenant, namespace, and name, any function should
> > be able to access the other function's state for read and write purposes.
> > This PIP is not concerned with expanding the capabilities of function
> > states; it only deals with expanding access to function states.
> >
> > ## API Changes
> >
> > The Pulsar function API would be modified to have the function context
> > implement the following interface for accessing function states using a
> > tenant, namespace, and name.
> >
> > ```java
> > import java.nio.ByteBuffer;
> > import java.util.concurrent.CompletableFuture;
> >
> > public interface SharedContext {
> >     /**
> >      * Update the state value for the key.
> >      *
> >      * @param key   name of the key
> >      * @param value state value of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     void putState(String key, ByteBuffer value, String tenant, String ns,
> >             String name);
> >
> >     /**
> >      * Update the state value for the key, but don't wait for the
> >      * operation to be completed.
> >      *
> >      * @param key   name of the key
> >      * @param value state value of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     CompletableFuture<Void> putStateAsync(String key, ByteBuffer value,
> >             String tenant, String ns, String name);
> >
> >     /**
> >      * Retrieve the state value for the key.
> >      *
> >      * @param key name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      * @return the state value for the key.
> >      */
> >     ByteBuffer getState(String key, String tenant, String ns, String name);
> >
> >     /**
> >      * Retrieve the state value for the key, but don't wait for the
> >      * operation to be completed.
> >      *
> >      * @param key name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      * @return a future that completes with the state value for the key
> >      */
> >     CompletableFuture<ByteBuffer> getStateAsync(String key, String tenant,
> >             String ns, String name);
> >
> >     /**
> >      * Delete the state value for the key.
> >      *
> >      * @param key   name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     void deleteState(String key, String tenant, String ns, String name);
> >
> >     /**
> >      * Delete the state value for the key, but don't wait for the
> >      * operation to be completed.
> >      *
> >      * @param key   name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     CompletableFuture<Void> deleteStateAsync(String key, String tenant,
> >             String ns, String name);
> >
> >     /**
> >      * Increment the built-in distributed counter referred to by key.
> >      *
> >      * @param key    The name of the key
> >      * @param amount The amount to be incremented
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     void incrCounter(String key, long amount, String tenant, String ns,
> >             String name);
> >
> >     /**
> >      * Increment the built-in distributed counter referred to by key,
> >      * but don't wait for the completion of the increment operation.
> >      *
> >      * @param key    The name of the key
> >      * @param amount The amount to be incremented
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      */
> >     CompletableFuture<Void> incrCounterAsync(String key, long amount,
> >             String tenant, String ns, String name);
> >
> >     /**
> >      * Retrieve the counter value for the key.
> >      *
> >      * @param key name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      * @return the amount of the counter value for this key
> >      */
> >     long getCounter(String key, String tenant, String ns, String name);
> >
> >     /**
> >      * Retrieve the counter value for the key, but don't wait
> >      * for the operation to be completed.
> >      *
> >      * @param key name of the key
> >      * @param tenant the state tenant name
> >      * @param ns the state namespace name
> >      * @param name the state store name
> >      * @return a future that completes with the counter value for this key
> >      */
> >     CompletableFuture<Long> getCounterAsync(String key, String tenant,
> >             String ns, String name);
> > }
> > ```
> >
> > And the Python context would have the following added:
> > ```python
> > from abc import abstractmethod
> >
> >
> > class Context(object):
> >   @abstractmethod
> >   def incr_counter(self, key, amount, tenant, ns, name):
> >     """incr the counter of a given key in the managed state"""
> >     pass
> >
> >   @abstractmethod
> >   def get_counter(self, key, tenant, ns, name):
> >     """get the counter of a given key in the managed state"""
> >     pass
> >
> >   @abstractmethod
> >   def del_counter(self, key, tenant, ns, name):
> >     """delete the counter of a given key in the managed state"""
> >     pass
> >
> >   @abstractmethod
> >   def put_state(self, key, value, tenant, ns, name):
> >     """update the value of a given key in the managed state"""
> >     pass
> >
> >   @abstractmethod
> >   def get_state(self, key, tenant, ns, name):
> >     """get the value of a given key in the managed state"""
> >     pass
> > ```
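> >
> > As an illustration of the calling pattern only, a function might count its
> > inputs in a counter owned by another function as sketched below. The
> > `InMemoryContext` class is a hypothetical dict-backed stand-in for the
> > proposed Context methods so the example runs without a Pulsar cluster, and
> > the `public/default/aggregator` coordinates are placeholders.
> > ```python
> > class InMemoryContext(object):
> >     """Hypothetical stand-in for the proposed cross-function counter API."""
> >
> >     def __init__(self):
> >         self._counters = {}
> >
> >     def incr_counter(self, key, amount, tenant, ns, name):
> >         full_key = (tenant, ns, name, key)
> >         self._counters[full_key] = self._counters.get(full_key, 0) + amount
> >
> >     def get_counter(self, key, tenant, ns, name):
> >         return self._counters.get((tenant, ns, name, key), 0)
> >
> >
> > def process(input, context):
> >     # Count every message in a counter that belongs to another function,
> >     # addressed by that function's tenant, namespace, and name.
> >     context.incr_counter("seen", 1, "public", "default", "aggregator")
> >     return input
> >
> >
> > ctx = InMemoryContext()
> > process("a", ctx)
> > process("b", ctx)
> > print(ctx.get_counter("seen", "public", "default", "aggregator"))  # 2
> > ```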
> >
> > ## Implementation
> >
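> > The implementations of the API functions would be simple. For example:
> > ```java
> > @Override
> > public void incrCounter(String key, long amount, String tenant, String ns,
> >         String name) {
> >     // Look up (creating if needed) the target function's store, then
> >     // confirm state is enabled for it before touching the counter.
> >     DefaultStateStore stateStore =
> >             (DefaultStateStore) getCreateStateStore(tenant, ns, name);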
> >     ensureStateEnabled(stateStore, tenant, ns, name);
> >     stateStore.incrCounter(key, amount);
> > }
> > ```
> >
> > This implementation would require a small change to the
> > ensureStateEnabled() function to allow for checking that states other than
> > the function's own state are enabled.
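> >
> > Sketched in Python for brevity (the change itself would be to the Java
> > ensureStateEnabled()), the generalized check might take the requested
> > store's coordinates so that its error names that store rather than the
> > caller's own. The `ensure_state_enabled` name and message are assumptions.
> > ```python
> > def ensure_state_enabled(state_store, tenant, ns, name):
> >     """Hypothetical check: reject a missing store, naming the requested
> >     tenant/namespace/name instead of the calling function's own."""
> >     if state_store is None:
> >         raise RuntimeError("State %s/%s/%s is not enabled." % (tenant, ns, name))
> >     return state_store
> > ```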
> >
> > Additionally, a new function, getCreateStateStore(), would need to be added,
> > or the existing getStateStore() function would need to be modified to
> > create a new state store if the requested one doesn't currently exist in the
> > StateManager. This new function in the StateManager might look like:
> > ```java
> > @Override
> > public StateStore getCreateStore(String tenant, String namespace,
> >         String name) {
> >     String storeName =
> >             FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
> >     StateStore store = stores.get(storeName);
> >
> >     try {
> >         if (store == null) {
> >             store = createStore(tenant, namespace, name);
> >             registerStore(store);
> >         }
> >     } catch (Exception e) {
> >         // Creation failed: return null rather than propagating the exception.
> >         store = null;
> >     }
> >
> >     return store;
> > }
> >
> > private StateStore createStore(String tenant, String namespace,
> >         String name) throws Exception {
> >     StateStore store =
> >             stateStoreProvider.getStateStore(tenant, namespace, name);
> >     StateStoreContext context = new StateStoreContextImpl();
> >     store.init(context);
> >     return store;
> > }
> > ```
> >
> > This implementation would just require that the JavaInstanceRunnable class
> > passes the stateStoreProvider into the StateManager when it is created, so
> > the StateManager can get new state stores when needed.
> >
> > For the Python implementation, a state manager class doesn't currently
> > exist, so one would need to be created. It could be as simple as:
> > ```python
> > import state_context
> >
> >
> > class StateManager(object):
> >     def __init__(self, state_storage_serviceurl):
> >         self.state_storage_serviceurl = state_storage_serviceurl
> >         # Per-instance cache of state contexts (a class-level dict would be
> >         # shared by every StateManager instance)
> >         self.states = {}
> >
> >     def get_state(self, tenant, ns, name):
> >         table_ns = "%s_%s" % (str(tenant), str(ns))
> >         table_ns = table_ns.replace("-", "_")
> >         table_name = str(name)
> >         # A tuple key keeps e.g. a/b/cd and a/bc/d from colliding
> >         state_id = (table_ns, table_name)
> >
> >         state = self.states.get(state_id)
> >         if state is None:
> >             state = state_context.create_state_context(
> >                 self.state_storage_serviceurl, table_ns, table_name)
> >             self.states[state_id] = state
> >
> >         return state
> > ```
> >
> > The PythonInstance class that creates the function context would also
> > create a StateManager object and set it on the context object, so the
> > Python API functions could use it as follows:
> > ```python
> > def incr_counter(self, key, amount, tenant=None, ns=None, name=None):
> >   if tenant is None or ns is None or name is None:
> >     return self.state_context.incr(key, amount)
> >   else:
> >     state_context = self.state_manager.get_state(tenant, ns, name)
> >     return state_context.incr(key, amount)
> > ```
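> >
> > The remaining methods could share the same "own state unless a target is
> > given" dispatch. A minimal sketch, assuming the per-store state context
> > exposes `incr`, `get_amount`, `put`, and `get_value` (as the existing Python
> > state context appears to); `ContextStateMixin` and `_resolve_state` are
> > hypothetical names:
> > ```python
> > class ContextStateMixin(object):
> >     """Hypothetical mixin: expects self.state_context (the function's own
> >     state) and self.state_manager (a StateManager) to be set."""
> >
> >     def _resolve_state(self, tenant, ns, name):
> >         # Fall back to the function's own state unless a full target is given
> >         if tenant is None or ns is None or name is None:
> >             return self.state_context
> >         return self.state_manager.get_state(tenant, ns, name)
> >
> >     def incr_counter(self, key, amount, tenant=None, ns=None, name=None):
> >         return self._resolve_state(tenant, ns, name).incr(key, amount)
> >
> >     def get_counter(self, key, tenant=None, ns=None, name=None):
> >         return self._resolve_state(tenant, ns, name).get_amount(key)
> >
> >     def put_state(self, key, value, tenant=None, ns=None, name=None):
> >         return self._resolve_state(tenant, ns, name).put(key, value)
> >
> >     def get_state(self, key, tenant=None, ns=None, name=None):
> >         return self._resolve_state(tenant, ns, name).get_value(key)
> > ```
> >
> > Keeping the defaulting in one place also makes it visible that a partially
> > specified target (say, a tenant with no name) silently falls back to the
> > function's own state, so a real implementation may prefer to reject it.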
> >
> > All of this should enable easy access to other functions' states in a way
> > that has minimal impact on the existing functionality of Pulsar.
> >
> > ## Rejected Alternatives
> >
> > None
> >
> > ________________________________
> >
> > This e-mail, and any document attached hereby, may contain confidential
> > and/or privileged information. If you are not the intended recipient (or
> > have received this e-mail in error) please notify the sender immediately
> > and destroy this e-mail. Any unauthorized, direct or indirect, copying,
> > disclosure, distribution or other use of the material or parts thereof is
> > strictly forbidden.
> > Unsubscribe: If you would like to be removed from our mailing list, please
> > send your contact information to casl.unsubscr...@legrand.ca<mailto:
> > casl.unsubscr...@legrand.ca> and indicate what messages you no longer
> > wish to receive.
> >
