This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d62be55c09a [FLINK-38933][python] Expose runtime context information 
in Python UDX FunctionContext
d62be55c09a is described below

commit d62be55c09a7beb85e6517312233127cd85e7dce
Author: yongliu <[email protected]>
AuthorDate: Thu Mar 26 11:47:39 2026 +0800

    [FLINK-38933][python] Expose runtime context information in Python UDX 
FunctionContext
    
    This aligns Python UDX with Java UDX by exposing runtime context 
information through FunctionContext. Added the following getter methods to 
FunctionContext:
    - get_task_name()
    - get_task_name_with_subtasks()
    - get_number_of_parallel_subtasks()
    - get_max_number_of_parallel_subtasks()
    - get_index_of_this_subtask()
    - get_attempt_number()
    
    The runtime context is propagated from Java operators via the protobuf 
protocol by adding a runtime_context field to UserDefinedFunctions and 
UserDefinedAggregateFunctions messages.
    
    This closes #27831.
---
 .../pyflink/fn_execution/flink_fn_execution_pb2.py | 212 ++++++++++-----------
 .../fn_execution/flink_fn_execution_pb2.pyi        |  12 +-
 .../fn_execution/metrics/tests/test_metric.py      |  25 +++
 .../table/async_function/operations.py             |  17 +-
 .../pyflink/fn_execution/table/operations.py       |  32 +++-
 .../pyflink/proto/flink-fn-execution.proto         |   6 +
 flink-python/pyflink/table/tests/test_udf.py       |  39 ++++
 flink-python/pyflink/table/udf.py                  |  64 ++++++-
 .../org/apache/flink/python/util/ProtoUtils.java   |  12 ++
 .../AbstractPythonStreamAggregateOperator.java     |  13 ++
 ...wPythonOverWindowAggregateFunctionOperator.java |  13 ++
 11 files changed, 329 insertions(+), 116 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py 
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 5356b9f8738..96984f80d5c 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -31,7 +31,7 @@ _sym_db = _symbol_database.Default()
 
 
 
-DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12
 org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
+DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12
 org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -48,109 +48,109 @@ if _descriptor._USE_C_DESCRIPTORS == False:
   _globals['_ASYNCOPTIONS']._serialized_start=415
   _globals['_ASYNCOPTIONS']._serialized_end=559
   _globals['_USERDEFINEDFUNCTIONS']._serialized_start=562
