This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63521977fca5e05128ad3720724f6be550b3098d Author: Dian Fu <dia...@apache.org> AuthorDate: Thu Apr 7 20:56:21 2022 +0800 [FLINK-27126][python] Respect the state cache size configuration for Python DataStream operators This closes #19394. --- .../pyflink/fn_execution/beam/beam_operations.py | 6 +- .../pyflink/fn_execution/flink_fn_execution_pb2.py | 109 ++++++++++++--------- .../pyflink/proto/flink-fn-execution.proto | 7 ++ .../operators/python/PythonCoProcessOperator.java | 12 ++- .../python/PythonKeyedCoProcessOperator.java | 12 ++- .../python/PythonKeyedProcessOperator.java | 12 ++- .../operators/python/PythonProcessOperator.java | 12 ++- .../flink/streaming/api/utils/ProtoUtils.java | 48 +++++++-- 8 files changed, 160 insertions(+), 58 deletions(-) diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py index 17074b23c5b..3777c265638 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_operations.py +++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py @@ -185,9 +185,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers, factory.state_handler, key_row_coder, None, - 1000, - 1000, - 1000) + serialized_fn.state_cache_size, + serialized_fn.map_state_read_cache_size, + serialized_fn.map_state_write_cache_size) return beam_operation_cls( transform_proto.unique_name, spec, diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py index c087b641bed..90b0bc5af53 100644 --- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py +++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py @@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='flink-fn-execution.proto', package='org.apache.flink.fn_execution.v1', syntax='proto3', - serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...] + serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...] ) @@ -380,8 +380,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=6739, - serialized_end=6854, + serialized_start=6836, + serialized_end=6951, ) _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE) @@ -406,8 +406,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES = _descriptor.EnumD ], containing_type=None, options=None, - serialized_start=8416, - serialized_end=8514, + serialized_start=8513, + serialized_end=8611, ) _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES) @@ -424,8 +424,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY = _descri ], containing_type=None, options=None, - serialized_start=8516, - serialized_end=8558, + serialized_start=8613, + serialized_end=8655, ) _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY) @@ -450,8 +450,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=8560, - serialized_end=8628, + serialized_start=8657, + serialized_end=8725, ) _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE) @@ -472,8 +472,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=8630, - serialized_end=8704, + serialized_start=8727, + serialized_end=8801, ) _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY) @@ -490,8 +490,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _descriptor.EnumDescript ], containing_type=None, options=None, - serialized_start=8706, - serialized_end=8749, + serialized_start=8803, + serialized_end=8846, ) _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC) @@ -512,8 +512,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=9716, - serialized_end=9748, + serialized_start=9813, + serialized_end=9845, ) _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE) @@ -1865,8 +1865,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6356, - serialized_end=6398, + serialized_start=6453, + serialized_end=6495, ) _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor( @@ -1944,8 +1944,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6401, - serialized_end=6737, + serialized_start=6498, + serialized_end=6834, ) _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor( @@ -1997,6 +1997,27 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='state_cache_size', full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.state_cache_size', index=6, + number=7, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='map_state_read_cache_size', full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.map_state_read_cache_size', index=7, + number=8, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='map_state_write_cache_size', full_name='org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.map_state_write_cache_size', index=8, + number=9, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -2011,7 +2032,7 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor( oneofs=[ ], serialized_start=5984, - serialized_end=6854, + serialized_end=6951, ) @@ -2048,8 +2069,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY = _ extension_ranges=[], oneofs=[ ], - serialized_start=7638, - serialized_end=7726, + serialized_start=7735, + serialized_end=7823, ) _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY = _descriptor.Descriptor( @@ -2078,8 +2099,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTR extension_ranges=[], oneofs=[ ], - serialized_start=7728, - serialized_end=7803, + serialized_start=7825, + serialized_end=7900, ) _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descriptor.Descriptor( @@ -2132,8 +2153,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descript name='CleanupStrategy', full_name='org.apache.flink.fn_execution.v1.StateDescriptor.StateTTLConfig.CleanupStrategies.MapStrategiesEntry.CleanupStrategy', index=0, containing_type=None, fields=[]), ], - serialized_start=7806, - serialized_end=8414, + serialized_start=7903, + serialized_end=8511, ) _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor( @@ -2171,8 +2192,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7460, - serialized_end=8558, + serialized_start=7557, + serialized_end=8655, ) _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor( @@ -2232,8 +2253,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6989, - serialized_end=8749, + serialized_start=7086, + serialized_end=8846, ) _STATEDESCRIPTOR = _descriptor.Descriptor( @@ -2269,8 +2290,8 @@ _STATEDESCRIPTOR = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6857, - serialized_end=8749, + serialized_start=6954, + serialized_end=8846, ) @@ -2300,8 +2321,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=9345, - serialized_end=9419, + serialized_start=9442, + serialized_end=9516, ) _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor( @@ -2330,8 +2351,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=9421, - serialized_end=9488, + serialized_start=9518, + serialized_end=9585, ) _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor( @@ -2360,8 +2381,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=9490, - serialized_end=9559, + serialized_start=9587, + serialized_end=9656, ) _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor( @@ -2390,8 +2411,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=9561, - serialized_end=9640, + serialized_start=9658, + serialized_end=9737, ) _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor( @@ -2420,8 +2441,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=9642, - serialized_end=9714, + serialized_start=9739, + serialized_end=9811, ) _CODERINFODESCRIPTOR = _descriptor.Descriptor( @@ -2496,8 +2517,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor( name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type', index=0, containing_type=None, fields=[]), ], - serialized_start=8752, - serialized_end=9761, + serialized_start=8849, + serialized_end=9858, ) _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto index 95ee2f5c037..7ea69a55f8b 100644 --- a/flink-python/pyflink/proto/flink-fn-execution.proto +++ b/flink-python/pyflink/proto/flink-fn-execution.proto @@ -371,6 +371,13 @@ message UserDefinedDataStreamFunction { bool metric_enabled = 4; TypeInfo key_type_info = 5; bool profile_enabled = 6; + + // The state cache size. + int32 state_cache_size = 7; + // The map state read cache size. + int32 map_state_read_cache_size = 8; + // The map_state_write_cache_size. + int32 map_state_write_cache_size = 9; } // A representation of State diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java index 2b519aeb24e..3a13f6725dd 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java @@ -30,6 +30,11 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN; +import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; +import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE; import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; /** @@ -73,7 +78,12 @@ public class PythonCoProcessOperator<IN1, IN2, OUT> getPythonFunctionInfo(), getRuntimeContext(), getInternalParameters(), - inBatchExecutionMode(getKeyedStateBackend())), + inBatchExecutionMode(getKeyedStateBackend()), + config.get(PYTHON_METRIC_ENABLED), + config.get(PYTHON_PROFILE_ENABLED), + config.get(STATE_CACHE_SIZE), + config.get(MAP_STATE_READ_CACHE_SIZE), + config.get(MAP_STATE_WRITE_CACHE_SIZE)), getFlinkMetricContainer(), null, null, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java index 924b86122de..26ccaedf9ad 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java @@ -41,6 +41,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; import static org.apache.flink.python.Constants.STATEFUL_FUNCTION_URN; +import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; +import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE; import static org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataTypeInfo; import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; @@ -112,7 +117,12 @@ public class PythonKeyedCoProcessOperator<OUT> getRuntimeContext(), getInternalParameters(), keyTypeInfo, - inBatchExecutionMode(getKeyedStateBackend())), + inBatchExecutionMode(getKeyedStateBackend()), + config.get(PYTHON_METRIC_ENABLED), + config.get(PYTHON_PROFILE_ENABLED), + config.get(STATE_CACHE_SIZE), + config.get(MAP_STATE_READ_CACHE_SIZE), + config.get(MAP_STATE_WRITE_CACHE_SIZE)), getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java index cef5e6ce01d..2896ed3be47 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java @@ -41,6 +41,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; import static org.apache.flink.python.Constants.STATEFUL_FUNCTION_URN; +import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; +import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE; import static org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.operators.python.timer.TimerUtils.createTimerDataTypeInfo; import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; @@ -142,7 +147,12 @@ public class PythonKeyedProcessOperator<OUT> getRuntimeContext(), getInternalParameters(), keyTypeInfo, - inBatchExecutionMode(getKeyedStateBackend())), + inBatchExecutionMode(getKeyedStateBackend()), + config.get(PYTHON_METRIC_ENABLED), + config.get(PYTHON_PROFILE_ENABLED), + config.get(STATE_CACHE_SIZE), + config.get(MAP_STATE_READ_CACHE_SIZE), + config.get(MAP_STATE_WRITE_CACHE_SIZE)), getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java index 1bc820e5858..48e11fab123 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java @@ -31,6 +31,11 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN; +import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; +import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE; import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; /** @@ -70,7 +75,12 @@ public class PythonProcessOperator<IN, OUT> getPythonFunctionInfo(), getRuntimeContext(), getInternalParameters(), - inBatchExecutionMode(getKeyedStateBackend())), + inBatchExecutionMode(getKeyedStateBackend()), + config.get(PYTHON_METRIC_ENABLED), + config.get(PYTHON_PROFILE_ENABLED), + config.get(STATE_CACHE_SIZE), + config.get(MAP_STATE_READ_CACHE_SIZE), + config.get(MAP_STATE_WRITE_CACHE_SIZE)), getFlinkMetricContainer(), null, null, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java index 6676d84235b..e5e482a6aa1 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ProtoUtils.java @@ -132,7 +132,12 @@ public enum ProtoUtils { DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, RuntimeContext runtimeContext, Map<String, String> internalParameters, - boolean inBatchExecutionMode) { + boolean inBatchExecutionMode, + boolean isMetricEnabled, + boolean isProfileEnabled, + int stateCacheSize, + int mapStateReadCacheSize, + int mapStateWriteCacheSize) { FlinkFnApi.UserDefinedDataStreamFunction.Builder builder = FlinkFnApi.UserDefinedDataStreamFunction.newBuilder(); builder.setFunctionType( @@ -175,7 +180,11 @@ public enum ProtoUtils { dataStreamPythonFunctionInfo .getPythonFunction() .getSerializedPythonFunction())); - builder.setMetricEnabled(true); + builder.setMetricEnabled(isMetricEnabled); + builder.setProfileEnabled(isProfileEnabled); + builder.setStateCacheSize(stateCacheSize); + builder.setMapStateReadCacheSize(mapStateReadCacheSize); + builder.setMapStateWriteCacheSize(mapStateWriteCacheSize); return builder.build(); } @@ -192,7 +201,12 @@ public enum ProtoUtils { DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, RuntimeContext runtimeContext, Map<String, String> internalParameters, - boolean inBatchExecutionMode) { + boolean inBatchExecutionMode, + boolean isMetricEnabled, + boolean isProfileEnabled, + int stateCacheSize, + int mapStateReadCacheSize, + int mapStateWriteCacheSize) { List<FlinkFnApi.UserDefinedDataStreamFunction> results = new ArrayList<>(); Object[] inputs = dataStreamPythonFunctionInfo.getInputs(); @@ -203,7 +217,12 @@ public enum ProtoUtils { (DataStreamPythonFunctionInfo) inputs[0], runtimeContext, internalParameters, - inBatchExecutionMode)); + inBatchExecutionMode, + isMetricEnabled, + isProfileEnabled, + stateCacheSize, + mapStateReadCacheSize, + mapStateWriteCacheSize)); } results.add( @@ -211,7 +230,12 @@ public enum ProtoUtils { dataStreamPythonFunctionInfo, runtimeContext, internalParameters, - inBatchExecutionMode)); + inBatchExecutionMode, + isMetricEnabled, + isProfileEnabled, + stateCacheSize, + mapStateReadCacheSize, + mapStateWriteCacheSize)); return results; } @@ -221,13 +245,23 @@ public enum ProtoUtils { RuntimeContext runtimeContext, Map<String, String> internalParameters, TypeInformation<?> keyTypeInfo, - boolean inBatchExecutionMode) { + boolean inBatchExecutionMode, + boolean isMetricEnabled, + boolean isProfileEnabled, + int stateCacheSize, + int mapStateReadCacheSize, + int mapStateWriteCacheSize) { List<FlinkFnApi.UserDefinedDataStreamFunction> results = createUserDefinedDataStreamFunctionProtos( dataStreamPythonFunctionInfo, runtimeContext, internalParameters, - inBatchExecutionMode); + inBatchExecutionMode, + isMetricEnabled, + isProfileEnabled, + stateCacheSize, + mapStateReadCacheSize, + mapStateWriteCacheSize); // set the key typeinfo for the head operator FlinkFnApi.TypeInfo builtKeyTypeInfo =