Sihua Zhou created FLINK-9401:
---------------------------------

Summary: Data lost when rescaling the job from incremental checkpoint
Key: FLINK-9401
URL: https://issues.apache.org/jira/browse/FLINK-9401
Project: Flink
Issue Type: Bug
Affects Versions: 1.4.2, 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
We may lose data when rescaling a job from an incremental checkpoint because of the following code:

{code:java}
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
    int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
        startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
    }

    iterator.seek(startKeyGroupPrefixBytes);

    while (iterator.isValid()) {
        int keyGroup = 0;
        for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
        }

        if (stateBackend.keyGroupRange.contains(keyGroup)) {
            stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
        }

        iterator.next();
    }
}
{code}

For every state handle from which we fetch the target data, we _seek(stateBackend.getKeyGroupRange().getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's _start key group_ is bigger than _stateBackend.getKeyGroupRange().getStartKeyGroup()_. Then data is lost.
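For readers unfamiliar with the key layout, here is a minimal, self-contained sketch of what the quoted loop does. It is not Flink code: the names `KeyGroupRestoreSketch`, `encodeKeyGroup`, `decodeKeyGroup` and `restoreRange` are hypothetical, a `TreeMap` with an unsigned byte-wise comparator stands in for a RocksDB column family (assuming RocksDB's default bytewise key ordering), `ceilingEntry` plays the role of `iterator.seek(...)` (first key >= target), and each key is assumed to consist of the key-group prefix alone. The prefix encoding uses the same big-endian shifts as the quoted code. The decode step masks each byte with `0xFF`, because Java's `byte` is signed: with two prefix bytes, key group 200 is stored as `0x00 0xC8`, and adding the raw byte `0xC8` would contribute -56 instead of 200.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for a RocksDB column family.
final class KeyGroupRestoreSketch {

    // Big-endian encoding of a key group into prefixBytes bytes (same shifts as the quoted code).
    static byte[] encodeKeyGroup(int keyGroup, int prefixBytes) {
        byte[] prefix = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; ++j) {
            prefix[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return prefix;
    }

    // Reads the key-group prefix back; "& 0xFF" treats each byte as unsigned.
    static int decodeKeyGroup(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + (key[j] & 0xFF);
        }
        return keyGroup;
    }

    // Unsigned lexicographic byte[] ordering (assumed to match RocksDB's bytewise comparator).
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Copies entries whose key group lies in [startKeyGroup, endKeyGroup]; returns how many were copied.
    static int restoreRange(NavigableMap<byte[], byte[]> source, Map<byte[], byte[]> target,
                            int startKeyGroup, int endKeyGroup, int prefixBytes) {
        int copied = 0;
        // Like iterator.seek(prefix): position at the first key >= the start prefix.
        Map.Entry<byte[], byte[]> entry = source.ceilingEntry(encodeKeyGroup(startKeyGroup, prefixBytes));
        while (entry != null) { // entry == null corresponds to !iterator.isValid()
            int keyGroup = decodeKeyGroup(entry.getKey(), prefixBytes);
            if (keyGroup >= startKeyGroup && keyGroup <= endKeyGroup) {
                target.put(entry.getKey(), entry.getValue());
                copied++;
            }
            entry = source.higherEntry(entry.getKey()); // like iterator.next()
        }
        return copied;
    }

    public static void main(String[] args) {
        int prefixBytes = 2; // assumption: two prefix bytes, i.e. more than 256 key groups
        NavigableMap<byte[], byte[]> source = new TreeMap<>(KeyGroupRestoreSketch::compareUnsigned);
        for (int kg : new int[] {10, 50, 200, 300}) {
            source.put(encodeKeyGroup(kg, prefixBytes), ("value-" + kg).getBytes(StandardCharsets.UTF_8));
        }
        NavigableMap<byte[], byte[]> target = new TreeMap<>(KeyGroupRestoreSketch::compareUnsigned);
        int copied = restoreRange(source, target, 40, 250, prefixBytes);
        System.out.println("copied " + copied + " entries"); // prints "copied 2 entries" (key groups 50 and 200)
    }
}
{code}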