-  _globals['_USERDEFINEDFUNCTIONS']._serialized_end=908
-  _globals['_OVERWINDOW']._serialized_start=911
-  _globals['_OVERWINDOW']._serialized_end=1260
-  _globals['_OVERWINDOW_WINDOWTYPE']._serialized_start=1052
-  _globals['_OVERWINDOW_WINDOWTYPE']._serialized_end=1260
-  _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_start=1263
-  _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_end=2042
-  _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_start=1528
-  _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_end=2042
-  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_start=1791
-  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_end=1875
-  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_start=1878
-  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_end=2029
-  _globals['_GROUPWINDOW']._serialized_start=2045
-  _globals['_GROUPWINDOW']._serialized_end=2601
-  _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_start=2409
-  _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_end=2500
-  _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_start=2502
-  _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_end=2601
-  _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_start=2604
-  _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_end=3210
-  _globals['_SCHEMA']._serialized_start=3213
-  _globals['_SCHEMA']._serialized_end=5251
-  _globals['_SCHEMA_MAPINFO']._serialized_start=3288
-  _globals['_SCHEMA_MAPINFO']._serialized_end=3439
-  _globals['_SCHEMA_TIMEINFO']._serialized_start=3441
-  _globals['_SCHEMA_TIMEINFO']._serialized_end=3470
-  _globals['_SCHEMA_TIMESTAMPINFO']._serialized_start=3472
-  _globals['_SCHEMA_TIMESTAMPINFO']._serialized_end=3506
-  _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_start=3508
-  _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_end=3552
-  _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_start=3554
-  _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_end=3593
-  _globals['_SCHEMA_DECIMALINFO']._serialized_start=3595
-  _globals['_SCHEMA_DECIMALINFO']._serialized_end=3642
-  _globals['_SCHEMA_BINARYINFO']._serialized_start=3644
-  _globals['_SCHEMA_BINARYINFO']._serialized_end=3672
-  _globals['_SCHEMA_VARBINARYINFO']._serialized_start=3674
-  _globals['_SCHEMA_VARBINARYINFO']._serialized_end=3705
-  _globals['_SCHEMA_CHARINFO']._serialized_start=3707
-  _globals['_SCHEMA_CHARINFO']._serialized_end=3733
-  _globals['_SCHEMA_VARCHARINFO']._serialized_start=3735
-  _globals['_SCHEMA_VARCHARINFO']._serialized_end=3764
-  _globals['_SCHEMA_FIELDTYPE']._serialized_start=3767
-  _globals['_SCHEMA_FIELDTYPE']._serialized_end=4839
-  _globals['_SCHEMA_FIELD']._serialized_start=4841
-  _globals['_SCHEMA_FIELD']._serialized_end=4949
-  _globals['_SCHEMA_TYPENAME']._serialized_start=4952
-  _globals['_SCHEMA_TYPENAME']._serialized_end=5251
-  _globals['_TYPEINFO']._serialized_start=5254
-  _globals['_TYPEINFO']._serialized_end=6601
-  _globals['_TYPEINFO_MAPTYPEINFO']._serialized_start=5748
-  _globals['_TYPEINFO_MAPTYPEINFO']._serialized_end=5887
-  _globals['_TYPEINFO_ROWTYPEINFO']._serialized_start=5890
-  _globals['_TYPEINFO_ROWTYPEINFO']._serialized_end=6074
-  _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_start=5983
-  _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_end=6074
-  _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_start=6076
-  _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_end=6156
-  _globals['_TYPEINFO_AVROTYPEINFO']._serialized_start=6158
-  _globals['_TYPEINFO_AVROTYPEINFO']._serialized_end=6188
-  _globals['_TYPEINFO_TYPENAME']._serialized_start=6191
-  _globals['_TYPEINFO_TYPENAME']._serialized_end=6588
-  _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_start=6604
-  _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_end=7581
-  
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_start=7099
-  
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_end=7405
-  
_globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_start=7408
-  _globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_end=7581
-  _globals['_STATEDESCRIPTOR']._serialized_start=7584
-  _globals['_STATEDESCRIPTOR']._serialized_end=9476
-  _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_start=7716
-  _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_end=9476
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_start=8187
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_end=9285
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_start=8365
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_end=8453
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_start=8455
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_end=8530
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_start=8533
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_end=9141
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_start=9143
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_end=9241
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_start=9243
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_end=9285
-  _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_start=9287
-  _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_end=9355
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_start=9357
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_end=9431
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_start=9433
-  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_end=9476
-  _globals['_CODERINFODESCRIPTOR']._serialized_start=9479
-  _globals['_CODERINFODESCRIPTOR']._serialized_end=10488
-  _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_start=10072
-  _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_end=10146
-  _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_start=10148
-  _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_end=10215
-  _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_start=10217
-  _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_end=10286
-  _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_start=10288
-  _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_end=10367
-  _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_start=10369
-  _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_end=10441
-  _globals['_CODERINFODESCRIPTOR_MODE']._serialized_start=10443
-  _globals['_CODERINFODESCRIPTOR_MODE']._serialized_end=10475
+  _globals['_USERDEFINEDFUNCTIONS']._serialized_end=1013
+  _globals['_OVERWINDOW']._serialized_start=1016
+  _globals['_OVERWINDOW']._serialized_end=1365
+  _globals['_OVERWINDOW_WINDOWTYPE']._serialized_start=1157
+  _globals['_OVERWINDOW_WINDOWTYPE']._serialized_end=1365
+  _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_start=1368
+  _globals['_USERDEFINEDAGGREGATEFUNCTION']._serialized_end=2147
+  _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_start=1633
+  _globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC']._serialized_end=2147
+  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_start=1896
+  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW']._serialized_end=1980
+  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_start=1983
+  
_globals['_USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW']._serialized_end=2134
+  _globals['_GROUPWINDOW']._serialized_start=2150
+  _globals['_GROUPWINDOW']._serialized_end=2706
+  _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_start=2514
+  _globals['_GROUPWINDOW_WINDOWTYPE']._serialized_end=2605
+  _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_start=2607
+  _globals['_GROUPWINDOW_WINDOWPROPERTY']._serialized_end=2706
+  _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_start=2709
+  _globals['_USERDEFINEDAGGREGATEFUNCTIONS']._serialized_end=3420
+  _globals['_SCHEMA']._serialized_start=3423
+  _globals['_SCHEMA']._serialized_end=5461
+  _globals['_SCHEMA_MAPINFO']._serialized_start=3498
+  _globals['_SCHEMA_MAPINFO']._serialized_end=3649
+  _globals['_SCHEMA_TIMEINFO']._serialized_start=3651
+  _globals['_SCHEMA_TIMEINFO']._serialized_end=3680
+  _globals['_SCHEMA_TIMESTAMPINFO']._serialized_start=3682
+  _globals['_SCHEMA_TIMESTAMPINFO']._serialized_end=3716
+  _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_start=3718
+  _globals['_SCHEMA_LOCALZONEDTIMESTAMPINFO']._serialized_end=3762
+  _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_start=3764
+  _globals['_SCHEMA_ZONEDTIMESTAMPINFO']._serialized_end=3803
+  _globals['_SCHEMA_DECIMALINFO']._serialized_start=3805
+  _globals['_SCHEMA_DECIMALINFO']._serialized_end=3852
+  _globals['_SCHEMA_BINARYINFO']._serialized_start=3854
+  _globals['_SCHEMA_BINARYINFO']._serialized_end=3882
+  _globals['_SCHEMA_VARBINARYINFO']._serialized_start=3884
+  _globals['_SCHEMA_VARBINARYINFO']._serialized_end=3915
+  _globals['_SCHEMA_CHARINFO']._serialized_start=3917
+  _globals['_SCHEMA_CHARINFO']._serialized_end=3943
+  _globals['_SCHEMA_VARCHARINFO']._serialized_start=3945
+  _globals['_SCHEMA_VARCHARINFO']._serialized_end=3974
+  _globals['_SCHEMA_FIELDTYPE']._serialized_start=3977
+  _globals['_SCHEMA_FIELDTYPE']._serialized_end=5049
+  _globals['_SCHEMA_FIELD']._serialized_start=5051
+  _globals['_SCHEMA_FIELD']._serialized_end=5159
+  _globals['_SCHEMA_TYPENAME']._serialized_start=5162
+  _globals['_SCHEMA_TYPENAME']._serialized_end=5461
+  _globals['_TYPEINFO']._serialized_start=5464
+  _globals['_TYPEINFO']._serialized_end=6811
+  _globals['_TYPEINFO_MAPTYPEINFO']._serialized_start=5958
+  _globals['_TYPEINFO_MAPTYPEINFO']._serialized_end=6097
+  _globals['_TYPEINFO_ROWTYPEINFO']._serialized_start=6100
+  _globals['_TYPEINFO_ROWTYPEINFO']._serialized_end=6284
+  _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_start=6193
+  _globals['_TYPEINFO_ROWTYPEINFO_FIELD']._serialized_end=6284
+  _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_start=6286
+  _globals['_TYPEINFO_TUPLETYPEINFO']._serialized_end=6366
+  _globals['_TYPEINFO_AVROTYPEINFO']._serialized_start=6368
+  _globals['_TYPEINFO_AVROTYPEINFO']._serialized_end=6398
+  _globals['_TYPEINFO_TYPENAME']._serialized_start=6401
+  _globals['_TYPEINFO_TYPENAME']._serialized_end=6798
+  _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_start=6814
+  _globals['_USERDEFINEDDATASTREAMFUNCTION']._serialized_end=7791
+  
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_start=7309
+  
_globals['_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT']._serialized_end=7615
+  
_globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_start=7618
+  _globals['_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE']._serialized_end=7791
+  _globals['_STATEDESCRIPTOR']._serialized_start=7794
+  _globals['_STATEDESCRIPTOR']._serialized_end=9686
+  _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_start=7926
+  _globals['_STATEDESCRIPTOR_STATETTLCONFIG']._serialized_end=9686
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_start=8397
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES']._serialized_end=9495
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_start=8575
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY']._serialized_end=8663
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_start=8665
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY']._serialized_end=8740
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_start=8743
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY']._serialized_end=9351
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_start=9353
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES']._serialized_end=9451
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_start=9453
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY']._serialized_end=9495
+  _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_start=9497
+  _globals['_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE']._serialized_end=9565
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_start=9567
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY']._serialized_end=9641
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_start=9643
+  
_globals['_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC']._serialized_end=9686
+  _globals['_CODERINFODESCRIPTOR']._serialized_start=9689
+  _globals['_CODERINFODESCRIPTOR']._serialized_end=10698
+  _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_start=10282
+  _globals['_CODERINFODESCRIPTOR_FLATTENROWTYPE']._serialized_end=10356
+  _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_start=10358
+  _globals['_CODERINFODESCRIPTOR_ROWTYPE']._serialized_end=10425
+  _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_start=10427
+  _globals['_CODERINFODESCRIPTOR_ARROWTYPE']._serialized_end=10496
+  _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_start=10498
+  _globals['_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE']._serialized_end=10577
+  _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_start=10579
+  _globals['_CODERINFODESCRIPTOR_RAWTYPE']._serialized_end=10651
+  _globals['_CODERINFODESCRIPTOR_MODE']._serialized_start=10653
+  _globals['_CODERINFODESCRIPTOR_MODE']._serialized_end=10685
 # @@protoc_insertion_point(module_scope)
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi 
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
index 93f892263fe..b6b77c68946 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.pyi
@@ -70,20 +70,22 @@ class AsyncOptions(_message.Message):
     def __init__(self, max_concurrent_operations: _Optional[int] = ..., 
timeout_ms: _Optional[int] = ..., retry_enabled: bool = ..., 
retry_max_attempts: _Optional[int] = ..., retry_delay_ms: _Optional[int] = ...) 
-> None: ...
 
 class UserDefinedFunctions(_message.Message):
