WeiZhong94 commented on a change in pull request #15130:
URL: https://github.com/apache/flink/pull/15130#discussion_r592109738



##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -99,9 +114,35 @@ def __hash__(self) -> int:
         return hash([i for i in self.get()])
 
 
-class StateMapView(MapView):
+class KeyedStateListView(StateListView[N]):
+    """
+    KeyedStateListView is an default implementation of StateListView whose underlying

Review comment:
       an -> a

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -136,6 +177,32 @@ def clear(self) -> None:
         return self._map_state.clear()
 
 
+class KeyedStateMapView(StateMapView[N]):
+    """
+    KeyedStateMapView is an default implementation of StateMapView whose underlying

Review comment:
       an -> a

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -173,9 +239,65 @@ def __init__(self,
     def get_runtime_context(self):
         return self._function_context
 
+    @abstractmethod
+    def get_state_list_view(self, state_name, element_coder):
+        """
+        Creates a state list view.
+
+        :param state_name: The name of underlying state of the list view

Review comment:
       Add a period at the end of the sentence.

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -99,9 +114,35 @@ def __hash__(self) -> int:
         return hash([i for i in self.get()])
 
 
-class StateMapView(MapView):
+class KeyedStateListView(StateListView[N]):
+    """
+    KeyedStateListView is an default implementation of StateListView whose underlying
+    representation is a keyed state.
+    """
 
-    def __init__(self, map_state: MapState):
+    def __init__(self, list_state: ListState):
+        super(KeyedStateListView, self).__init__(list_state)
+
+    def set_current_namespace(self, namespace: N):
+        raise Exception("KeyedStateListView doesn't support set_current_namespace")
+
+
+class NamespacedStateListView(StateListView[N]):
+    """
+    NamespacedStateListView is an StateListView whose underlying representation is a keyed and

Review comment:
       an -> a

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -99,9 +114,35 @@ def __hash__(self) -> int:
         return hash([i for i in self.get()])
 
 
-class StateMapView(MapView):
+class KeyedStateListView(StateListView[N]):
+    """
+    KeyedStateListView is an default implementation of StateListView whose underlying
+    representation is a keyed state.
+    """
 
-    def __init__(self, map_state: MapState):
+    def __init__(self, list_state: ListState):
+        super(KeyedStateListView, self).__init__(list_state)
+
+    def set_current_namespace(self, namespace: N):
+        raise Exception("KeyedStateListView doesn't support set_current_namespace")
+
+
+class NamespacedStateListView(StateListView[N]):
+    """
+    NamespacedStateListView is an StateListView whose underlying representation is a keyed and
+    namespaced state. It also support to change current namespace.

Review comment:
       It also supports changing the current namespace.

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -173,9 +239,65 @@ def __init__(self,
     def get_runtime_context(self):
         return self._function_context
 
+    @abstractmethod
+    def get_state_list_view(self, state_name, element_coder):
+        """
+        Creates a state list view.
+
+        :param state_name: The name of underlying state of the list view
+        :param element_coder: The element coder
+        :return: a keyed list state
+        """
+        pass
+
+    @abstractmethod
+    def get_state_map_view(self, state_name, key_coder, value_coder):
+        """
+        Creates a state map view.
+
+        :param state_name: The name of underlying state of the map view
+        :param key_coder: The key coder
+        :param value_coder: The value coder
+        :return: a keyed map state
+        """
+        pass
+
+
+class PerKeyStateDataViewStore(StateDataViewStore):
+    """
+    Default implementation of StateDataViewStore.
+    """
+
+    def __init__(self,
+                 function_context: FunctionContext,
+                 keyed_state_backend: RemoteKeyedStateBackend):
+        super(PerKeyStateDataViewStore, self).__init__(function_context, keyed_state_backend)
+
+    def get_state_list_view(self, state_name, element_coder):
+        return KeyedStateListView(
+            self._keyed_state_backend.get_list_state(state_name, element_coder))
+
+    def get_state_map_view(self, state_name, key_coder, value_coder):
+        return KeyedStateMapView(
+            self._keyed_state_backend.get_map_state(state_name, key_coder, value_coder))
+
+
+class PerWindowStateDataViewStore(StateDataViewStore):
+    """
+    An implementation of StateDataViewStore for window aggregates which forwards the state
+    registration to an underlying RemoteKeyedStateBackend. The created state by this store has the

Review comment:
       The state created by this store

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -136,6 +177,32 @@ def clear(self) -> None:
         return self._map_state.clear()
 
 
+class KeyedStateMapView(StateMapView[N]):
+    """
+    KeyedStateMapView is an default implementation of StateMapView whose underlying
+    representation is a keyed state.
+    """
+
+    def __init__(self, map_state: MapState):
+        super(KeyedStateMapView, self).__init__(map_state)
+
+    def set_current_namespace(self, namespace: N):
+        raise Exception("KeyedStateMapView doesn't support set_current_namespace")
+
+
+class NamespacedStateMapView(StateMapView[N]):
+    """
+    NamespacedStateMapView is an StateMapView whose underlying representation is a keyed and

Review comment:
       an -> a

##########
File path: flink-python/pyflink/fn_execution/state_data_view.py
##########
@@ -136,6 +177,32 @@ def clear(self) -> None:
         return self._map_state.clear()
 
 
+class KeyedStateMapView(StateMapView[N]):
+    """
+    KeyedStateMapView is an default implementation of StateMapView whose underlying
+    representation is a keyed state.
+    """
+
+    def __init__(self, map_state: MapState):
+        super(KeyedStateMapView, self).__init__(map_state)
+
+    def set_current_namespace(self, namespace: N):
+        raise Exception("KeyedStateMapView doesn't support set_current_namespace")
+
+
+class NamespacedStateMapView(StateMapView[N]):
+    """
+    NamespacedStateMapView is an StateMapView whose underlying representation is a keyed and
+    namespaced state. It also support to change current namespace.

Review comment:
       It also supports changing the current namespace.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to