[ https://issues.apache.org/jira/browse/FLINK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-24105: ----------------------------- Parent: FLINK-23822 Issue Type: Sub-task (was: Bug) > state TTL might not take effect for pyflink > ------------------------------------------- > > Key: FLINK-24105 > URL: https://issues.apache.org/jira/browse/FLINK-24105 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Runtime / State Backends > Reporter: Yun Tang > Priority: Blocker > Fix For: 1.14.0 > > > Since pyflink has its own data cache on the Python side, it might still read the > data from the Python side even after the TTL has expired. > The script below reproduces this: > {code:python} > from pyflink.common.time import Time > from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, > ListStateDescriptor, MapStateDescriptor > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment, > TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \ > EmbeddedRocksDBStateBackend > import time > from datetime import datetime > def test_keyed_process_function_with_state(): > env = StreamExecutionEnvironment.get_execution_environment() > env.get_config().set_auto_watermark_interval(2000) > env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > env.set_state_backend(EmbeddedRocksDBStateBackend()) > data_stream = env.from_collection([(1, 'hi', '1603708211000'), > (3, 'hi', '1603708226000'), > (10, 'hi', '1603708226000'), > (6, 'hello', '1603708293000')], > type_info=Types.ROW([Types.INT(), > Types.STRING(), > > Types.STRING()])) > class MyProcessFunction(KeyedProcessFunction): > def __init__(self): > self.value_state = None > self.list_state = None > self.map_state = None > def open(self, runtime_context: RuntimeContext): > state_ttl_config = StateTtlConfig \ > .new_builder(Time.seconds(1)) \ > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \ > .never_return_expired() \ > .build() > value_state_descriptor = ValueStateDescriptor('value_state', > Types.INT()) > 
value_state_descriptor.enable_time_to_live(state_ttl_config) > self.value_state = > runtime_context.get_state(value_state_descriptor) > list_state_descriptor = ListStateDescriptor('list_state', > Types.INT()) > list_state_descriptor.enable_time_to_live(state_ttl_config) > self.list_state = > runtime_context.get_list_state(list_state_descriptor) > map_state_descriptor = MapStateDescriptor('map_state', > Types.INT(), Types.STRING()) > map_state_descriptor.enable_time_to_live(state_ttl_config) > self.map_state = > runtime_context.get_map_state(map_state_descriptor) > def process_element(self, value, ctx): > time.sleep(20) > current_value = self.value_state.value() > self.value_state.update(value[0]) > current_list = [_ for _ in self.list_state.get()] > self.list_state.add(value[0]) > map_entries_string = [] > for k, v in self.map_state.items(): > map_entries_string.append(str(k) + ': ' + str(v)) > map_entries_string = '{' + ', '.join(map_entries_string) + '}' > self.map_state.put(value[0], value[1]) > current_key = ctx.get_current_key() > yield "time: {}, current key: {}, current value state: {}, > current list state: {}, " \ > "current map state: {}, current value: > {}".format(str(datetime.now().time()), > > str(current_key), > > str(current_value), > > str(current_list), > > map_entries_string, > > str(value)) > def on_timer(self, timestamp, ctx): > pass > data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \ > .process(MyProcessFunction(), output_type=Types.STRING()) \ > .print() > env.execute('test time stamp assigner with keyed process function') > if __name__ == '__main__': > test_keyed_process_function_with_state() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)