Re: TTL in pyflink does not seem to work
Thanks! We’ve created an issue for that: https://issues.apache.org/jira/browse/FLINK-34625 Yep, we’re planning to use timers as a workaround for now. On Mar 10, 2024 at 02:59 +0400, David Anderson , wrote: > My guess is that this only fails when pyflink is used with the heap state > backend, in which case one possible workaround is to use the RocksDB state > backend instead. Another workaround would be to rely on timers in the process > function, and clear the state yourself. > > David > > > On Fri, Mar 8, 2024 at 12:29 AM lorenzo.affetti.ververica.com via user > > wrote: > > > Hello Ivan! > > > > > > Could you please create a JIRA issue out of this? > > > That seems like the proper place to discuss this. > > > > > > It seems to be a bug, as the two versions of the code you posted look identical, > > > and the behavior should be consistent. > > > On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka , > > > wrote: > > > > Note that the Java code prints `State: null`, `State: null`, as I > > > > was expecting, unlike the pyflink code. > > > > On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , > > > > wrote: > > > > > Hi! I’ve created a basic pyflink pipeline with ttl and it does not > > > > > seem to work. I have reproduced the exact same code in Java and it > > > > > works! > > > > > > > > > > Is this a pyflink bug? If so - how can I report it? If not - what can > > > > > I try to do? > > > > > > > > > > Flink: 1.18.0 > > > > > image: flink:1.18.0-scala_2.12-java11 > > > > > > > > > > Code to reproduce. I expect this code to print `<timestamp> None` > > > > > all the time.
But it prints and state value > > > > > > > > > > ```python > > > > > import time > > > > > > > > > > from datetime import datetime > > > > > > > > > > from pyflink.common import Time, Types > > > > > from pyflink.datastream import KeyedProcessFunction, RuntimeContext, > > > > > StreamExecutionEnvironment, TimeCharacteristic > > > > > from pyflink.datastream.state import StateTtlConfig, > > > > > ValueStateDescriptor > > > > > > > > > > > > > > > class Processor(KeyedProcessFunction): > > > > > def open(self, runtime_context: RuntimeContext): > > > > > state_descriptor = ValueStateDescriptor( > > > > > name="my_state", > > > > > value_type_info=Types.STRING(), > > > > > ) > > > > > > > > > > state_descriptor.enable_time_to_live( > > > > > ttl_config=StateTtlConfig.new_builder(Time.seconds(1)) > > > > > .cleanup_incrementally(cleanup_size=10, > > > > > run_cleanup_for_every_record=True) > > > > > > > > > > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) > > > > > > > > > > .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > > > > > .build() > > > > > ) > > > > > > > > > > self.state = runtime_context.get_state(state_descriptor) > > > > > > > > > > def process_element(self, value: int, ctx: > > > > > KeyedProcessFunction.Context): > > > > > current_state = self.state.value() > > > > > > > > > > print(datetime.now(), current_state) > > > > > > > > > > if current_state is None: > > > > > self.state.update(str(datetime.now())) > > > > > > > > > > time.sleep(1.5) > > > > > > > > > > > > > > > if __name__ == "__main__": > > > > > # - Init environment > > > > > > > > > > environment = > > > > > StreamExecutionEnvironment.get_execution_environment().set_parallelism(1) > > > > > > > > > > # - Setup pipeline > > > > > > > > > > ( > > > > > environment.set_parallelism(1) > > > > > .from_collection( > > > > > collection=list(range(10)), > > > > > ) > > > > > .key_by
Re: TTL in pyflink does not seem to work
Note that the Java code prints `State: null`, `State: null`, as I was expecting, unlike the pyflink code. On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote: > Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to > work. I have reproduced the exact same code in Java and it works! > > Is this a pyflink bug? If so - how can I report it? If not - what can I try > to do? > > Flink: 1.18.0 > image: flink:1.18.0-scala_2.12-java11 > > Code to reproduce. I expect this code to print `<timestamp> None` all > the time. But it prints both `<timestamp> None` and `<timestamp> <state value>`. > > ```python > import time > > from datetime import datetime > > from pyflink.common import Time, Types > from pyflink.datastream import KeyedProcessFunction, RuntimeContext, > StreamExecutionEnvironment, TimeCharacteristic > from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor > > > class Processor(KeyedProcessFunction): > def open(self, runtime_context: RuntimeContext): > state_descriptor = ValueStateDescriptor( > name="my_state", > value_type_info=Types.STRING(), > ) > > state_descriptor.enable_time_to_live( > ttl_config=StateTtlConfig.new_builder(Time.seconds(1)) > .cleanup_incrementally(cleanup_size=10, > run_cleanup_for_every_record=True) > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build() > ) > > self.state = runtime_context.get_state(state_descriptor) > > def process_element(self, value: int, ctx: KeyedProcessFunction.Context): > current_state = self.state.value() > > print(datetime.now(), current_state) > > if current_state is None: > self.state.update(str(datetime.now())) > > time.sleep(1.5) > > > if __name__ == "__main__": > # - Init environment > > environment = > StreamExecutionEnvironment.get_execution_environment().set_parallelism(1) > > # - Setup pipeline > > ( > environment.set_parallelism(1) > .from_collection( > collection=list(range(10)), > ) > .key_by(lambda value: 0) >
.process(Processor()) > > > > ) > > # - Execute pipeline > > environment.execute("ttl_test") > > > > ``` > > ```java > import org.apache.flink.api.common.state.StateTtlConfig; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.metrics.Histogram; > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > import org.apache.flink.util.Collector; > > import java.io.IOException; > import java.time.LocalDateTime; > > public class GameHistoryProcessor extends KeyedProcessFunction String, String> { > > > private transient ValueState state; > > > @Override > public void open(Configuration parameters) { > var stateTtlConfig = StateTtlConfig > .newBuilder(Time.seconds(1)) > // .cleanupFullSnapshot() > .cleanupIncrementally(10, true) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build(); > > var stateDescriptor = new ValueStateDescriptor<>("state", > String.class); > stateDescriptor.enableTimeToLive(stateTtlConfig); > > state = getRuntimeContext().getState(stateDescriptor); > > } > > @Override > public void processElement(String event, Context context, > Collector collector) throws IOException, InterruptedException { > var state = state.value(); > System.out.println("State: " + state); > > if (state == null) { > state = LocalDateTime.now().toString(); > state.update(state); > } > > Thread.sleep(1500); > } > }```
TTL in pyflink does not seem to work
Hi! I’ve created a basic PyFlink pipeline with TTL and it does not seem to work. I have reproduced the exact same code in Java and it works! Is this a PyFlink bug? If so, how can I report it? If not, what can I try? Flink: 1.18.0 image: flink:1.18.0-scala_2.12-java11 Code to reproduce. I expect this code to print `<timestamp> None` every time, because the state has a 1-second TTL and each record is processed 1.5 seconds after the previous write, so the state should already have expired. But it prints both `<timestamp> None` and `<timestamp> <state value>`. ```python import time from datetime import datetime from pyflink.common import Time, Types from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment, TimeCharacteristic from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor class Processor(KeyedProcessFunction): def open(self, runtime_context: RuntimeContext): state_descriptor = ValueStateDescriptor( name="my_state", value_type_info=Types.STRING(), ) state_descriptor.enable_time_to_live( ttl_config=StateTtlConfig.new_builder(Time.seconds(1)) .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True) .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build() ) self.state = runtime_context.get_state(state_descriptor) def process_element(self, value: int, ctx: KeyedProcessFunction.Context): current_state = self.state.value() print(datetime.now(), current_state) if current_state is None: self.state.update(str(datetime.now())) time.sleep(1.5) if __name__ == "__main__": # - Init environment environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1) # - Setup pipeline ( environment.set_parallelism(1) .from_collection( collection=list(range(10)), ) .key_by(lambda value: 0) .process(Processor()) ) # - Execute pipeline environment.execute("ttl_test") ``` ```java import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Histogram; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.io.IOException; import java.time.LocalDateTime; public class GameHistoryProcessor extends KeyedProcessFunction { private transient ValueState state; @Override public void open(Configuration parameters) { var stateTtlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) // .cleanupFullSnapshot() .cleanupIncrementally(10, true) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); var stateDescriptor = new ValueStateDescriptor<>("state", String.class); stateDescriptor.enableTimeToLive(stateTtlConfig); state = getRuntimeContext().getState(stateDescriptor); } @Override public void processElement(String event, Context context, Collector collector) throws IOException, InterruptedException { var state = state.value(); System.out.println("State: " + state); if (state == null) { state = LocalDateTime.now().toString(); state.update(state); } Thread.sleep(1500); } }```