Hi Flink devs,

I'm working with the DataStream API V2 and noticed that state TTL cannot be
configured when using the ProcessFunction V2 declarative state API
(usesStates() + StateDeclaration).

Background: two parallel state hierarchies

The V2 architecture has two separate state type hierarchies, bridged by
DefaultStateManager:

- User-facing declarations in flink-core-api (org.apache.flink.api.common.state)
    - StateDeclaration (interface) → ValueStateDeclaration, MapStateDeclaration,
      etc.
    - No TTL methods. Returned by ProcessFunction's usesStates().

- Internal descriptors in flink-core (org.apache.flink.api.common.state.v2)
    - StateDescriptor<T> (abstract class) → ValueStateDescriptor,
      MapStateDescriptor, etc.
    - Has enableTimeToLive(StateTtlConfig) and getTtlConfig().
    - This TTL support was introduced as part of FLINK-35262 (commit 3e68422)
      by implementing DefaultKeyedStateStoreV2.getValueState(ValueStateDescriptor).

These hierarchies have no inheritance relationship: ValueStateDeclaration does
not extend ValueStateDescriptor.

The gap: DefaultStateManager.getStateOptional() drops TTL

The bridge lives in
org.apache.flink.datastream.impl.context.DefaultStateManager. For each state
type, getStateOptional() extracts the name and type information from the
StateDeclaration and constructs a fresh StateDescriptor, but never calls
enableTimeToLive():

// DefaultStateManager.getStateOptional(ValueStateDeclaration)
// File: flink-datastream/.../datastream/impl/context/DefaultStateManager.java
@Override
public <T> Optional<ValueState<T>> getStateOptional(ValueStateDeclaration<T> stateDeclaration)
        throws Exception {
    ValueStateDescriptor<T> valueStateDescriptor =
            new ValueStateDescriptor<>(
                    stateDeclaration.getName(),
                    TypeExtractor.createTypeInfo(
                            stateDeclaration.getTypeDescriptor().getTypeClass()));
    // ☝️ enableTimeToLive() is never called, so TTL defaults to DISABLED
    return Optional.ofNullable(operatorContext.getValueState(valueStateDescriptor));
}
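To make the gap concrete, here is a self-contained model of the bridge with no
Flink dependency. TtlConfigModel, ValueStateDeclarationModel,
ValueStateDescriptorModel and StateManagerModel are hypothetical stand-ins that
only mirror the shape described above; they are not Flink classes, and the
type-information and operator-context handling of the real DefaultStateManager
is left out.

```java
import java.time.Duration;

// Hypothetical stand-ins that mirror the two hierarchies; NOT Flink classes.

/** Stand-in for StateTtlConfig; a null TTL means "disabled". */
final class TtlConfigModel {
    static final TtlConfigModel DISABLED = new TtlConfigModel(null);
    private final Duration ttl;

    TtlConfigModel(Duration ttl) { this.ttl = ttl; }

    boolean isEnabled() { return ttl != null; }
}

/** Stand-in for ValueStateDeclaration: a name, and no TTL methods at all. */
final class ValueStateDeclarationModel {
    private final String name;

    ValueStateDeclarationModel(String name) { this.name = name; }

    String getName() { return name; }
}

/** Stand-in for the v2 ValueStateDescriptor: carries a TTL, DISABLED by default. */
final class ValueStateDescriptorModel {
    private final String name;
    private TtlConfigModel ttlConfig = TtlConfigModel.DISABLED;

    ValueStateDescriptorModel(String name) { this.name = name; }

    String getName() { return name; }

    void enableTimeToLive(TtlConfigModel config) { this.ttlConfig = config; }

    TtlConfigModel getTtlConfig() { return ttlConfig; }
}

/** Stand-in for the bridge: copies the name into a fresh descriptor, nothing else. */
final class StateManagerModel {
    static ValueStateDescriptorModel toDescriptor(ValueStateDeclarationModel decl) {
        // Mirrors the current code path: enableTimeToLive() is never called.
        return new ValueStateDescriptorModel(decl.getName());
    }
}
```

Whatever the user intends, toDescriptor() always yields DISABLED, because the
declaration has no TTL to copy, even though the descriptor type could hold one.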

The same pattern applies to all state types in DefaultStateManager:

- getStateOptional(ListStateDeclaration): creates ListStateDescriptor, no TTL
- getStateOptional(MapStateDeclaration): creates MapStateDescriptor, no TTL
- getStateOptional(ReducingStateDeclaration): creates ReducingStateDescriptor,
  no TTL
- getStateOptional(AggregatingStateDeclaration): creates
  AggregatingStateDescriptor, no TTL
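Because every descriptor type extends the common StateDescriptor<T> base, one
option is to apply the TTL in a single shared helper that each
getStateOptional() overload calls, rather than repeating the step five times.
Below is a minimal sketch of that idea using hypothetical stand-ins
(TtlConfigModel, DescriptorModel and its subclasses, TtlBridgeModel.applyTtl);
none of these are Flink classes, and the real helper's name and placement would
be up to the PR.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical stand-ins; NOT Flink classes.

final class TtlConfigModel {
    static final TtlConfigModel DISABLED = new TtlConfigModel(null);
    private final Duration ttl;

    TtlConfigModel(Duration ttl) { this.ttl = ttl; }

    boolean isEnabled() { return ttl != null; }
}

/** Common base, like StateDescriptor<T>: every concrete descriptor inherits TTL handling. */
abstract class DescriptorModel<T> {
    private TtlConfigModel ttlConfig = TtlConfigModel.DISABLED;

    void enableTimeToLive(TtlConfigModel config) { this.ttlConfig = config; }

    TtlConfigModel getTtlConfig() { return ttlConfig; }
}

final class ValueDescriptorModel<T> extends DescriptorModel<T> {}

final class ListDescriptorModel<T> extends DescriptorModel<T> {}

final class MapDescriptorModel<K, V> extends DescriptorModel<Map<K, V>> {}

final class TtlBridgeModel {
    /** Hypothetical helper: applies the TTL once, for any descriptor subtype, keeping its static type. */
    static <D extends DescriptorModel<?>> D applyTtl(D descriptor, TtlConfigModel ttl) {
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }
}
```

Returning the descriptor with its concrete type lets each overload build and
configure its descriptor in one expression, so a state type added later cannot
silently skip the TTL step.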

Why this matters

A user writing a V2 ProcessFunction (e.g., TwoOutputEventTimeStreamProcessFunction)
must declare state via usesStates() returning Set<StateDeclaration>. State is
then accessed through ctx.getStateManager().getState(decl). At no point in this
path can the user configure a StateTtlConfig. The alternative path shown in the
V2 state docs, RuntimeContext.getState(StateDescriptor) with enableTimeToLive(),
is only available in RichFunction, not in V2 ProcessFunction.

Proposed fix

The simplest approach: add getTtlConfig() to StateDeclaration and have
DefaultStateManager apply it:

// StateDeclaration.java, add:
@Nonnull
default StateTtlConfig getTtlConfig() {
    return StateTtlConfig.DISABLED; // preserves current behavior
}

// DefaultStateManager.getStateOptional(), apply:
ValueStateDescriptor<T> valueStateDescriptor =
        new ValueStateDescriptor<>(
                stateDeclaration.getName(),
                TypeExtractor.createTypeInfo(
                        stateDeclaration.getTypeDescriptor().getTypeClass()));
valueStateDescriptor.enableTimeToLive(stateDeclaration.getTtlConfig());
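The proposed change can be exercised end to end with a small model.
DeclarationModel, LegacyDeclarationModel, TtlDeclarationModel, DescriptorModel
and FixedBridgeModel are hypothetical stand-ins, not Flink classes; the point is
only that a default method returning DISABLED leaves existing declarations
unchanged, while a declaration that overrides it has its TTL copied onto the
descriptor.

```java
import java.time.Duration;

// Hypothetical stand-ins; NOT Flink classes.

final class TtlConfigModel {
    static final TtlConfigModel DISABLED = new TtlConfigModel(null);
    private final Duration ttl;

    TtlConfigModel(Duration ttl) { this.ttl = ttl; }

    boolean isEnabled() { return ttl != null; }
}

/** Stand-in for StateDeclaration with the proposed default method. */
interface DeclarationModel {
    String getName();

    /** Proposed addition: existing implementations inherit DISABLED unchanged. */
    default TtlConfigModel getTtlConfig() {
        return TtlConfigModel.DISABLED;
    }
}

/** An existing declaration that knows nothing about TTL. */
final class LegacyDeclarationModel implements DeclarationModel {
    public String getName() { return "legacy"; }
}

/** A declaration that opts in by overriding getTtlConfig(). */
final class TtlDeclarationModel implements DeclarationModel {
    private final TtlConfigModel ttl;

    TtlDeclarationModel(TtlConfigModel ttl) { this.ttl = ttl; }

    public String getName() { return "withTtl"; }

    @Override
    public TtlConfigModel getTtlConfig() { return ttl; }
}

final class DescriptorModel {
    final String name;
    private TtlConfigModel ttlConfig = TtlConfigModel.DISABLED;

    DescriptorModel(String name) { this.name = name; }

    void enableTimeToLive(TtlConfigModel config) { this.ttlConfig = config; }

    TtlConfigModel getTtlConfig() { return ttlConfig; }
}

final class FixedBridgeModel {
    /** Mirrors the proposed getStateOptional(): build the descriptor, then copy the TTL. */
    static DescriptorModel bridge(DeclarationModel decl) {
        DescriptorModel descriptor = new DescriptorModel(decl.getName());
        descriptor.enableTimeToLive(decl.getTtlConfig());
        return descriptor;
    }
}
```

One detail worth verifying in the real code: if the v2
StateDescriptor.enableTimeToLive() validates its argument and rejects
StateTtlConfig.DISABLED, the bridge would need to guard the call with
ttlConfig.isEnabled() so the default path stays a no-op.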

Since StateTtlConfig.DISABLED is the default, this is fully backward-compatible.
The concrete ValueStateDeclarationBuilder, MapStateDeclarationBuilder, etc.
would also need a new withTtlConfig() method so that users can actually set a
TTL on a declaration.
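On the builder side, withTtlConfig() could simply carry the config through to
the built declaration. The sketch uses hypothetical stand-ins
(DeclarationBuilderModel, BuiltDeclarationModel, TtlConfigModel) rather than
the real builders in flink-core-api, and the exact withTtlConfig() signature is
an assumption to be settled in the PR.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-ins; NOT Flink classes.

final class TtlConfigModel {
    static final TtlConfigModel DISABLED = new TtlConfigModel(null);
    private final Duration ttl;

    TtlConfigModel(Duration ttl) { this.ttl = ttl; }

    boolean isEnabled() { return ttl != null; }
}

/** Immutable declaration produced by the builder. */
final class BuiltDeclarationModel {
    private final String name;
    private final TtlConfigModel ttlConfig;

    BuiltDeclarationModel(String name, TtlConfigModel ttlConfig) {
        this.name = name;
        this.ttlConfig = ttlConfig;
    }

    String getName() { return name; }

    TtlConfigModel getTtlConfig() { return ttlConfig; }
}

/** Builder with a hypothetical withTtlConfig(); omitting it keeps the DISABLED default. */
final class DeclarationBuilderModel {
    private final String name;
    private TtlConfigModel ttlConfig = TtlConfigModel.DISABLED;

    DeclarationBuilderModel(String name) { this.name = Objects.requireNonNull(name, "name"); }

    DeclarationBuilderModel withTtlConfig(TtlConfigModel config) {
        // Reject null early so the built declaration never returns null from getTtlConfig().
        this.ttlConfig = Objects.requireNonNull(config, "ttlConfig");
        return this;
    }

    BuiltDeclarationModel build() { return new BuiltDeclarationModel(name, ttlConfig); }
}
```

Rejecting null in the builder keeps the @Nonnull contract of the proposed
getTtlConfig() default without extra checks in DefaultStateManager.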

Related tickets

- FLINK-34547: [Umbrella] FLIP-408: Introduce DataStream API V2
- FLINK-34977: FLIP-433: State Access on DataStream API V2
- FLINK-34549: FLIP-410: Config, Context and Processing Timer Service of 
DataStream API V2
- FLINK-35262: Bridge between AsyncKeyedStateBackend and 
AsyncExecutionController

Would this approach make sense, or is there a deliberate reason for the
omission? Happy to contribute a PR if folks agree on the direction.

Thanks,
Karol
