Re: TTL in pyflink does not seem to work
Thanks! We’ve created an issue for that: https://issues.apache.org/jira/browse/FLINK-34625

Yep, planning to use timers as a workaround for now.

On Mar 10, 2024 at 02:59 +0400, David Anderson wrote:
> My guess is that this only fails when pyflink is used with the heap state
> backend, in which case one possible workaround is to use the RocksDB state
> backend instead. Another workaround would be to rely on timers in the process
> function and clear the state yourself.
>
> David
Re: TTL in pyflink does not seem to work
My guess is that this only fails when pyflink is used with the heap state backend, in which case one possible workaround is to use the RocksDB state backend instead. Another workaround would be to rely on timers in the process function and clear the state yourself.

David

On Fri, Mar 8, 2024 at 12:29 AM lorenzo.affetti.ververica.com via user <user@flink.apache.org> wrote:
> Hello Ivan!
>
> Could you please create a JIRA issue out of this?
> That seems the proper place to discuss this.
>
> It seems to be a bug, as the two versions of the code you posted look
> identical and the behavior should be consistent.
Re: TTL in pyflink does not seem to work
Hello Ivan!

Could you please create a JIRA issue out of this? That seems the proper place to discuss this.

It seems to be a bug, as the two versions of the code you posted look identical and the behavior should be consistent.

On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka wrote:
> Note that in the Java code, it prints `State: null`, `State: null`, as I was
> expecting, unlike the pyflink code.
Re: TTL in pyflink does not seem to work
Note that in the Java code, it prints `State: null`, `State: null`, as I was expecting, unlike the pyflink code.

On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to
> work. I have reproduced the exact same code in Java and it works!
> [...]
TTL in pyflink does not seem to work
Hi! I’ve created a basic pyflink pipeline with TTL and it does not seem to work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so, how can I report it? If not, what can I try to do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print `None` as the state value all the time. But it prints `None` and then the state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        # Expire the value 1 second after it was created or last written.
        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        # Sleep longer than the TTL so the value should have expired by the next read.
        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute("ttl_test")
```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // Expire the value 1 second after it was created or last written.
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                // .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // Read the current value; null means it was never set or has expired.
        var currentState = state.value();
        System.out.println("State: " + currentState);

        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }

        // Sleep longer than the TTL so the value should have expired by the next read.
        Thread.sleep(1500);
    }
}
```