This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63521977fca5e05128ad3720724f6be550b3098d
Author: Dian Fu <dia...@apache.org>
AuthorDate: Thu Apr 7 20:56:21 2022 +0800

    [FLINK-27126][python] Respect the state cache size configuration for Python DataStream operators
    
    This closes #19394.
---
 .../pyflink/fn_execution/beam/beam_operations.py   |   6 +-
 .../pyflink/fn_execution/flink_fn_execution_pb2.py | 109 ++++++++++++---------
 .../pyflink/proto/flink-fn-execution.proto         |   7 ++
 .../operators/python/PythonCoProcessOperator.java  |  12 ++-
 .../python/PythonKeyedCoProcessOperator.java       |  12 ++-
 .../python/PythonKeyedProcessOperator.java         |  12 ++-
 .../operators/python/PythonProcessOperator.java    |  12 ++-
 .../flink/streaming/api/utils/ProtoUtils.java      |  48 +++++++--
 8 files changed, 160 insertions(+), 58 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index 17074b23c5b..3777c265638 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -185,9 +185,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             factory.state_handler,
             key_row_coder,
             None,
-            1000,
-            1000,
-            1000)
+            serialized_fn.state_cache_size,
+            serialized_fn.map_state_read_cache_size,
+            serialized_fn.map_state_write_cache_size)
         return beam_operation_cls(
             transform_proto.unique_name,
             spec,
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py 
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index c087b641bed..90b0bc5af53 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='flink-fn-execution.proto',
   package='org.apache.flink.fn_execution.v1',
   syntax='proto3',
-  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
 )
 
 
@@ -380,8 +380,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = 
_descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=6739,
-  serialized_end=6854,
+  serialized_start=6836,
+  serialized_end=6951,
 )
 _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
 
@@ -406,8 +406,8 @@ 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES = _descriptor.EnumD
   ],
   containing_type=None,
   options=None,
-  serialized_start=8416,
-  serialized_end=8514,
+  serialized_start=8513,
+  serialized_end=8611,
 )
 
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES)
 
@@ -424,8 +424,8 @@ 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY = _descri
   ],
   containing_type=None,
   options=None,
-  serialized_start=8516,
-  serialized_end=8558,
+  serialized_start=8613,
+  serialized_end=8655,
 )
 
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY)
 
@@ -450,8 +450,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = 
_descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8560,
-  serialized_end=8628,
+  serialized_start=8657,
+  serialized_end=8725,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE)
 
@@ -472,8 +472,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = 
_descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8630,
-  serialized_end=8704,
+  serialized_start=8727,
+  serialized_end=8801,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY)
 
@@ -490,8 +490,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = 
_descriptor.EnumDescript
   ],
   containing_type=None,
   options=None,
-  serialized_start=8706,
-  serialized_end=8749,
+  serialized_start=8803,
+  serialized_end=8846,
 )
 
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC)
 
@@ -512,8 +512,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=9716,
-  serialized_end=9748,
+  serialized_start=9813,
+  serialized_end=9845,
 )
 _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
 
@@ -1865,8 +1865,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6356,
-  serialized_end=6398,
+  serialized_start=6453,
+  serialized_end=6495,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
@@ -1944,8 +1944,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6401,
-  serialized_end=6737,
+  serialized_start=6498,
+  serialized_end=6834,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
@@ -1997,6 +1997,27 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='state_cache_size', 
full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.state_cache_size',
 index=6,
+      number=7, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='map_state_read_cache_size', 
full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.map_state_read_cache_size',
 index=7,
+      number=8, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='map_state_write_cache_size', 
full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.map_state_write_cache_size',
 index=8,
+      number=9, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -2011,7 +2032,7 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=5984,
-  serialized_end=6854,
+  serialized_end=6951,
 )
 
 
@@ -2048,8 +2069,8 @@ 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY = _
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7638,
-  serialized_end=7726,
+  serialized_start=7735,
+  serialized_end=7823,
 )
 
 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY
 = _descriptor.Descriptor(
@@ -2078,8 +2099,8 @@ 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTR
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7728,
-  serialized_end=7803,
+  serialized_start=7825,
+  serialized_end=7900,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = 
_descriptor.Descriptor(
@@ -2132,8 +2153,8 @@ 
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descript
       name='CleanupStrategy', 
full_name='org.apache.flink.fn_execution.v1.StateDescriptor.StateTTLConfig.CleanupStrategies.MapStrategiesEntry.CleanupStrategy',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=7806,
-  serialized_end=8414,
+  serialized_start=7903,
+  serialized_end=8511,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
@@ -2171,8 +2192,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7460,
-  serialized_end=8558,
+  serialized_start=7557,
+  serialized_end=8655,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
@@ -2232,8 +2253,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6989,
-  serialized_end=8749,
+  serialized_start=7086,
+  serialized_end=8846,
 )
 
 _STATEDESCRIPTOR = _descriptor.Descriptor(
@@ -2269,8 +2290,8 @@ _STATEDESCRIPTOR = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6857,
-  serialized_end=8749,
+  serialized_start=6954,
+  serialized_end=8846,
 )
 
 
@@ -2300,8 +2321,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9345,
-  serialized_end=9419,
+  serialized_start=9442,
+  serialized_end=9516,
 )
 
 _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -2330,8 +2351,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9421,
