Re: TTL in pyflink does not seem to work

2024-03-11 Thread Ivan Petrarka
Thanks! We’ve created an issue for that: 
https://issues.apache.org/jira/browse/FLINK-34625

Yep, we're planning to use timers as a workaround for now.
On Mar 10, 2024 at 02:59 +0400, David Anderson , wrote:
> My guess is that this only fails when pyflink is used with the heap state 
> backend, in which case one possible workaround is to use the RocksDB state 
> backend instead. Another workaround would be to rely on timers in the process 
> function, and clear the state yourself.
>
> David
>
> > On Fri, Mar 8, 2024 at 12:29 AM lorenzo.affetti.ververica.com via user 
> >  wrote:
> > > Hello Ivan!
> > >
> > > Could you please create a JIRA issue out of this?
> > > That seems the proper place to discuss this.
> > >
> > > It seems to be a bug, as the two versions of the code you posted look identical, 
> > > and the behavior should be consistent.
> > > On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka , 
> > > wrote:
> > > > Note that the Java code prints `State: null`, `State: null`, as I 
> > > > expected, unlike the pyflink code.
> > > > On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , 
> > > > wrote:
> > > > > Hi! I’ve created a basic pyflink pipeline with ttl and it does not 
> > > > > seem to work. I have reproduced the exact same code in Java and it 
> > > > > works!
> > > > >
> > > > > Is this a pyflink bug? If so - how can I report it? If not - what can 
> > > > > I try to do?
> > > > >
> > > > > Flink: 1.18.0
> > > > > image: flink:1.18.0-scala_2.12-java11
> > > > >
> > > > > Code to reproduce. I expect this code to print `<current time> None` all the time. But it prints `<current time>` and the state value.
> > > > >
> > > > > ```python
> > > > > import time
> > > > >
> > > > > from datetime import datetime
> > > > >
> > > > > from pyflink.common import Time, Types
> > > > > from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> > > > > StreamExecutionEnvironment, TimeCharacteristic
> > > > > from pyflink.datastream.state import StateTtlConfig, 
> > > > > ValueStateDescriptor
> > > > >
> > > > >
> > > > > class Processor(KeyedProcessFunction):
> > > > >     def open(self, runtime_context: RuntimeContext):
> > > > >         state_descriptor = ValueStateDescriptor(
> > > > >             name="my_state",
> > > > >             value_type_info=Types.STRING(),
> > > > >         )
> > > > >
> > > > >         state_descriptor.enable_time_to_live(
> > > > >             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
> > > > >             .cleanup_incrementally(cleanup_size=10, 
> > > > > run_cleanup_for_every_record=True)
> > > > >             
> > > > > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > > > >             
> > > > > .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > > > >             .build()
> > > > >         )
> > > > >
> > > > >         self.state = runtime_context.get_state(state_descriptor)
> > > > >
> > > > >     def process_element(self, value: int, ctx: 
> > > > > KeyedProcessFunction.Context):
> > > > >         current_state = self.state.value()
> > > > >
> > > > >         print(datetime.now(), current_state)
> > > > >
> > > > >         if current_state is None:
> > > > >             self.state.update(str(datetime.now()))
> > > > >
> > > > >         time.sleep(1.5)
> > > > >
> > > > >
> > > > > if __name__ == "__main__":
> > > > >     # - Init environment
> > > > >
> > > > >     environment = 
> > > > > StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
> > > > >
> > > > >     # - Setup pipeline
> > > > >
> > > > >     (
> > > > >         environment.set_parallelism(1)
> > > > >         .from_collection(
> > > > >             collection=list(range(10)),
> > > > >         )
> > > > >         .key_by

Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that the Java code prints `State: null`, `State: null`, as I 
expected, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
> work. I have reproduced the exact same code in Java and it works!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I try 
> to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print `<current time> None` all 
> the time. But it prints `<current time>` and the state value.
>
> ```python
> import time
>
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .cleanup_incrementally(cleanup_size=10, 
> run_cleanup_for_every_record=True)
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             
> .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         current_state = self.state.value()
>
>         print(datetime.now(), current_state)
>
>         if current_state is None:
>             self.state.update(str(datetime.now()))
>
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # - Init environment
>
>     environment = 
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Setup pipeline
>
>     (
>         environment.set_parallelism(1)
>         .from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>
>
>
>     )
>
>     # - Execute pipeline
>
>     environment.execute("ttl_test")
>
>
>
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction String, String> {
>
>
>     private transient ValueState state;
>
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
> //                .cleanupFullSnapshot()
>                 .cleanupIncrementally(10, true)
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         var stateDescriptor = new ValueStateDescriptor<>("state", 
> String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>
>         state = getRuntimeContext().getState(stateDescriptor);
>
>     }
>
>     @Override
>     public void processElement(String event, Context context, 
> Collector collector) throws IOException, InterruptedException {
>         var state = state.value();
>         System.out.println("State: " + state);
>
>         if (state == null) {
>             state = LocalDateTime.now().toString();
>             state.update(state);
>         }
>
>         Thread.sleep(1500);
>     }
> }```


TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with TTL and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so, how can I report it? If not, what can I try 
instead?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print `<current time> None` all 
the time. But it prints `<current time>` and the state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    """Reproducer for TTL expiry on keyed value state.

    For every element the function reads ``my_state``, prints it next to the
    current wall-clock time, seeds it with a timestamp string when it is empty,
    and then sleeps 1.5 s, longer than the 1 s TTL. With the configured
    ``NeverReturnExpired`` visibility, each read is therefore expected to see
    ``None``.
    """

    def open(self, runtime_context: RuntimeContext):
        """Register ``my_state`` (a STRING value) with a 1-second TTL."""
        descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            # The TTL timer starts on creation and is refreshed only on writes.
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            # Expired-but-not-yet-cleaned entries must read back as missing.
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )
        descriptor.enable_time_to_live(ttl_config=ttl_config)

        self.state = runtime_context.get_state(descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        """Print the stored value, seed it when absent, then outlive the TTL."""
        stored = self.state.value()
        print(datetime.now(), stored)

        if stored is None:
            # Write only after an empty read. Writing unconditionally would
            # refresh the TTL (OnCreateAndWrite) and hide the expiry.
            self.state.update(str(datetime.now()))

        # 1.5 s exceeds the 1 s TTL, so the value written above should be
        # expired by the time the next element is processed.
        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment
    # Parallelism 1 keeps every record on a single operator instance.
    # (Previously this statement was wrapped mid-assignment, which is a
    # SyntaxError, and set_parallelism(1) was called twice.)
    environment = StreamExecutionEnvironment.get_execution_environment()
    environment.set_parallelism(1)

    # - Setup pipeline
    # Every element is keyed to the constant 0, so all ten records read and
    # write the same keyed state entry in Processor.
    (
        environment.from_collection(collection=list(range(10)))
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute("ttl_test")



```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction {


    private transient ValueState state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector 
collector) throws IOException, InterruptedException {
        var state = state.value();
        System.out.println("State: " + state);

        if (state == null) {
            state = LocalDateTime.now().toString();
            state.update(state);
        }

        Thread.sleep(1500);
    }
}```