&res created FLINK-16571:
----------------------------
Summary: Throw exception when current key is out of range
Key: FLINK-16571
URL: https://issues.apache.org/jira/browse/FLINK-16571
Project: Flink
Issue Type: Improvement
Components: API / Core
Affects Versions: 1.9.2
Reporter: &res
* I've got a stream of records that are "keyed by" using a class whose hashCode isn't stable across JVM instances.
* The records are then processed by a parallel operator, which is running on several task managers on the cluster.
* A given task manager receives a record, calculates the hashCode of the key, and sends the record to another task manager instance according to its slot allocation.
* The other task manager instance receives the record for a given parallel slot (allocated by the other instance), calculates the hash for the record (which doesn't match the original hash), and then I get this error:
{code}
java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
	at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
	at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
{code}
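For context on why this surfaces as an NPE: the receiving task recomputes the key group from the key's hashCode, lands outside its own KeyGroupRange, and StateTable.put then dereferences a state map that doesn't exist for that group. A minimal sketch of the derivation, using Flink's KeyGroupRangeAssignment utility (the parallelism numbers here are made up):
{code:java}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        int parallelism = 4;      // made-up operator parallelism
        Object key = "some-key";

        // murmur-hashes key.hashCode() and maps it onto [0, maxParallelism)
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // the key group in turn determines which parallel subtask owns the key;
        // if hashCode() differs between sender and receiver, these computations
        // disagree and the record arrives at a subtask whose KeyGroupRange
        // doesn't contain the recomputed key group
        int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.printf("keyGroup=%d operatorIndex=%d%n", keyGroup, operatorIndex);
    }
}
{code}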
It took me a while to figure out what was going on, and while the issue is on my side, it would have helped if InternalKeyContextImpl had complained in the first place that the computed key group index was out of range.

InternalKeyContextImpl#setCurrentKeyGroupIndex should complain when currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash code function isn't stable.
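A minimal sketch of what that guard could look like; the exception type and message are my suggestion, not an existing implementation:
{code:java}
// Sketch only: exception type and wording are suggestions.
@Override
public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
    if (!keyGroupRange.contains(currentKeyGroupIndex)) {
        throw new IllegalArgumentException(
                "Key group index " + currentKeyGroupIndex + " is outside of " + keyGroupRange
                        + "; this usually means the key's hashCode() is not stable across JVMs.");
    }
    this.currentKeyGroupIndex = currentKeyGroupIndex;
}
{code}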
A few notes:
* The reason I'm getting the issue is that on Oracle JDK, an Enum's hashCode uses Object's identity hash code (in practice, derived from the memory address of the enum instance). I should use ordinal() when hashing to ensure stability; see the sketch after this list.
* The issue is more likely to happen when cluster.evenly-spread-out-slots is on (which forces the job to be distributed across different instances).
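On the user side, the workaround is to key by something whose value is stable across JVMs. A minimal sketch (illustrative names, with a trimmed-down enum rather than the one from the repro below):
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class StableKeying {

    enum EnumKey { ONE, TWO, THREE }

    static KeyedStream<Tuple2<EnumKey, Integer>, Integer> stableKeyBy(
            DataStream<Tuple2<EnumKey, Integer>> src) {
        // ordinal() is fixed by declaration order and identical in every JVM,
        // unlike Enum.hashCode(), which falls back to the identity hash code
        return src.keyBy(p -> p.f0.ordinal());
    }
}
{code}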
Here's an example that replicates the issue. It has to run on an Oracle JVM on a cluster of a few nodes, with cluster.evenly-spread-out-slots enabled (see the configuration snippet after the code):
{code:java}
import java.lang.reflect.Field;
import java.util.Random;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobThatFails {

    private static final Logger LOG = LoggerFactory.getLogger(JobThatFails.class);

    /** Keying by this enum is what triggers the bug: Enum.hashCode() is identity-based. */
    private enum EnumKey {
        ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN
    }

    void defineGraph(StreamExecutionEnvironment env) {
        int parallelism = 4;

        // union ten identical sources so records for the same key originate
        // from many parallel source subtasks (and thus many JVMs)
        DataStream<Tuple2<EnumKey, Integer>> src = null;
        for (int i = 0; i < 10; ++i) {
            DataStream<Tuple2<EnumKey, Integer>> eachSrc =
                    env.addSource(new IntegerSourceFunction());
            src = (src == null) ? eachSrc : src.union(eachSrc);
        }

        src.keyBy(p -> p.f0)
                .map(new StatefulCounter())
                .name(StatefulCounter.class.getSimpleName())
                .setParallelism(parallelism)
                .addSink(
                        new SinkFunction<Integer>() {
                            @Override
                            public void invoke(Integer value, Context context) {}
                        });
    }

    /** Emits one record per enum value every second until cancelled. */
    private static class IntegerSourceFunction
            implements SourceFunction<Tuple2<EnumKey, Integer>> {

        private volatile boolean running;

        @Override
        public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws Exception {
            running = true;
            Random random = new Random();
            while (running) {
                for (EnumKey e : EnumKey.values()) {
                    ctx.collect(Tuple2.of(e, random.nextInt(10)));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** The keyed state access here is what blows up when the key group index is off-range. */
    private static class StatefulCounter
            extends RichMapFunction<Tuple2<EnumKey, Integer>, Integer> {

        private transient ValueState<Integer> stateHandle;

        @Override
        public void open(Configuration configuration) {
            stateHandle =
                    getRuntimeContext()
                            .getState(
                                    new ValueStateDescriptor<>(
                                            JobThatFails.class.getSimpleName(),
                                            TypeInformation.of(Integer.class)));
        }

        @Override
        public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
            LOG.info(getDebugMessage());
            Integer state = stateHandle.value();
            if (state == null) {
                state = 0;
            }
            state = state + value.f1;
            stateHandle.update(state); // the NPE above is thrown from inside this call
            return state;
        }

        /** Reflectively reads the backend's internals to log its key group bookkeeping. */
        private String getDebugMessage() {
            Object stateTable = JobThatFails.readField(stateHandle, "stateTable");
            Object keyContext = JobThatFails.readField(stateTable, "keyContext");
            Object currentNamespace = JobThatFails.readField(stateHandle, "currentNamespace");
            Object length =
                    ((Object[]) JobThatFails.readField(stateTable, "keyGroupedStateMaps")).length;
            Object keyGroupOffset = JobThatFails.readField(stateTable, "keyGroupOffset");
            KeyGroupRange keyGroupRange =
                    (KeyGroupRange) JobThatFails.readField(keyContext, "keyGroupRange");
            return String.format(
                    "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s"
                            + " keyGroupOffset=%s currentNamespace=%s length=%s"
                            + " startKeyGroup=%s endKeyGroup=%s",
                    JobThatFails.readField(keyContext, "numberOfKeyGroups"),
                    JobThatFails.readField(keyContext, "currentKey"),
                    JobThatFails.readField(keyContext, "currentKeyGroupIndex"),
                    keyGroupOffset,
                    currentNamespace,
                    length,
                    keyGroupRange.getStartKeyGroup(),
                    keyGroupRange.getEndKeyGroup());
        }
    }

    static Object readField(Object object, String name) {
        try {
            Field field = getField(name, object.getClass());
            field.setAccessible(true);
            return field.get(object);
        } catch (IllegalAccessException | NoSuchFieldException | SecurityException e) {
            throw new RuntimeException(
                    String.format(
                            "Cannot read %s from %s %s",
                            name, object.getClass().getSimpleName(), object),
                    e);
        }
    }

    /** Walks up the class hierarchy so fields declared in superclasses can be read too. */
    static Field getField(String name, Class<?> clazz) throws NoSuchFieldException {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            if (clazz.getSuperclass() == null) {
                throw e;
            }
            return getField(name, clazz.getSuperclass());
        }
    }

    public static void main(String[] args) throws Exception {
        JobThatFails job = new JobThatFails();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        job.defineGraph(env);
        env.execute(JobThatFails.class.getSimpleName());
    }
}
{code}
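For completeness, the scheduler option referenced above is set in flink-conf.yaml:
{code}
# flink-conf.yaml: spread slots over all available TaskManagers instead of
# filling one TaskManager first, which makes cross-JVM routing (and thus
# this bug) much more likely to surface
cluster.evenly-spread-out-slots: true
{code}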
--
This message was sent by Atlassian Jira
(v8.3.4#803005)