-    __slots__ = ("udfs", "metric_enabled", "windows", "profile_enabled", 
"job_parameters", "async_options")
+    __slots__ = ("udfs", "metric_enabled", "windows", "profile_enabled", 
"job_parameters", "async_options", "runtime_context")
     UDFS_FIELD_NUMBER: _ClassVar[int]
     METRIC_ENABLED_FIELD_NUMBER: _ClassVar[int]
     WINDOWS_FIELD_NUMBER: _ClassVar[int]
     PROFILE_ENABLED_FIELD_NUMBER: _ClassVar[int]
     JOB_PARAMETERS_FIELD_NUMBER: _ClassVar[int]
     ASYNC_OPTIONS_FIELD_NUMBER: _ClassVar[int]
+    RUNTIME_CONTEXT_FIELD_NUMBER: _ClassVar[int]
     udfs: _containers.RepeatedCompositeFieldContainer[UserDefinedFunction]
     metric_enabled: bool
     windows: _containers.RepeatedCompositeFieldContainer[OverWindow]
     profile_enabled: bool
     job_parameters: _containers.RepeatedCompositeFieldContainer[JobParameter]
     async_options: AsyncOptions
-    def __init__(self, udfs: _Optional[_Iterable[_Union[UserDefinedFunction, 
_Mapping]]] = ..., metric_enabled: bool = ..., windows: 
_Optional[_Iterable[_Union[OverWindow, _Mapping]]] = ..., profile_enabled: bool 
= ..., job_parameters: _Optional[_Iterable[_Union[JobParameter, _Mapping]]] = 
..., async_options: _Optional[_Union[AsyncOptions, _Mapping]] = ...) -> None: 
...
+    runtime_context: UserDefinedDataStreamFunction.RuntimeContext
+    def __init__(self, udfs: _Optional[_Iterable[_Union[UserDefinedFunction, 
_Mapping]]] = ..., metric_enabled: bool = ..., windows: 
_Optional[_Iterable[_Union[OverWindow, _Mapping]]] = ..., profile_enabled: bool 
= ..., job_parameters: _Optional[_Iterable[_Union[JobParameter, _Mapping]]] = 
..., async_options: _Optional[_Union[AsyncOptions, _Mapping]] = ..., 
runtime_context: _Optional[_Union[UserDefinedDataStreamFunction.RuntimeContext, 
_Mapping]] = ...) -> None: ...
 
 class OverWindow(_message.Message):
     __slots__ = ("window_type", "lower_boundary", "upper_boundary")
