Mark Lidenberg created FLINK-34625:
--------------------------------------
Summary: TTL doesn't seem to work in pyflink
Key: FLINK-34625
URL: https://issues.apache.org/jira/browse/FLINK-34625
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.18.1
Environment: Image used: flink:1.18.1-scala_2.12-java11
Reporter: Mark Lidenberg
I wrote a simple example to test state TTL and couldn't get the expected
results. I then replicated the same example in Java, and there it worked as
expected. Since the behavior differs between the two APIs, something appears
to be wrong either in PyFlink or in my PyFlink setup.
Here is code to reproduce the issue. The example creates a state with a TTL
of 1 second, then processes one event every 1.5 seconds and prints the
current state each time. I expect it to print `None, None, None, ...`
(each value expires 1 second after it is written, before the next event is
processed 1.5 seconds later). Instead, it prints `None, 'state', 'state', ...`.
In Java it works as expected and prints `null, null, ...`.
```python
import time

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
class Processor(KeyedProcessFunction):
    """Reproducer for state TTL: print, overwrite, then sleep past the TTL.

    A single ``ValueState[str]`` is configured with a 1-second TTL,
    ``OnCreateAndWrite`` update type and ``NeverReturnExpired`` visibility.
    For every element the current value is printed, the state is overwritten
    with ``"state"``, and the task then sleeps 1.5 s. Because the sleep is
    longer than the TTL, the value written for one element is expected to be
    expired (and therefore read back as ``None``) when the next element runs.

    NOTE(review): one possible cause of the observed non-expiry is PyFlink's
    Python-side state cache (``python.state.cache-size``) serving the value
    without a TTL check. This is unconfirmed; try the repro with the cache
    size set to 0.
    """

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )
        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            # Only creating/writing the value refreshes its TTL timestamp.
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            # An expired value must never be returned, even if not yet cleaned up.
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # Print current state.
        # Expected to print `None` every time, but prints `None, 'state', 'state', ...` instead.
        print(self.state.value())
        # Update state.
        self.state.update("state")
        # Sleep longer than the 1 s TTL so the value should expire before the next element.
        time.sleep(1.5)
if __name__ == "__main__":
    # Init environment. Parallelism 1 keeps the repro to a single subtask.
    environment = StreamExecutionEnvironment.get_execution_environment()
    environment.set_parallelism(1)

    # Setup pipeline: every element is keyed to 0, so all elements share one state entry.
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # Execute pipeline
    environment.execute("ttl_test")
```
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.time.LocalDateTime;
public class Processor extends KeyedProcessFunction<Integer, String, String> {
private transient ValueState<String> state;
@Override
public void open(Configuration parameters) {
var stateTtlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
stateDescriptor.enableTimeToLive(stateTtlConfig);
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(String event, Context context, Collector<String>
collector) throws IOException, InterruptedException {
// print state
var state = state.value();
System.out.println(state); # prints `Null, Null, ...`
// update state
state.update(LocalDateTime.now().toString());
// sleep to reset the state
Thread.sleep(1500);
}
}
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)