This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d62be55c09a [FLINK-38933][python] Expose runtime context information in Python UDX FunctionContext
d62be55c09a is described below
commit d62be55c09a7beb85e6517312233127cd85e7dce
Author: yongliu <[email protected]>
AuthorDate: Thu Mar 26 11:47:39 2026 +0800
[FLINK-38933][python] Expose runtime context information in Python UDX FunctionContext
This aligns Python UDX with Java UDX by exposing runtime context
information through FunctionContext. Added the following getter methods to
FunctionContext:
- get_task_name()
- get_task_name_with_subtasks()
- get_number_of_parallel_subtasks()
- get_max_number_of_parallel_subtasks()
- get_index_of_this_subtask()
- get_attempt_number()
The runtime context is propagated from Java operators via the protobuf
protocol by adding a runtime_context field to UserDefinedFunctions and
UserDefinedAggregateFunctions messages.
This closes #27831.
---
.../pyflink/fn_execution/flink_fn_execution_pb2.py | 212 ++++++++++-----------
.../fn_execution/flink_fn_execution_pb2.pyi | 12 +-
.../fn_execution/metrics/tests/test_metric.py | 25 +++
.../table/async_function/operations.py | 17 +-
.../pyflink/fn_execution/table/operations.py | 32 +++-
.../pyflink/proto/flink-fn-execution.proto | 6 +
flink-python/pyflink/table/tests/test_udf.py | 39 ++++
flink-python/pyflink/table/udf.py | 64 ++++++-
.../org/apache/flink/python/util/ProtoUtils.java | 12 ++
.../AbstractPythonStreamAggregateOperator.java | 13 ++
...wPythonOverWindowAggregateFunctionOperator.java | 13 ++
11 files changed, 329 insertions(+), 116 deletions(-)
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 5356b9f8738..96984f80d5c 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -31,7 +31,7 @@ _sym_db = _symbol_database.Default()
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02
\x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01
\x01(\t\x12\r\n\x05value\x18\x02
\x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -48,109 +48,109 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_ASYNCOPTIONS']._serialized_start=415
_globals['_ASYNCOPTIONS']._serialized_end=559
_globals['_USERDEFINEDFUNCTIONS']._serialized_start=562
- _globals['_USERDEFINEDFUNCTIONS']._serialized_end=908
- _globals['_OVERWINDOW']._serialized_start=911
- _globals['_OVERWINDOW']._serialized_end=1260
- _globals['_OVERWINDOW_WINDOWTYPE']._serialized_start=1052
- _globals['_OVERWINDOW_WINDOWTYPE']._serialized_end=1260
- _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_start=1263
- _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_end=2042
- _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_start=1528
- _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_end=2042
-
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_start=1791
-
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_end=1875
-
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_start=1878
-
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_end=2029
- _globals['_GROUPWINDOW']._serialized_start=2045
- _globals['_GROUPWINDOW']._serialized_end=2601
- _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_start=2409
- _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_end=2500
- _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_start=2502
- _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_end=2601
- _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_start=2604
- _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_end=3210
- _globals['_SCHEMA']._serialized_start=3213
- _globals['_SCHEMA']._serialized_end=5251
- _globals['_SCHEMA_MAPINFO']._serialized_start=3288
- _globals['_SCHEMA_MAPINFO']._serialized_end=3439
- _globals['_SCHEMA_TIMEINFO']._serialized_start=3441
- _globals['_SCHEMA_TIMEINFO']._serialized_end=3470
- _globals['_SCHEMA_TIMESTAMPINFO']._serialized_start=3472
- _globals['_SCHEMA_TIMESTAMPINFO']._serialized_end=3506
- _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_start=3508
- _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_end=3552
- _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_start=3554
- _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_end=3593
- _globals['_SCHEMA_DECIMALINFO']._serialized_start=3595
- _globals['_SCHEMA_DECIMALINFO']._serialized_end=3642
- _globals['_SCHEMA_BINARYINFO']._serialized_start=3644
- _globals['_SCHEMA_BINARYINFO']._serialized_end=3672
- _globals['_SCHEMA_VARBINARYINFO']._serialized_start=3674
- _globals['_SCHEMA_VARBINARYINFO']._serialized_end=3705
- _globals['_SCHEMA_CHARINFO']._serialized_start=3707
- _globals['_SCHEMA_CHARINFO']._serialized_end=3733
- _globals['_SCHEMA_VARCHARINFO']._serialized_start=3735
- _globals['_SCHEMA_VARCHARINFO']._serialized_end=3764
- _globals['_SCHEMA_FIELDTYPE']._serialized_start=3767
- _globals['_SCHEMA_FIELDTYPE']._serialized_end=4839
- _globals['_SCHEMA_FIELD']._serialized_start=4841
- _globals['_SCHEMA_FIELD']._serialized_end=4949
- _globals['_SCHEMA_TYPENAME']._serialized_start=4952
- _globals['_SCHEMA_TYPENAME']._serialized_end=5251
- _globals['_TYPEINFO']._serialized_start=5254
- _globals['_TYPEINFO']._serialized_end=6601
- _globals['_TYPEINFO_MAPTYPEINFO']._serialized_start=5748
- _globals['_TYPEINFO_MAPTYPEINFO']._serialized_end=5887
- _globals['_TYPEINFO_ROWTYPEINFO']._serialized_start=5890
- _globals['_TYPEINFO_ROWTYPEINFO']._serialized_end=6074
- _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_start=5983
- _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_end=6074
- _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_start=6076
- _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_end=6156
- _globals['_TYPEINFO_AVROTYPEINFO']._serialized_start=6158
- _globals['_TYPEINFO_AVROTYPEINFO']._serialized_end=6188
- _globals['_TYPEINFO_TYPENAME']._serialized_start=6191
- _globals['_TYPEINFO_TYPENAME']._serialized_end=6588
- _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_start=6604
- _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_end=7581
-
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_start=7099
-
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_end=7405
-
_globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_start=7408
- _globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_end=7581
- _globals['_STATEDESCRIPTOR']._serialized_start=7584
- _globals['_STATEDESCRIPTOR']._serialized_end=9476
- _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_start=7716
- _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_end=9476
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_start=8187
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_end=9285
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_start=8365
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_end=8453
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_start=8455
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_end=8530
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_start=8533
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_end=9141
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_start=9143
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_end=9241
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_start=9243
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_end=9285
- _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_start=9287
- _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_end=9355
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_start=9357
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_end=9431
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_start=9433
-
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_end=9476
- _globals['_CODERINFODESCRIPTOR']._serialized_start=9479
- _globals['_CODERINFODESCRIPTOR']._serialized_end=10488
- _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_start=10072
- _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_end=10146
- _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_start=10148
- _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_end=10215
- _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_start=10217
- _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_end=10286
- _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_start=10288
- _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_end=10367
- _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_start=10369
- _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_end=10441
- _globals['_CODERINFODESCRIPTOR_MODE']._serialized_start=10443
- _globals['_CODERINFODESCRIPTOR_MODE']._serialized_end=10475
+ _globals['_USERDEFINEDFUNCTIONS']._serialized_end=1013
+ _globals['_OVERWINDOW']._serialized_start=1016
+ _globals['_OVERWINDOW']._serialized_end=1365
+ _globals['_OVERWINDOW_WINDOWTYPE']._serialized_start=1157
+ _globals['_OVERWINDOW_WINDOWTYPE']._serialized_end=1365
+ _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_start=1368
+ _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_end=2147
+ _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_start=1633
+ _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_end=2147
+
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_start=1896
+
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_end=1980
+
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_start=1983
+
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_end=2134
+ _globals['_GROUPWINDOW']._serialized_start=2150
+ _globals['_GROUPWINDOW']._serialized_end=2706
+ _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_start=2514
+ _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_end=2605
+ _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_start=2607
+ _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_end=2706
+ _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_start=2709
+ _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_end=3420
+ _globals['_SCHEMA']._serialized_start=3423
+ _globals['_SCHEMA']._serialized_end=5461
+ _globals['_SCHEMA_MAPINFO']._serialized_start=3498
+ _globals['_SCHEMA_MAPINFO']._serialized_end=3649
+ _globals['_SCHEMA_TIMEINFO']._serialized_start=3651
+ _globals['_SCHEMA_TIMEINFO']._serialized_end=3680
+ _globals['_SCHEMA_TIMESTAMPINFO']._serialized_start=3682
+ _globals['_SCHEMA_TIMESTAMPINFO']._serialized_end=3716
+ _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_start=3718
+ _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_end=3762
+ _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_start=3764
+ _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_end=3803
+ _globals['_SCHEMA_DECIMALINFO']._serialized_start=3805
+ _globals['_SCHEMA_DECIMALINFO']._serialized_end=3852
+ _globals['_SCHEMA_BINARYINFO']._serialized_start=3854
+ _globals['_SCHEMA_BINARYINFO']._serialized_end=3882
+ _globals['_SCHEMA_VARBINARYINFO']._serialized_start=3884
+ _globals['_SCHEMA_VARBINARYINFO']._serialized_end=3915
+ _globals['_SCHEMA_CHARINFO']._serialized_start=3917
+ _globals['_SCHEMA_CHARINFO']._serialized_end=3943
+ _globals['_SCHEMA_VARCHARINFO']._serialized_start=3945
+ _globals['_SCHEMA_VARCHARINFO']._serialized_end=3974
+ _globals['_SCHEMA_FIELDTYPE']._serialized_start=3977
+ _globals['_SCHEMA_FIELDTYPE']._serialized_end=5049
+ _globals['_SCHEMA_FIELD']._serialized_start=5051
+ _globals['_SCHEMA_FIELD']._serialized_end=5159
+ _globals['_SCHEMA_TYPENAME']._serialized_start=5162
+ _globals['_SCHEMA_TYPENAME']._serialized_end=5461
+ _globals['_TYPEINFO']._serialized_start=5464
+ _globals['_TYPEINFO']._serialized_end=6811
+ _globals['_TYPEINFO_MAPTYPEINFO']._serialized_start=5958
+ _globals['_TYPEINFO_MAPTYPEINFO']._serialized_end=6097
+ _globals['_TYPEINFO_ROWTYPEINFO']._serialized_start=6100
+ _globals['_TYPEINFO_ROWTYPEINFO']._serialized_end=6284
+ _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_start=6193
+ _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_end=6284
+ _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_start=6286
+ _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_end=6366
+ _globals['_TYPEINFO_AVROTYPEINFO']._serialized_start=6368
+ _globals['_TYPEINFO_AVROTYPEINFO']._serialized_end=6398
+ _globals['_TYPEINFO_TYPENAME']._serialized_start=6401
+ _globals['_TYPEINFO_TYPENAME']._serialized_end=6798
+ _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_start=6814
+ _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_end=7791
+
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_start=7309
+
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_end=7615
+
_globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_start=7618
+ _globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_end=7791
+ _globals['_STATEDESCRIPTOR']._serialized_start=7794
+ _globals['_STATEDESCRIPTOR']._serialized_end=9686
+ _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_start=7926
+ _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_end=9686
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_start=8397
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_end=9495
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_start=8575
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_end=8663
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_start=8665
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_end=8740
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_start=8743
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_end=9351
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_start=9353
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_end=9451
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_start=9453
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_end=9495
+ _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_start=9497
+ _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_end=9565
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_start=9567
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_end=9641
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_start=9643
+
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_end=9686
+ _globals['_CODERINFODESCRIPTOR']._serialized_start=9689
+ _globals['_CODERINFODESCRIPTOR']._serialized_end=10698
+ _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_start=10282
+ _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_end=10356
+ _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_start=10358
+ _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_end=10425
+ _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_start=10427
+ _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_end=10496
+ _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_start=10498
+ _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_end=10577
+ _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_start=10579
+ _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_end=10651
+ _globals['_CODERINFODESCRIPTOR_MODE']._serialized_start=10653
+ _globals['_CODERINFODESCRIPTOR_MODE']._serialized_end=10685
# @@protoc_insertion_point(module_scope)
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
index 93f892263fe..b6b77c68946 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
@@ -70,20 +70,22 @@ class AsyncOptions(_message.Message):
def __init__(self, max_concurrent_operations: _Optional[int] = ...,
timeout_ms: _Optional[int] = ..., retry_enabled: bool = ...,
retry_max_attempts: _Optional[int] = ..., retry_delay_ms: _Optional[int] = ...)
-> None: ...
class UserDefinedFunctions(_message.Message):
- __slots__ = ("udfs", "metric_enabled", "windows", "profile_enabled",
"job_parameters", "async_options")
+ __slots__ = ("udfs", "metric_enabled", "windows", "profile_enabled",
"job_parameters", "async_options", "runtime_context")
UDFS_FIELD_NUMBER: _ClassVar[int]
METRIC_ENABLED_FIELD_NUMBER: _ClassVar[int]
WINDOWS_FIELD_NUMBER: _ClassVar[int]
PROFILE_ENABLED_FIELD_NUMBER: _ClassVar[int]
JOB_PARAMETERS_FIELD_NUMBER: _ClassVar[int]
ASYNC_OPTIONS_FIELD_NUMBER: _ClassVar[int]
+ RUNTIME_CONTEXT_FIELD_NUMBER: _ClassVar[int]
udfs: _containers.RepeatedCompositeFieldContainer[UserDefinedFunction]
metric_enabled: bool
windows: _containers.RepeatedCompositeFieldContainer[OverWindow]
profile_enabled: bool
job_parameters: _containers.RepeatedCompositeFieldContainer[JobParameter]
async_options: AsyncOptions
- def __init__(self, udfs: _Optional[_Iterable[_Union[UserDefinedFunction,
_Mapping]]] = ..., metric_enabled: bool = ..., windows:
_Optional[_Iterable[_Union[OverWindow, _Mapping]]] = ..., profile_enabled: bool
= ..., job_parameters: _Optional[_Iterable[_Union[JobParameter, _Mapping]]] =
..., async_options: _Optional[_Union[AsyncOptions, _Mapping]] = ...) -> None:
...
+ runtime_context: UserDefinedDataStreamFunction.RuntimeContext
+ def __init__(self, udfs: _Optional[_Iterable[_Union[UserDefinedFunction,
_Mapping]]] = ..., metric_enabled: bool = ..., windows:
_Optional[_Iterable[_Union[OverWindow, _Mapping]]] = ..., profile_enabled: bool
= ..., job_parameters: _Optional[_Iterable[_Union[JobParameter, _Mapping]]] =
..., async_options: _Optional[_Union[AsyncOptions, _Mapping]] = ...,
runtime_context: _Optional[_Union[UserDefinedDataStreamFunction.RuntimeContext,
_Mapping]] = ...) -> None: ...
class OverWindow(_message.Message):
__slots__ = ("window_type", "lower_boundary", "upper_boundary")
@@ -195,7 +197,7 @@ class GroupWindow(_message.Message):
def __init__(self, window_type: _Optional[_Union[GroupWindow.WindowType,
str]] = ..., is_time_window: bool = ..., window_slide: _Optional[int] = ...,
window_size: _Optional[int] = ..., window_gap: _Optional[int] = ...,
is_row_time: bool = ..., time_field_index: _Optional[int] = ...,
allowedLateness: _Optional[int] = ..., namedProperties:
_Optional[_Iterable[_Union[GroupWindow.WindowProperty, str]]] = ...,
shift_timezone: _Optional[str] = ...) -> None: ...
class UserDefinedAggregateFunctions(_message.Message):
- __slots__ = ("udfs", "metric_enabled", "grouping",
"generate_update_before", "key_type", "index_of_count_star",
"state_cleaning_enabled", "state_cache_size", "map_state_read_cache_size",
"map_state_write_cache_size", "count_star_inserted", "group_window",
"profile_enabled", "job_parameters")
+ __slots__ = ("udfs", "metric_enabled", "grouping",
"generate_update_before", "key_type", "index_of_count_star",
"state_cleaning_enabled", "state_cache_size", "map_state_read_cache_size",
"map_state_write_cache_size", "count_star_inserted", "group_window",
"profile_enabled", "job_parameters", "runtime_context")
UDFS_FIELD_NUMBER: _ClassVar[int]
METRIC_ENABLED_FIELD_NUMBER: _ClassVar[int]
GROUPING_FIELD_NUMBER: _ClassVar[int]
@@ -210,6 +212,7 @@ class UserDefinedAggregateFunctions(_message.Message):
GROUP_WINDOW_FIELD_NUMBER: _ClassVar[int]
PROFILE_ENABLED_FIELD_NUMBER: _ClassVar[int]
JOB_PARAMETERS_FIELD_NUMBER: _ClassVar[int]
+ RUNTIME_CONTEXT_FIELD_NUMBER: _ClassVar[int]
udfs:
_containers.RepeatedCompositeFieldContainer[UserDefinedAggregateFunction]
metric_enabled: bool
grouping: _containers.RepeatedScalarFieldContainer[int]
@@ -224,7 +227,8 @@ class UserDefinedAggregateFunctions(_message.Message):
group_window: GroupWindow
profile_enabled: bool
job_parameters: _containers.RepeatedCompositeFieldContainer[JobParameter]
- def __init__(self, udfs:
_Optional[_Iterable[_Union[UserDefinedAggregateFunction, _Mapping]]] = ...,
metric_enabled: bool = ..., grouping: _Optional[_Iterable[int]] = ...,
generate_update_before: bool = ..., key_type:
_Optional[_Union[Schema.FieldType, _Mapping]] = ..., index_of_count_star:
_Optional[int] = ..., state_cleaning_enabled: bool = ..., state_cache_size:
_Optional[int] = ..., map_state_read_cache_size: _Optional[int] = ...,
map_state_write_cache_size: _Optional[int] = ..., [...]
+ runtime_context: UserDefinedDataStreamFunction.RuntimeContext
+ def __init__(self, udfs:
_Optional[_Iterable[_Union[UserDefinedAggregateFunction, _Mapping]]] = ...,
metric_enabled: bool = ..., grouping: _Optional[_Iterable[int]] = ...,
generate_update_before: bool = ..., key_type:
_Optional[_Union[Schema.FieldType, _Mapping]] = ..., index_of_count_star:
_Optional[int] = ..., state_cleaning_enabled: bool = ..., state_cache_size:
_Optional[int] = ..., map_state_read_cache_size: _Optional[int] = ...,
map_state_write_cache_size: _Optional[int] = ..., [...]
class Schema(_message.Message):
__slots__ = ("fields",)
diff --git a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index 7fba25cb87e..93c6c8ad5d7 100644
--- a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -52,6 +52,31 @@ class MetricTests(PyFlinkTestCase):
with self.assertRaises(RuntimeError):
fc.get_metric_group()
+ def test_function_context_runtime_info(self):
+ fc = FunctionContext(
+ None, {},
+ task_name='MyTask',
+ task_name_with_subtasks='MyTask (1/4)',
+ number_of_parallel_subtasks=4,
+ max_number_of_parallel_subtasks=128,
+ index_of_this_subtask=0,
+ attempt_number=2)
+ self.assertEqual('MyTask', fc.get_task_name())
+ self.assertEqual('MyTask (1/4)', fc.get_task_name_with_subtasks())
+ self.assertEqual(4, fc.get_number_of_parallel_subtasks())
+ self.assertEqual(128, fc.get_max_number_of_parallel_subtasks())
+ self.assertEqual(0, fc.get_index_of_this_subtask())
+ self.assertEqual(2, fc.get_attempt_number())
+
+ def test_function_context_runtime_info_defaults(self):
+ fc = FunctionContext(None, {})
+ self.assertIsNone(fc.get_task_name())
+ self.assertIsNone(fc.get_task_name_with_subtasks())
+ self.assertIsNone(fc.get_number_of_parallel_subtasks())
+ self.assertIsNone(fc.get_max_number_of_parallel_subtasks())
+ self.assertIsNone(fc.get_index_of_this_subtask())
+ self.assertIsNone(fc.get_attempt_number())
+
def test_get_metric_name(self):
new_group = MetricTests.base_metric_group.add_group('my_group')
self.assertEqual(
diff --git a/flink-python/pyflink/fn_execution/table/async_function/operations.py b/flink-python/pyflink/fn_execution/table/async_function/operations.py
index 44e4e4a70b0..5d536c47496 100644
--- a/flink-python/pyflink/fn_execution/table/async_function/operations.py
+++ b/flink-python/pyflink/fn_execution/table/async_function/operations.py
@@ -74,6 +74,18 @@ class AsyncScalarFunctionOperation(Operation,
AsyncOperationMixin):
# Job parameters
self._job_parameters = {p.key: p.value for p in
serialized_fn.job_parameters}
+ if serialized_fn.HasField('runtime_context'):
+ rc = serialized_fn.runtime_context
+ self._runtime_context = {
+ 'task_name': rc.task_name,
+ 'task_name_with_subtasks': rc.task_name_with_subtasks,
+ 'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+ 'max_number_of_parallel_subtasks':
rc.max_number_of_parallel_subtasks,
+ 'index_of_this_subtask': rc.index_of_this_subtask,
+ 'attempt_number': rc.attempt_number,
+ }
+ else:
+ self._runtime_context = {}
def set_output_processor(self, output_processor):
"""Set the output processor for emitting results.
@@ -86,8 +98,9 @@ class AsyncScalarFunctionOperation(Operation,
AsyncOperationMixin):
# Open user defined functions
for user_defined_func in self.user_defined_funcs:
if hasattr(user_defined_func, 'open'):
- user_defined_func.open(
- FunctionContext(self.base_metric_group,
self._job_parameters))
+ user_defined_func.open(FunctionContext(
+ self.base_metric_group, self._job_parameters,
+ **self._runtime_context))
# Start emitter thread to collect async results
self._emitter = Emitter(self._mark_exception, self._output_processor,
self._queue)
diff --git a/flink-python/pyflink/fn_execution/table/operations.py
b/flink-python/pyflink/fn_execution/table/operations.py
index eb8cac4fdcf..5fa132db849 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -85,6 +85,18 @@ class BaseOperation(Operation):
self.base_metric_group = None
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
self.job_parameters = {p.key: p.value for p in
serialized_fn.job_parameters}
+ if serialized_fn.HasField('runtime_context'):
+ rc = serialized_fn.runtime_context
+ self.runtime_context = {
+ 'task_name': rc.task_name,
+ 'task_name_with_subtasks': rc.task_name_with_subtasks,
+ 'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+ 'max_number_of_parallel_subtasks':
rc.max_number_of_parallel_subtasks,
+ 'index_of_this_subtask': rc.index_of_this_subtask,
+ 'attempt_number': rc.attempt_number,
+ }
+ else:
+ self.runtime_context = {}
def finish(self):
self._update_gauge(self.base_metric_group)
@@ -104,7 +116,9 @@ class BaseOperation(Operation):
def open(self):
for user_defined_func in self.user_defined_funcs:
if hasattr(user_defined_func, 'open'):
- user_defined_func.open(FunctionContext(self.base_metric_group,
self.job_parameters))
+ user_defined_func.open(FunctionContext(
+ self.base_metric_group, self.job_parameters,
+ **self.runtime_context))
def close(self):
for user_defined_func in self.user_defined_funcs:
@@ -326,11 +340,25 @@ class
AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
self.job_parameters = {p.key: p.value for p in
serialized_fn.job_parameters}
+ if serialized_fn.HasField('runtime_context'):
+ rc = serialized_fn.runtime_context
+ self.runtime_context = {
+ 'task_name': rc.task_name,
+ 'task_name_with_subtasks': rc.task_name_with_subtasks,
+ 'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+ 'max_number_of_parallel_subtasks':
rc.max_number_of_parallel_subtasks,
+ 'index_of_this_subtask': rc.index_of_this_subtask,
+ 'attempt_number': rc.attempt_number,
+ }
+ else:
+ self.runtime_context = {}
super(AbstractStreamGroupAggregateOperation, self).__init__(
serialized_fn, keyed_state_backend)
def open(self):
- self.group_agg_function.open(FunctionContext(self.base_metric_group,
self.job_parameters))
+ self.group_agg_function.open(FunctionContext(
+ self.base_metric_group, self.job_parameters,
+ **self.runtime_context))
def close(self):
self.group_agg_function.close()
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto
b/flink-python/pyflink/proto/flink-fn-execution.proto
index 40b5e915732..4ab5616b011 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -82,6 +82,9 @@ message UserDefinedFunctions {
bool profile_enabled = 4;
repeated JobParameter job_parameters = 5;
AsyncOptions async_options = 6;
+ // The runtime context of the user-defined functions, providing task info such as
+ // task_name, parallelism, subtask_index, etc.
+ UserDefinedDataStreamFunction.RuntimeContext runtime_context = 7;
}
// Used to describe the info of over window in pandas batch over window
aggregation
@@ -200,6 +203,9 @@ message UserDefinedAggregateFunctions {
bool profile_enabled = 13;
repeated JobParameter job_parameters = 14;
+ // The runtime context of the user-defined aggregate functions, providing task info such as
+ // task_name, parallelism, subtask_index, etc.
+ UserDefinedDataStreamFunction.RuntimeContext runtime_context = 15;
}
// A representation of the data schema.
diff --git a/flink-python/pyflink/table/tests/test_udf.py
b/flink-python/pyflink/table/tests/test_udf.py
index 4667702bbbd..182c46bbd9c 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -266,6 +266,34 @@ class UserDefinedFunctionTests(object):
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[1, 1]", "+I[2, 4]", "+I[3, 3]"])
+ def test_function_context_runtime_info(self):
+ runtime_info_func = udf(RuntimeInfoFunc(),
result_type=DataTypes.STRING())
+
+ sink_table = generate_random_table_name()
+ sink_table_ddl = f"""
+ CREATE TABLE {sink_table}(a STRING) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
+ t = self.t_env.from_elements([(1,)], ['a'])
+ t.select(runtime_info_func(t.a)).execute_insert(sink_table).wait()
+ actual = source_sink_utils.results()
+ result = actual[0]
+ # The result should contain task_name, number_of_parallel_subtasks, and
+ # index_of_this_subtask info, verifying that FunctionContext runtime info
+ # is properly propagated from Java to Python.
+ self.assertTrue(result.startswith("+I["))
+ # Extract the value between +I[ and ]
+ value = result[3:-1]
+ parts = value.split(",")
+ self.assertEqual(len(parts), 3)
+ # task_name should be non-empty
+ self.assertTrue(len(parts[0].strip()) > 0)
+ # number_of_parallel_subtasks should be a positive integer
+ self.assertTrue(int(parts[1].strip()) > 0)
+ # index_of_this_subtask should be a non-negative integer
+ self.assertTrue(int(parts[2].strip()) >= 0)
+
def test_udf_without_arguments(self):
one = udf(lambda: 1, result_type=DataTypes.BIGINT(),
deterministic=True)
two = udf(lambda: 2, result_type=DataTypes.BIGINT(),
deterministic=False)
@@ -1147,6 +1175,17 @@ class Subtract(ScalarFunction, unittest.TestCase):
return i - self.subtracted_value
+class RuntimeInfoFunc(ScalarFunction):
+
+ def open(self, function_context: FunctionContext):
+ self.task_name = function_context.get_task_name()
+ self.num_parallel = function_context.get_number_of_parallel_subtasks()
+ self.subtask_index = function_context.get_index_of_this_subtask()
+
+ def eval(self, i):
+ return "%s,%d,%d" % (self.task_name, self.num_parallel,
self.subtask_index)
+
+
class CallablePlus(object):
def __call__(self, col):
diff --git a/flink-python/pyflink/table/udf.py
b/flink-python/pyflink/table/udf.py
index 0c40350777f..92bd180d189 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -36,12 +36,21 @@ class FunctionContext(object):
"""
Used to obtain global runtime information about the context in which the
user-defined function is executed. The information includes the metric group,
- and global job parameters, etc.
+ global job parameters, and runtime task information such as task name, parallelism, etc.
"""
- def __init__(self, base_metric_group, job_parameters):
+ def __init__(self, base_metric_group, job_parameters,
+ task_name=None, task_name_with_subtasks=None,
+ number_of_parallel_subtasks=None,
max_number_of_parallel_subtasks=None,
+ index_of_this_subtask=None, attempt_number=None):
self._base_metric_group = base_metric_group
self._job_parameters = job_parameters
+ self._task_name = task_name
+ self._task_name_with_subtasks = task_name_with_subtasks
+ self._number_of_parallel_subtasks = number_of_parallel_subtasks
+ self._max_number_of_parallel_subtasks = max_number_of_parallel_subtasks
+ self._index_of_this_subtask = index_of_this_subtask
+ self._attempt_number = attempt_number
def get_metric_group(self) -> MetricGroup:
"""
@@ -66,6 +75,57 @@ class FunctionContext(object):
"""
return self._job_parameters[key] if key in self._job_parameters else
default_value
+ def get_task_name(self) -> str:
+ """
+ Returns the name of the task in which the UDF runs, as assigned during plan construction.
+
+ .. versionadded:: 2.3.0
+ """
+ return self._task_name
+
+ def get_task_name_with_subtasks(self) -> str:
+ """
+ Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
+ where 3 would be (:func:`get_index_of_this_subtask` + 1), and 6 would be
+ :func:`get_number_of_parallel_subtasks`.
+
+ .. versionadded:: 2.3.0
+ """
+ return self._task_name_with_subtasks
+
+ def get_number_of_parallel_subtasks(self) -> int:
+ """
+ Gets the parallelism with which the parallel task runs.
+
+ .. versionadded:: 2.3.0
+ """
+ return self._number_of_parallel_subtasks
+
+ def get_max_number_of_parallel_subtasks(self) -> int:
+ """
+ Gets the number of max-parallelism with which the parallel task runs.
+
+ .. versionadded:: 2.3.0
+ """
+ return self._max_number_of_parallel_subtasks
+
+ def get_index_of_this_subtask(self) -> int:
+ """
+ Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
+ parallelism-1 (parallelism as returned by :func:`get_number_of_parallel_subtasks`).
+
+ .. versionadded:: 2.3.0
+ """
+ return self._index_of_this_subtask
+
+ def get_attempt_number(self) -> int:
+ """
+ Gets the attempt number of this parallel subtask. First attempt is numbered 0.
+
+ .. versionadded:: 2.3.0
+ """
+ return self._attempt_number
+
@PublicEvolving()
class UserDefinedFunction(abc.ABC):
diff --git
a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
index a752c69c011..2745b36cb56 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
@@ -154,6 +154,18 @@ public enum ProtoUtils {
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
+ builder.setRuntimeContext(
+
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+
.setTaskName(runtimeContext.getTaskInfo().getTaskName())
+ .setTaskNameWithSubtasks(
+
runtimeContext.getTaskInfo().getTaskNameWithSubtasks())
+ .setNumberOfParallelSubtasks(
+
runtimeContext.getTaskInfo().getNumberOfParallelSubtasks())
+ .setMaxNumberOfParallelSubtasks(
+
runtimeContext.getTaskInfo().getMaxNumberOfParallelSubtasks())
+
.setIndexOfThisSubtask(runtimeContext.getTaskInfo().getIndexOfThisSubtask())
+
.setAttemptNumber(runtimeContext.getTaskInfo().getAttemptNumber())
+ .build());
return builder.build();
}
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 64381c06582..0298916b6d8 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -264,6 +264,19 @@ public abstract class AbstractPythonStreamAggregateOperator
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
+ builder.setRuntimeContext(
+
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+
.setTaskName(getRuntimeContext().getTaskInfo().getTaskName())
+ .setTaskNameWithSubtasks(
+
getRuntimeContext().getTaskInfo().getTaskNameWithSubtasks())
+ .setNumberOfParallelSubtasks(
+
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+ .setMaxNumberOfParallelSubtasks(
+
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks())
+ .setIndexOfThisSubtask(
+
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
+
.setAttemptNumber(getRuntimeContext().getTaskInfo().getAttemptNumber())
+ .build());
return builder.build();
}
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index 616908205e2..25ce45dc1b0 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -273,6 +273,19 @@ public class
BatchArrowPythonOverWindowAggregateFunctionOperator
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
+ builder.setRuntimeContext(
+
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+
.setTaskName(getRuntimeContext().getTaskInfo().getTaskName())
+ .setTaskNameWithSubtasks(
+
getRuntimeContext().getTaskInfo().getTaskNameWithSubtasks())
+ .setNumberOfParallelSubtasks(
+
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+ .setMaxNumberOfParallelSubtasks(
+
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks())
+ .setIndexOfThisSubtask(
+
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
+
.setAttemptNumber(getRuntimeContext().getTaskInfo().getAttemptNumber())
+ .build());
// add windows
for (int i = 0; i < lowerBoundary.length; i++) {
FlinkFnApi.OverWindow.Builder windowBuilder =
FlinkFnApi.OverWindow.newBuilder();