Mark Lidenberg created FLINK-34625:
--------------------------------------

             Summary: TTL doesn't seem to work in pyflink 
                 Key: FLINK-34625
                 URL: https://issues.apache.org/jira/browse/FLINK-34625
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
         Environment: Image used: flink:1.18.1-scala_2.12-java11
            Reporter: Mark Lidenberg


I wrote a simple example to test state TTL and could not get the expected 
results. I then replicated the same example in Java, where it works as 
expected. Because the behavior differs between the two APIs, something seems to 
be wrong either in PyFlink or in my PyFlink setup. 

Here is code to reproduce the issue. The example creates a state with a TTL of 
1 second, then processes an event every 1.5 seconds and prints the current 
state. I expect it to print `None, None, None, ...` (because the 1-second TTL 
expires before the next event arrives 1.5 seconds later), but instead it prints 
`None, "state", "state", ...`. In Java it works as expected and prints 
`null, null, ...`.

```python

import time

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    """Keyed function reproducing the PyFlink state-TTL issue.

    For every element it prints the current value of a ``ValueState`` that has
    a 1-second TTL, overwrites that value with ``"state"``, and then sleeps for
    1.5 seconds so the TTL has elapsed before the next element is processed.
    """

    def open(self, runtime_context: RuntimeContext):
        """Register the ``my_state`` value state with a 1-second TTL.

        The TTL is refreshed only on create and write (``OnCreateAndWrite``),
        and expired values must never be returned (``NeverReturnExpired``), so
        a read more than 1 second after the last write is expected to yield
        ``None``.
        """
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        """Print the current state, overwrite it, then sleep past the TTL.

        :param value: the incoming element (unused beyond triggering the call).
        :param ctx: the keyed process context (unused).
        """
        # Print the current state.
        # Expected: `None` every time. Observed on PyFlink 1.18.1:
        # `None, 'state', 'state', ...`
        print(self.state.value())

        # Write a fresh value; with OnCreateAndWrite this restarts the TTL.
        self.state.update("state")

        # Sleep longer than the 1-second TTL so the value written above should
        # already be expired when the next element is processed.
        time.sleep(1.5)


if __name__ == "__main__":
    # Init environment with parallelism 1 so the reproduction runs on a
    # single task.
    environment = (
        StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
    )

    # Setup pipeline: ten elements, all keyed to 0 so they share a single
    # `my_state` entry, each handled by Processor.
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # Execute pipeline
    environment.execute("ttl_test")

```

 

```java

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class Processor extends KeyedProcessFunction<Integer, String, String> {

    private transient ValueState<String> state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String> 
collector) throws IOException, InterruptedException {
        // print state
        var state = state.value();
        System.out.println(state); # prints `Null, Null, ...` 

        // update state
        state.update(LocalDateTime.now().toString());

        // sleep to reset the state
        Thread.sleep(1500);
    }
}

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to