Hello all,
Let's say I want to hold some state value derived during one
transformation and then use that same state value in a subsequent
transformation. For example:
    myStream
        .keyBy(fieldID) // Some field ID, may be 0
        .flatMap(new MyStatefulMapper())   // flatMap, since both classes extend RichFlatMapFunction
        .flatMap(new MySubsequentMapper())
        ....
Now, I define MyStatefulMapper in the usual fashion:
    public class MyStatefulMapper
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /**
         * The ValueState handle. The first field is the count, the
         * second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input,
                            Collector<Tuple2<Long, Long>> out) throws Exception {
            // logic of accessing and updating the ValueState 'sum' above
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "mySum", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
So, by now, the RuntimeContext has registered a state holder named 'mySum'.
In the implementation of 'MySubsequentMapper', I need to access this state
holder named 'mySum', perhaps like this (this is my thinking; I may be wrong):
    public class MySubsequentMapper
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /**
         * The ValueState handle. The first field is the count, the
         * second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;
        private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier

        @Override
        public void flatMap(Tuple2<Long, Long> input,
                            Collector<Tuple2<Long, Long>> out) throws Exception {
            // logic of accessing and updating the ValueState 'aSubsequentSum' above
            // but this logic depends on the current contents of ValueState
            // 'sum' created earlier
        }

        @Override
        public void open(Configuration config) {
            // Logic to create ValueDescriptor for 'aSubsequentSum' which
            // is owned by this operator
            // ...
            // Question: now, how do I prepare for accessing 'sum' which
            // is a State holder, but created inside an earlier operator?
            sum = getRuntimeContext().getState(descriptor); // how can I
                                                            // pass the name 'mySum' (used in StateDescriptor)?
        }
    }
I have two questions:
1) Is what I am trying to achieve possible, and even advisable? If
not, what is the alternative?
2) Is there a guarantee that Flink will always execute MyStatefulMapper.open()
before MySubsequentMapper.open() because of the lexical order in which
they appear in the source code?
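Regarding the alternative in question 1: as far as I understand, keyed state belongs to the operator that registers it, so the only workaround I can picture is to have the first operator emit its updated (count, sum) together with each record, and let the second operator read those values from its input while keeping only its own state. Below is a minimal plain-Java sketch of that idea. It deliberately does not use the Flink API; the HashMaps merely stand in for per-key ValueState, and all names (StateHandoffSketch, WithSum, FirstStep, SecondStep) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: no Flink classes are used here.
final class StateHandoffSketch {

    /** Record emitted by the first step: the key plus the state computed for it. */
    static final class WithSum {
        final long key;
        final long count;
        final long sum;

        WithSum(long key, long count, long sum) {
            this.key = key;
            this.count = count;
            this.sum = sum;
        }
    }

    /** Stand-in for MyStatefulMapper: owns (count, sum) per key and emits it downstream. */
    static final class FirstStep {
        private final Map<Long, long[]> state = new HashMap<>(); // assumption: mimics keyed ValueState

        WithSum process(long key, long value) {
            long[] s = state.computeIfAbsent(key, k -> new long[] {0L, 0L});
            s[0] += 1;      // count
            s[1] += value;  // running sum
            return new WithSum(key, s[0], s[1]);
        }
    }

    /** Stand-in for MySubsequentMapper: owns only its own state, reads 'sum' from the input record. */
    static final class SecondStep {
        private final Map<Long, Long> maxSumSeen = new HashMap<>(); // this step's own per-key state

        long process(WithSum in) {
            // Largest running sum observed so far for this key.
            return maxSumSeen.merge(in.key, in.sum, Long::max);
        }
    }

    /** Feeds {key, value} pairs through both steps in order and collects the second step's output. */
    static List<Long> run(long[][] records) {
        FirstStep first = new FirstStep();
        SecondStep second = new SecondStep();
        List<Long> out = new ArrayList<>();
        for (long[] r : records) {
            out.add(second.process(first.process(r[0], r[1])));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[][] {{1, 10}, {1, -4}, {2, 7}})); // prints [10, 10, 7]
    }
}
```

If this is the right direction, then in the real job the output type of the first operator would have to carry the count and sum, and the stream would presumably need to be keyed again before the second operator so that it can hold keyed state of its own.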
-- Nirmalya
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."