[ 
https://issues.apache.org/jira/browse/FLINK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24105:
-----------------------------
        Parent: FLINK-23822
    Issue Type: Sub-task  (was: Bug)

> state TTL might not take effect for pyflink
> -------------------------------------------
>
>                 Key: FLINK-24105
>                 URL: https://issues.apache.org/jira/browse/FLINK-24105
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python, Runtime / State Backends
>            Reporter: Yun Tang
>            Priority: Blocker
>             Fix For: 1.14.0
>
>
> Since PyFlink keeps its own data cache on the Python side, it might still read
> values from that cache even after their state TTL has expired.
> The script below reproduces this:
> {code:python}
> from pyflink.common.time import Time
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor, MapStateDescriptor
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \
>     EmbeddedRocksDBStateBackend
> import time
> from datetime import datetime
> def test_keyed_process_function_with_state():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.get_config().set_auto_watermark_interval(2000)
>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     env.set_state_backend(EmbeddedRocksDBStateBackend())
>     data_stream = env.from_collection([(1, 'hi', '1603708211000'),
>                                             (3, 'hi', '1603708226000'),
>                                             (10, 'hi', '1603708226000'),
>                                             (6, 'hello', '1603708293000')],
>                                            type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>     class MyProcessFunction(KeyedProcessFunction):
>         def __init__(self):
>             self.value_state = None
>             self.list_state = None
>             self.map_state = None
>         def open(self, runtime_context: RuntimeContext):
>             state_ttl_config = StateTtlConfig \
>                 .new_builder(Time.seconds(1)) \
>                 .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
>                 .never_return_expired() \
>                 .build()
>             value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
>             value_state_descriptor.enable_time_to_live(state_ttl_config)
>             self.value_state = runtime_context.get_state(value_state_descriptor)
>             list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
>             list_state_descriptor.enable_time_to_live(state_ttl_config)
>             self.list_state = runtime_context.get_list_state(list_state_descriptor)
>             map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
>             map_state_descriptor.enable_time_to_live(state_ttl_config)
>             self.map_state = runtime_context.get_map_state(map_state_descriptor)
>         def process_element(self, value, ctx):
>             time.sleep(20)
>             current_value = self.value_state.value()
>             self.value_state.update(value[0])
>             current_list = [_ for _ in self.list_state.get()]
>             self.list_state.add(value[0])
>             map_entries_string = []
>             for k, v in self.map_state.items():
>                 map_entries_string.append(str(k) + ': ' + str(v))
>             map_entries_string = '{' + ', '.join(map_entries_string) + '}'
>             self.map_state.put(value[0], value[1])
>             current_key = ctx.get_current_key()
>             yield "time: {}, current key: {}, current value state: {}, current list state: {}, " \
>                   "current map state: {}, current value: {}".format(str(datetime.now().time()),
>                                                                     str(current_key),
>                                                                     str(current_value),
>                                                                     str(current_list),
>                                                                     map_entries_string,
>                                                                     str(value))
>         def on_timer(self, timestamp, ctx):
>             pass
>     data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
>         .process(MyProcessFunction(), output_type=Types.STRING()) \
>         .print()
>     env.execute('test time stamp assigner with keyed process function')
> if __name__ == '__main__':
>     test_keyed_process_function_with_state()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to