@@ -195,7 +197,7 @@ class GroupWindow(_message.Message):
     def __init__(self, window_type: _Optional[_Union[GroupWindow.WindowType, 
str]] = ..., is_time_window: bool = ..., window_slide: _Optional[int] = ..., 
window_size: _Optional[int] = ..., window_gap: _Optional[int] = ..., 
is_row_time: bool = ..., time_field_index: _Optional[int] = ..., 
allowedLateness: _Optional[int] = ..., namedProperties: 
_Optional[_Iterable[_Union[GroupWindow.WindowProperty, str]]] = ..., 
shift_timezone: _Optional[str] = ...) -> None: ...
 
 class UserDefinedAggregateFunctions(_message.Message):
-    __slots__ = ("udfs", "metric_enabled", "grouping", 
"generate_update_before", "key_type", "index_of_count_star", 
"state_cleaning_enabled", "state_cache_size", "map_state_read_cache_size", 
"map_state_write_cache_size", "count_star_inserted", "group_window", 
"profile_enabled", "job_parameters")
+    __slots__ = ("udfs", "metric_enabled", "grouping", 
"generate_update_before", "key_type", "index_of_count_star", 
"state_cleaning_enabled", "state_cache_size", "map_state_read_cache_size", 
"map_state_write_cache_size", "count_star_inserted", "group_window", 
"profile_enabled", "job_parameters", "runtime_context")
     UDFS_FIELD_NUMBER: _ClassVar[int]
     METRIC_ENABLED_FIELD_NUMBER: _ClassVar[int]
     GROUPING_FIELD_NUMBER: _ClassVar[int]
