Re: TTL in pyflink does not seem to work

2024-03-11 Thread Ivan Petrarka
Thanks! We’ve created an issue for that:
https://issues.apache.org/jira/browse/FLINK-34625

Yep, planning to use timers as a workaround for now.

Re: TTL in pyflink does not seem to work

2024-03-09 Thread David Anderson
My guess is that this only fails when pyflink is used with the heap state
backend, in which case one possible workaround is to use the RocksDB state
backend instead. Another workaround would be to rely on timers in the
process function, and clear the state yourself.

David
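
A configuration sketch of the first workaround, assuming the guess above about the heap state backend is right (the thread does not confirm it). It switches the job to the RocksDB state backend through the PyFlink API before the pipeline is built; the variable name `environment` follows the reproducer.

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

environment = StreamExecutionEnvironment.get_execution_environment()
environment.set_parallelism(1)

# Keep keyed state in RocksDB instead of on the JVM heap.
environment.set_state_backend(EmbeddedRocksDBStateBackend())
```

The rest of the pipeline, including the `StateTtlConfig` on the descriptor, would stay unchanged.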



Re: TTL in pyflink does not seem to work

2024-03-08 Thread lorenzo.affetti.ververica.com via user
Hello Ivan!

Could you please create a JIRA issue out of this?
That seems to be the proper place to discuss this.

It looks like a bug, as the two versions of the code you posted look identical, and
the behavior should be consistent.


Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that the Java code prints `State: Null`, `State: Null`, as I was
expecting, unlike the pyflink code.


TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so - how can I report it? If not - what can I try to 
do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print the current datetime and `None` all
the time. But it prints `None` and then the state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute("ttl_test")
```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

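// The type parameters were stripped by the archive; the key type (first parameter) is an assumption.
public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;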


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // Local variable renamed so it no longer shadows the `state` field.
        var currentState = state.value();
        System.out.println("State: " + currentState);

        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }

        Thread.sleep(1500);
    }
}
```