Mark Lidenberg created FLINK-34625:
--------------------------------------

             Summary: TTL doesn't seem to work in pyflink
                 Key: FLINK-34625
                 URL: https://issues.apache.org/jira/browse/FLINK-34625
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
         Environment: Image used: flink:1.18.1-scala_2.12-java11
            Reporter: Mark Lidenberg

I made a simple example to test state TTL in PyFlink and could not get the expected results. I then replicated the example in Java, where it worked fine. The behavior is inconsistent, so something is wrong either in PyFlink or in my PyFlink setup.

Here is code to reproduce it. In the example I create a state with a TTL of 1 second, then process one event every 1.5 seconds and print the current state. I expect it to print `None, None, None, ...` (because the 1-second TTL has already expired when the next event is processed 1.5 seconds later), but instead it prints `None, "state", "state", ...`. In Java it works as expected and prints `null, null, ...`.

```python
import time

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )
        # TTL of 1 second, refreshed on every write; expired values must never be returned
        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # Print current state.
        # Expected to print `None` every time, but prints `None, 'state', 'state', ...` instead
        print(self.state.value())

        # Update state
        self.state.update("state")

        # Sleep longer than the TTL so the state should expire
        time.sleep(1.5)


if __name__ == "__main__":
    # Init environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # Setup pipeline: every element is routed to the same key
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # Execute pipeline
    environment.execute("ttl_test")
```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class Processor extends KeyedProcessFunction<Integer, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // TTL of 1 second, refreshed on every write; expired values must never be returned
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // print state (local variable renamed so it does not shadow the `state` field)
        var current = state.value();
        System.out.println(current); // prints `null, null, ...`

        // update state
        state.update(LocalDateTime.now().toString());

        // sleep longer than the TTL so the state expires
        Thread.sleep(1500);
    }
}
```

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
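
The expectation in this report (a 1-second TTL, one write per event, 1.5 seconds between events, expired values never returned) can be modelled without Flink. The sketch below is a toy model only, not Flink's or PyFlink's implementation: `TtlValue`, `simulate`, and the fake clock are hypothetical names introduced for illustration, and the exact expiry comparison is an assumption.

```python
from typing import Callable, List, Optional


class TtlValue:
    """Toy single-value state with a time-to-live (hypothetical, not a Flink class)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float]):
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[str] = None
        self._written_at: Optional[float] = None

    def value(self) -> Optional[str]:
        # Models NeverReturnExpired: hide the value once its TTL has elapsed.
        if self._written_at is None:
            return None
        if self._clock() - self._written_at >= self._ttl:  # assumed comparison
            self._value, self._written_at = None, None
            return None
        return self._value

    def update(self, new_value: str) -> None:
        # Models OnCreateAndWrite: every write restarts the TTL timer.
        self._value = new_value
        self._written_at = self._clock()


def simulate(num_events: int, gap_seconds: float, ttl_seconds: float) -> List[Optional[str]]:
    """Read, write, then advance a fake clock, mirroring process_element above."""
    now = [0.0]  # fake clock, so the simulation needs no real sleeping
    state = TtlValue(ttl_seconds, clock=lambda: now[0])
    seen = []
    for _ in range(num_events):
        seen.append(state.value())
        state.update("state")
        now[0] += gap_seconds
    return seen


if __name__ == "__main__":
    print(simulate(num_events=4, gap_seconds=1.5, ttl_seconds=1.0))
    print(simulate(num_events=4, gap_seconds=0.5, ttl_seconds=1.0))
```

With a gap longer than the TTL, every read in this model returns `None`, which is the output expected from the PyFlink job. The sequence `None, 'state', 'state', ...` appears in the model only when the gap is shorter than the TTL, which is what the PyFlink job prints even though its gap is 1.5 seconds.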