-  serialized_end=9488,
+  serialized_start=9518,
+  serialized_end=9585,
 )
 
 _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -2360,8 +2381,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9490,
-  serialized_end=9559,
+  serialized_start=9587,
+  serialized_end=9656,
 )
 
 _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2390,8 +2411,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9561,
-  serialized_end=9640,
+  serialized_start=9658,
+  serialized_end=9737,
 )
 
 _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2420,8 +2441,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9642,
-  serialized_end=9714,
+  serialized_start=9739,
+  serialized_end=9811,
 )
 
 _CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2496,8 +2517,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
       name='data_type', 
full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=8752,
-  serialized_end=9761,
+  serialized_start=8849,
+  serialized_end=9858,
 )
 
 _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto 
b/flink-python/pyflink/proto/flink-fn-execution.proto
index 95ee2f5c037..7ea69a55f8b 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -371,6 +371,13 @@ message UserDefinedDataStreamFunction {
   bool metric_enabled = 4;
   TypeInfo key_type_info = 5;
   bool profile_enabled = 6;
+
+  // The state cache size.
+  int32 state_cache_size = 7;
+  // The map state read cache size.
+  int32 map_state_read_cache_size = 8;
+  // The map state write cache size.
+  int32 map_state_write_cache_size = 9;
 }
 
 // A representation of State
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
index 2b519aeb24e..3a13f6725dd 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
@@ -30,6 +30,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
+import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
 
 /**
@@ -73,7 +78,12 @@ public class PythonCoProcessOperator<IN1, IN2, OUT>
                         getPythonFunctionInfo(),
                         getRuntimeContext(),
                         getInternalParameters(),
-                        inBatchExecutionMode(getKeyedStateBackend())),
+                        inBatchExecutionMode(getKeyedStateBackend()),
+                        config.get(PYTHON_METRIC_ENABLED),
+                        config.get(PYTHON_PROFILE_ENABLED),
+                        config.get(STATE_CACHE_SIZE),
+                        config.get(MAP_STATE_READ_CACHE_SIZE),
+                        config.get(MAP_STATE_WRITE_CACHE_SIZE)),
                 getFlinkMetricContainer(),
                 null,
                 null,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index 924b86122de..26ccaedf9ad 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -41,6 +41,11 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
 import static org.apache.flink.python.Constants.STATEFUL_FUNCTION_URN;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
+import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE;
 import static 
org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataCoderInfoDescriptorProto;
 import static 
org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataTypeInfo;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -112,7 +117,12 @@ public class PythonKeyedCoProcessOperator<OUT>
                         getRuntimeContext(),
                         getInternalParameters(),
                         keyTypeInfo,
-                        inBatchExecutionMode(getKeyedStateBackend())),
+                        inBatchExecutionMode(getKeyedStateBackend()),
+                        config.get(PYTHON_METRIC_ENABLED),
+                        config.get(PYTHON_PROFILE_ENABLED),
+                        config.get(STATE_CACHE_SIZE),
+                        config.get(MAP_STATE_READ_CACHE_SIZE),
+                        config.get(MAP_STATE_WRITE_CACHE_SIZE)),
                 getFlinkMetricContainer(),
                 getKeyedStateBackend(),
                 keyTypeSerializer,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index cef5e6ce01d..2896ed3be47 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -41,6 +41,11 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
 import static org.apache.flink.python.Constants.STATEFUL_FUNCTION_URN;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
+import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE;
 import static 
org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataCoderInfoDescriptorProto;
 import static 
org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataTypeInfo;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -142,7 +147,12 @@ public class PythonKeyedProcessOperator<OUT>
                         getRuntimeContext(),
                         getInternalParameters(),
                         keyTypeInfo,
-                        inBatchExecutionMode(getKeyedStateBackend())),
+                        inBatchExecutionMode(getKeyedStateBackend()),
+                        config.get(PYTHON_METRIC_ENABLED),
+                        config.get(PYTHON_PROFILE_ENABLED),
+                        config.get(STATE_CACHE_SIZE),
+                        config.get(MAP_STATE_READ_CACHE_SIZE),
+                        config.get(MAP_STATE_WRITE_CACHE_SIZE)),
                 getFlinkMetricContainer(),
                 getKeyedStateBackend(),
                 keyTypeSerializer,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
index 1bc820e5858..48e11fab123 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
@@ -31,6 +31,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
+import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE;
 import static 
org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
 
 /**
@@ -70,7 +75,12 @@ public class PythonProcessOperator<IN, OUT>
                         getPythonFunctionInfo(),
                         getRuntimeContext(),
                         getInternalParameters(),
-                        inBatchExecutionMode(getKeyedStateBackend())),
+                        inBatchExecutionMode(getKeyedStateBackend()),
+                        config.get(PYTHON_METRIC_ENABLED),
+                        config.get(PYTHON_PROFILE_ENABLED),
+                        config.get(STATE_CACHE_SIZE),
+                        config.get(MAP_STATE_READ_CACHE_SIZE),
+                        config.get(MAP_STATE_WRITE_CACHE_SIZE)),
                 getFlinkMetricContainer(),
                 null,
                 null,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java
index 6676d84235b..e5e482a6aa1 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java
@@ -132,7 +132,12 @@ public enum ProtoUtils {
             DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
             RuntimeContext runtimeContext,
             Map<String, String> internalParameters,
-            boolean inBatchExecutionMode) {
+            boolean inBatchExecutionMode,
+            boolean isMetricEnabled,
+            boolean isProfileEnabled,
+            int stateCacheSize,
+            int mapStateReadCacheSize,
+            int mapStateWriteCacheSize) {
         FlinkFnApi.UserDefinedDataStreamFunction.Builder builder =
                 FlinkFnApi.UserDefinedDataStreamFunction.newBuilder();
         builder.setFunctionType(
@@ -175,7 +180,11 @@ public enum ProtoUtils {
                         dataStreamPythonFunctionInfo
                                 .getPythonFunction()
                                 .getSerializedPythonFunction()));
-        builder.setMetricEnabled(true);
+        builder.setMetricEnabled(isMetricEnabled);
+        builder.setProfileEnabled(isProfileEnabled);
+        builder.setStateCacheSize(stateCacheSize);
+        builder.setMapStateReadCacheSize(mapStateReadCacheSize);
+        builder.setMapStateWriteCacheSize(mapStateWriteCacheSize);
         return builder.build();
     }
 
@@ -192,7 +201,12 @@ public enum ProtoUtils {
                     DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
                     RuntimeContext runtimeContext,
                     Map<String, String> internalParameters,
-                    boolean inBatchExecutionMode) {
+                    boolean inBatchExecutionMode,
+                    boolean isMetricEnabled,
+                    boolean isProfileEnabled,
+                    int stateCacheSize,
+                    int mapStateReadCacheSize,
+                    int mapStateWriteCacheSize) {
         List<FlinkFnApi.UserDefinedDataStreamFunction> results = new ArrayList<>();
 
         Object[] inputs = dataStreamPythonFunctionInfo.getInputs();
@@ -203,7 +217,12 @@ public enum ProtoUtils {
                             (DataStreamPythonFunctionInfo) inputs[0],
                             runtimeContext,
                             internalParameters,
-                            inBatchExecutionMode));
+                            inBatchExecutionMode,
+                            isMetricEnabled,
+                            isProfileEnabled,
+                            stateCacheSize,
+                            mapStateReadCacheSize,
+                            mapStateWriteCacheSize));
         }
 
         results.add(
@@ -211,7 +230,12 @@ public enum ProtoUtils {
                         dataStreamPythonFunctionInfo,
                         runtimeContext,
                         internalParameters,
-                        inBatchExecutionMode));
+                        inBatchExecutionMode,
+                        isMetricEnabled,
+                        isProfileEnabled,
+                        stateCacheSize,
+                        mapStateReadCacheSize,
+                        mapStateWriteCacheSize));
         return results;
     }
 
@@ -221,13 +245,23 @@ public enum ProtoUtils {
                     RuntimeContext runtimeContext,
                     Map<String, String> internalParameters,
                     TypeInformation<?> keyTypeInfo,
-                    boolean inBatchExecutionMode) {
+                    boolean inBatchExecutionMode,
+                    boolean isMetricEnabled,
+                    boolean isProfileEnabled,
+                    int stateCacheSize,
+                    int mapStateReadCacheSize,
+                    int mapStateWriteCacheSize) {
         List<FlinkFnApi.UserDefinedDataStreamFunction> results =
                 createUserDefinedDataStreamFunctionProtos(
                         dataStreamPythonFunctionInfo,
                         runtimeContext,
                         internalParameters,
-                        inBatchExecutionMode);
+                        inBatchExecutionMode,
+                        isMetricEnabled,
+                        isProfileEnabled,
+                        stateCacheSize,
+                        mapStateReadCacheSize,
+                        mapStateWriteCacheSize);
 
         // set the key typeinfo for the head operator
         FlinkFnApi.TypeInfo builtKeyTypeInfo =

Reply via email to