@@ -210,6 +212,7 @@ class UserDefinedAggregateFunctions(_message.Message):
     GROUP_WINDOW_FIELD_NUMBER: _ClassVar[int]
     PROFILE_ENABLED_FIELD_NUMBER: _ClassVar[int]
     JOB_PARAMETERS_FIELD_NUMBER: _ClassVar[int]
+    RUNTIME_CONTEXT_FIELD_NUMBER: _ClassVar[int]
     udfs: 
_containers.RepeatedCompositeFieldContainer[UserDefinedAggregateFunction]
     metric_enabled: bool
     grouping: _containers.RepeatedScalarFieldContainer[int]
@@ -224,7 +227,8 @@ class UserDefinedAggregateFunctions(_message.Message):
     group_window: GroupWindow
     profile_enabled: bool
     job_parameters: _containers.RepeatedCompositeFieldContainer[JobParameter]
-    def __init__(self, udfs: 
_Optional[_Iterable[_Union[UserDefinedAggregateFunction, _Mapping]]] = ..., 
metric_enabled: bool = ..., grouping: _Optional[_Iterable[int]] = ..., 
generate_update_before: bool = ..., key_type: 
_Optional[_Union[Schema.FieldType, _Mapping]] = ..., index_of_count_star: 
_Optional[int] = ..., state_cleaning_enabled: bool = ..., state_cache_size: 
_Optional[int] = ..., map_state_read_cache_size: _Optional[int] = ..., 
map_state_write_cache_size: _Optional[int] = ..., [...]
+    runtime_context: UserDefinedDataStreamFunction.RuntimeContext
+    def __init__(self, udfs: 
_Optional[_Iterable[_Union[UserDefinedAggregateFunction, _Mapping]]] = ..., 
metric_enabled: bool = ..., grouping: _Optional[_Iterable[int]] = ..., 
generate_update_before: bool = ..., key_type: 
_Optional[_Union[Schema.FieldType, _Mapping]] = ..., index_of_count_star: 
_Optional[int] = ..., state_cleaning_enabled: bool = ..., state_cache_size: 
_Optional[int] = ..., map_state_read_cache_size: _Optional[int] = ..., 
map_state_write_cache_size: _Optional[int] = ..., [...]
 
 class Schema(_message.Message):
     __slots__ = ("fields",)
diff --git a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py 
b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index 7fba25cb87e..93c6c8ad5d7 100644
--- a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -52,6 +52,31 @@ class MetricTests(PyFlinkTestCase):
         with self.assertRaises(RuntimeError):
             fc.get_metric_group()
 
+    def test_function_context_runtime_info(self):
+        fc = FunctionContext(
+            None, {},
+            task_name='MyTask',
+            task_name_with_subtasks='MyTask (1/4)',
+            number_of_parallel_subtasks=4,
+            max_number_of_parallel_subtasks=128,
+            index_of_this_subtask=0,
+            attempt_number=2)
+        self.assertEqual('MyTask', fc.get_task_name())
+        self.assertEqual('MyTask (1/4)', fc.get_task_name_with_subtasks())
+        self.assertEqual(4, fc.get_number_of_parallel_subtasks())
+        self.assertEqual(128, fc.get_max_number_of_parallel_subtasks())
+        self.assertEqual(0, fc.get_index_of_this_subtask())
+        self.assertEqual(2, fc.get_attempt_number())
+
+    def test_function_context_runtime_info_defaults(self):
+        fc = FunctionContext(None, {})
+        self.assertIsNone(fc.get_task_name())
+        self.assertIsNone(fc.get_task_name_with_subtasks())
+        self.assertIsNone(fc.get_number_of_parallel_subtasks())
+        self.assertIsNone(fc.get_max_number_of_parallel_subtasks())
+        self.assertIsNone(fc.get_index_of_this_subtask())
+        self.assertIsNone(fc.get_attempt_number())
+
     def test_get_metric_name(self):
         new_group = MetricTests.base_metric_group.add_group('my_group')
         self.assertEqual(
diff --git 
a/flink-python/pyflink/fn_execution/table/async_function/operations.py 
b/flink-python/pyflink/fn_execution/table/async_function/operations.py
index 44e4e4a70b0..5d536c47496 100644
--- a/flink-python/pyflink/fn_execution/table/async_function/operations.py
+++ b/flink-python/pyflink/fn_execution/table/async_function/operations.py
@@ -74,6 +74,18 @@ class AsyncScalarFunctionOperation(Operation, 
AsyncOperationMixin):
 
         # Job parameters
         self._job_parameters = {p.key: p.value for p in 
serialized_fn.job_parameters}
+        if serialized_fn.HasField('runtime_context'):
+            rc = serialized_fn.runtime_context
+            self._runtime_context = {
+                'task_name': rc.task_name,
+                'task_name_with_subtasks': rc.task_name_with_subtasks,
+                'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+                'max_number_of_parallel_subtasks': 
rc.max_number_of_parallel_subtasks,
+                'index_of_this_subtask': rc.index_of_this_subtask,
+                'attempt_number': rc.attempt_number,
+            }
+        else:
+            self._runtime_context = {}
 
     def set_output_processor(self, output_processor):
         """Set the output processor for emitting results.
@@ -86,8 +98,9 @@ class AsyncScalarFunctionOperation(Operation, 
AsyncOperationMixin):
         # Open user defined functions
         for user_defined_func in self.user_defined_funcs:
             if hasattr(user_defined_func, 'open'):
-                user_defined_func.open(
-                    FunctionContext(self.base_metric_group, 
self._job_parameters))
+                user_defined_func.open(FunctionContext(
+                    self.base_metric_group, self._job_parameters,
+                    **self._runtime_context))
 
         # Start emitter thread to collect async results
         self._emitter = Emitter(self._mark_exception, self._output_processor, 
self._queue)
diff --git a/flink-python/pyflink/fn_execution/table/operations.py 
b/flink-python/pyflink/fn_execution/table/operations.py
index eb8cac4fdcf..5fa132db849 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -85,6 +85,18 @@ class BaseOperation(Operation):
             self.base_metric_group = None
         self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
         self.job_parameters = {p.key: p.value for p in 
serialized_fn.job_parameters}
+        if serialized_fn.HasField('runtime_context'):
+            rc = serialized_fn.runtime_context
+            self.runtime_context = {
+                'task_name': rc.task_name,
+                'task_name_with_subtasks': rc.task_name_with_subtasks,
+                'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+                'max_number_of_parallel_subtasks': 
rc.max_number_of_parallel_subtasks,
+                'index_of_this_subtask': rc.index_of_this_subtask,
+                'attempt_number': rc.attempt_number,
+            }
+        else:
+            self.runtime_context = {}
 
     def finish(self):
         self._update_gauge(self.base_metric_group)
@@ -104,7 +116,9 @@ class BaseOperation(Operation):
     def open(self):
         for user_defined_func in self.user_defined_funcs:
             if hasattr(user_defined_func, 'open'):
-                user_defined_func.open(FunctionContext(self.base_metric_group, 
self.job_parameters))
+                user_defined_func.open(FunctionContext(
+                    self.base_metric_group, self.job_parameters,
+                    **self.runtime_context))
 
     def close(self):
         for user_defined_func in self.user_defined_funcs:
@@ -326,11 +340,25 @@ class 
AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
         self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
         self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
         self.job_parameters = {p.key: p.value for p in 
serialized_fn.job_parameters}
+        if serialized_fn.HasField('runtime_context'):
+            rc = serialized_fn.runtime_context
+            self.runtime_context = {
+                'task_name': rc.task_name,
+                'task_name_with_subtasks': rc.task_name_with_subtasks,
+                'number_of_parallel_subtasks': rc.number_of_parallel_subtasks,
+                'max_number_of_parallel_subtasks': 
rc.max_number_of_parallel_subtasks,
+                'index_of_this_subtask': rc.index_of_this_subtask,
+                'attempt_number': rc.attempt_number,
+            }
+        else:
+            self.runtime_context = {}
         super(AbstractStreamGroupAggregateOperation, self).__init__(
             serialized_fn, keyed_state_backend)
 
     def open(self):
-        self.group_agg_function.open(FunctionContext(self.base_metric_group, 
self.job_parameters))
+        self.group_agg_function.open(FunctionContext(
+            self.base_metric_group, self.job_parameters,
+            **self.runtime_context))
 
     def close(self):
         self.group_agg_function.close()
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto 
b/flink-python/pyflink/proto/flink-fn-execution.proto
index 40b5e915732..4ab5616b011 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -82,6 +82,9 @@ message UserDefinedFunctions {
   bool profile_enabled = 4;
   repeated JobParameter job_parameters = 5;
   AsyncOptions async_options = 6;
+  // The runtime context of the user-defined functions, providing task info 
such as
+  // task_name, parallelism, subtask_index, etc.
+  UserDefinedDataStreamFunction.RuntimeContext runtime_context = 7;
 }
 
 // Used to describe the info of over window in pandas batch over window 
aggregation
@@ -200,6 +203,9 @@ message UserDefinedAggregateFunctions {
 
   bool profile_enabled = 13;
   repeated JobParameter job_parameters = 14;
+  // The runtime context of the user-defined aggregate functions, providing 
task info such as
+  // task_name, parallelism, subtask_index, etc.
+  UserDefinedDataStreamFunction.RuntimeContext runtime_context = 15;
 }
 
 // A representation of the data schema.
diff --git a/flink-python/pyflink/table/tests/test_udf.py 
b/flink-python/pyflink/table/tests/test_udf.py
index 4667702bbbd..182c46bbd9c 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -266,6 +266,34 @@ class UserDefinedFunctionTests(object):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[1, 1]", "+I[2, 4]", "+I[3, 3]"])
 
+    def test_function_context_runtime_info(self):
+        runtime_info_func = udf(RuntimeInfoFunc(), 
result_type=DataTypes.STRING())
+
+        sink_table = generate_random_table_name()
+        sink_table_ddl = f"""
+            CREATE TABLE {sink_table}(a STRING) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+
+        t = self.t_env.from_elements([(1,)], ['a'])
+        t.select(runtime_info_func(t.a)).execute_insert(sink_table).wait()
+        actual = source_sink_utils.results()
+        result = actual[0]
+        # The result should contain task_name, number_of_parallel_subtasks, and
+        # index_of_this_subtask info, verifying that FunctionContext runtime 
info
+        # is properly propagated from Java to Python.
+        self.assertTrue(result.startswith("+I["))
+        # Extract the value between +I[ and ]
+        value = result[3:-1]
+        parts = value.split(",")
+        self.assertEqual(len(parts), 3)
+        # task_name should be non-empty
+        self.assertTrue(len(parts[0].strip()) > 0)
+        # number_of_parallel_subtasks should be a positive integer
+        self.assertTrue(int(parts[1].strip()) > 0)
+        # index_of_this_subtask should be a non-negative integer
+        self.assertTrue(int(parts[2].strip()) >= 0)
+
     def test_udf_without_arguments(self):
         one = udf(lambda: 1, result_type=DataTypes.BIGINT(), 
deterministic=True)
         two = udf(lambda: 2, result_type=DataTypes.BIGINT(), 
deterministic=False)
@@ -1147,6 +1175,17 @@ class Subtract(ScalarFunction, unittest.TestCase):
         return i - self.subtracted_value
 
 
+class RuntimeInfoFunc(ScalarFunction):
+
+    def open(self, function_context: FunctionContext):
+        self.task_name = function_context.get_task_name()
+        self.num_parallel = function_context.get_number_of_parallel_subtasks()
+        self.subtask_index = function_context.get_index_of_this_subtask()
+
+    def eval(self, i):
+        return "%s,%d,%d" % (self.task_name, self.num_parallel, 
self.subtask_index)
+
+
 class CallablePlus(object):
 
     def __call__(self, col):
diff --git a/flink-python/pyflink/table/udf.py 
b/flink-python/pyflink/table/udf.py
index 0c40350777f..92bd180d189 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -36,12 +36,21 @@ class FunctionContext(object):
     """
     Used to obtain global runtime information about the context in which the
     user-defined function is executed. The information includes the metric 
group,
-    and global job parameters, etc.
+    global job parameters, and runtime task information such as task name, 
parallelism, etc.
     """
 
-    def __init__(self, base_metric_group, job_parameters):
+    def __init__(self, base_metric_group, job_parameters,
+                 task_name=None, task_name_with_subtasks=None,
+                 number_of_parallel_subtasks=None, 
max_number_of_parallel_subtasks=None,
+                 index_of_this_subtask=None, attempt_number=None):
         self._base_metric_group = base_metric_group
         self._job_parameters = job_parameters
+        self._task_name = task_name
+        self._task_name_with_subtasks = task_name_with_subtasks
+        self._number_of_parallel_subtasks = number_of_parallel_subtasks
+        self._max_number_of_parallel_subtasks = max_number_of_parallel_subtasks
+        self._index_of_this_subtask = index_of_this_subtask
+        self._attempt_number = attempt_number
 
     def get_metric_group(self) -> MetricGroup:
         """
@@ -66,6 +75,57 @@ class FunctionContext(object):
         """
         return self._job_parameters[key] if key in self._job_parameters else 
default_value
 
+    def get_task_name(self) -> str:
+        """
+        Returns the name of the task in which the UDF runs, as assigned during 
plan construction.
+
+        .. versionadded:: 2.3.0
+        """
+        return self._task_name
+
+    def get_task_name_with_subtasks(self) -> str:
+        """
+        Returns the name of the task, appended with the subtask indicator, 
such as "MyTask (3/6)",
+        where 3 would be (:func:`get_index_of_this_subtask` + 1), and 6 would 
be
+        :func:`get_number_of_parallel_subtasks`.
+
+        .. versionadded:: 2.3.0
+        """
+        return self._task_name_with_subtasks
+
+    def get_number_of_parallel_subtasks(self) -> int:
+        """
+        Gets the parallelism with which the parallel task runs.
+
+        .. versionadded:: 2.3.0
+        """
+        return self._number_of_parallel_subtasks
+
+    def get_max_number_of_parallel_subtasks(self) -> int:
+        """
+        Gets the number of max-parallelism with which the parallel task runs.
+
+        .. versionadded:: 2.3.0
+        """
+        return self._max_number_of_parallel_subtasks
+
+    def get_index_of_this_subtask(self) -> int:
+        """
+        Gets the number of this parallel subtask. The numbering starts from 0 
and goes up to
+        parallelism-1 (parallelism as returned by 
:func:`get_number_of_parallel_subtasks`).
+
+        .. versionadded:: 2.3.0
+        """
+        return self._index_of_this_subtask
+
+    def get_attempt_number(self) -> int:
+        """
+        Gets the attempt number of this parallel subtask. First attempt is 
numbered 0.
+
+        .. versionadded:: 2.3.0
+        """
+        return self._attempt_number
+
 
 @PublicEvolving()
 class UserDefinedFunction(abc.ABC):
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java 
b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
index a752c69c011..2745b36cb56 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
@@ -154,6 +154,18 @@ public enum ProtoUtils {
                                                 .setValue(entry.getValue())
                                                 .build())
                         .collect(Collectors.toList()));
+        builder.setRuntimeContext(
+                
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+                        
.setTaskName(runtimeContext.getTaskInfo().getTaskName())
+                        .setTaskNameWithSubtasks(
+                                
runtimeContext.getTaskInfo().getTaskNameWithSubtasks())
+                        .setNumberOfParallelSubtasks(
+                                
runtimeContext.getTaskInfo().getNumberOfParallelSubtasks())
+                        .setMaxNumberOfParallelSubtasks(
+                                
runtimeContext.getTaskInfo().getMaxNumberOfParallelSubtasks())
+                        
.setIndexOfThisSubtask(runtimeContext.getTaskInfo().getIndexOfThisSubtask())
+                        
.setAttemptNumber(runtimeContext.getTaskInfo().getAttemptNumber())
+                        .build());
         return builder.build();
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 64381c06582..0298916b6d8 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -264,6 +264,19 @@ public abstract class AbstractPythonStreamAggregateOperator
                                                 .setValue(entry.getValue())
                                                 .build())
                         .collect(Collectors.toList()));
