Hi! I’ve created a basic PyFlink pipeline with state TTL, and the TTL does not
seem to work. I have reproduced the exact same code in Java and it works!
Is this a PyFlink bug? If so, how can I report it? If not, what can I try
instead?
Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11
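Code to reproduce. I expect this code to print `<current_datetime> None` for
every record, because each record sleeps for 1.5 seconds and the state TTL is
1 second. Instead, it prints `<current_datetime>` followed by the previously
stored state value.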
```python
import time
from datetime import datetime
from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext,
StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
class Processor(KeyedProcessFunction):
    """Keyed process function used to check whether value-state TTL takes effect.

    Each element prints the current time together with the stored state value,
    writes a fresh timestamp string when no value is visible, and then sleeps
    for 1.5 seconds, which is longer than the 1-second TTL configured in
    ``open``.
    """

    def open(self, runtime_context: RuntimeContext):
        """Create the ``my_state`` value state with a 1-second TTL attached."""
        # TTL of one second, refreshed on create/write, with expired values
        # never returned to the caller and incremental cleanup on every record.
        ttl = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )
        descriptor.enable_time_to_live(ttl_config=ttl)

        self.state = runtime_context.get_state(descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        """Print the visible state value, seed it if absent, then outlive the TTL."""
        stored = self.state.value()
        print(datetime.now(), stored)

        if stored is None:
            self.state.update(str(datetime.now()))

        # NOTE(review): the pasted source lost its indentation; the sleep is
        # kept outside the `if`, matching the Java version's Thread.sleep(1500).
        time.sleep(1.5)
if __name__ == "__main__":
    # - Init environment
    # The assignment is kept on a single line: a bare `environment =` followed
    # by the expression on the next line is a syntax error in Python.
    environment = StreamExecutionEnvironment.get_execution_environment()
    # Parallelism is set once here instead of twice (at creation and again
    # when building the pipeline).
    environment.set_parallelism(1)

    # - Setup pipeline
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        # Every element maps to key 0, so all of them share one state entry.
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute("ttl_test")
```
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.time.LocalDateTime;
/**
 * Keyed process function used to check that value-state TTL takes effect.
 *
 * <p>For every element it prints the currently visible state value, stores the
 * current timestamp when no value is visible, and then sleeps for 1.5 seconds,
 * which is longer than the 1-second TTL configured in {@code open}.
 */
public class GameHistoryProcessor extends KeyedProcessFunction<Integer, String, String> {

    /** Per-key value state; its TTL is configured in {@code open}. */
    private transient ValueState<String> state;

    /**
     * Registers the {@code "state"} value state with a 1-second TTL.
     *
     * @param parameters configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                // .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    /**
     * Prints the visible state value, seeds it when absent, then sleeps past the TTL.
     *
     * @param event the incoming element; its content is not used
     * @param context the keyed process context; not used
     * @param collector output collector; nothing is emitted
     * @throws IOException if reading or updating the state fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // Use a distinct local name: a local called `state` would shadow the
        // ValueState field and `var state = state.value()` does not compile.
        var currentState = state.value();
        System.out.println("State: " + currentState);
        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }
        // Sleep longer than the 1-second TTL before the next element is processed.
        Thread.sleep(1500);
    }
}
```