+        builder.setRuntimeContext(
+                
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+                        
.setTaskName(getRuntimeContext().getTaskInfo().getTaskName())
+                        .setTaskNameWithSubtasks(
+                                
getRuntimeContext().getTaskInfo().getTaskNameWithSubtasks())
+                        .setNumberOfParallelSubtasks(
+                                
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+                        .setMaxNumberOfParallelSubtasks(
+                                
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks())
+                        .setIndexOfThisSubtask(
+                                
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
+                        
.setAttemptNumber(getRuntimeContext().getTaskInfo().getAttemptNumber())
+                        .build());
         return builder.build();
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index 616908205e2..25ce45dc1b0 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -273,6 +273,19 @@ public class 
BatchArrowPythonOverWindowAggregateFunctionOperator
                                                 .setValue(entry.getValue())
                                                 .build())
                         .collect(Collectors.toList()));
+        builder.setRuntimeContext(
+                
FlinkFnApi.UserDefinedDataStreamFunction.RuntimeContext.newBuilder()
+                        
.setTaskName(getRuntimeContext().getTaskInfo().getTaskName())
+                        .setTaskNameWithSubtasks(
+                                
getRuntimeContext().getTaskInfo().getTaskNameWithSubtasks())
+                        .setNumberOfParallelSubtasks(
+                                
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+                        .setMaxNumberOfParallelSubtasks(
+                                
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks())
+                        .setIndexOfThisSubtask(
+                                
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
+                        
.setAttemptNumber(getRuntimeContext().getTaskInfo().getAttemptNumber())
+                        .build());
         // add windows
         for (int i = 0; i < lowerBoundary.length; i++) {
             FlinkFnApi.OverWindow.Builder windowBuilder = 
FlinkFnApi.OverWindow.newBuilder();


Reply via email to