[ https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou closed FLINK-9401.
-----------------------------
    Resolution: Invalid

> Data lost when rescaling the job from incremental checkpoint
> ------------------------------------------------------------
>
>                 Key: FLINK-9401
>                 URL: https://issues.apache.org/jira/browse/FLINK-9401
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Blocker
>
> We may lose data when rescaling a job from an incremental checkpoint because of
> the following code.
> {code:java}
> try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
>     int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
>     byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
>     for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>         startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
>     }
>     iterator.seek(startKeyGroupPrefixBytes);
>     while (iterator.isValid()) {
>         int keyGroup = 0;
>         for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>             keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>         }
>         if (stateBackend.keyGroupRange.contains(keyGroup)) {
>             stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
>         }
>         iterator.next();
>     }
> }
> {code}
> For every state handle, to fetch the target data we call
> _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be
> INVALID immediately if the state handle's _start key group_ is bigger than
> _state.keyGroupRange.getStartKeyGroup()_. The data is then lost.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
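
The seek step that the description worries about can be sketched in isolation. This is a minimal illustration, not Flink code: `SeekSketch`, `restore`, and the key-group numbers are hypothetical, and a `TreeMap` stands in for the RocksDB iterator. It assumes RocksDB's documented seek behaviour, where the iterator is positioned at the first key at or after the target. Under that assumption, seeking to a start key group smaller than anything stored in the handle still leaves the iterator valid at the handle's first key. That may be related to the Invalid resolution, although the message itself does not state the reason.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SeekSketch {

    /**
     * Returns the key groups copied from one state handle into a backend
     * that owns the inclusive key-group range [start, end].
     * The handle is modelled as a sorted map from key group to value
     * (an assumption made for illustration only).
     */
    static List<Integer> restore(TreeMap<Integer, String> handle, int start, int end) {
        List<Integer> copied = new ArrayList<>();
        // tailMap(start, true) models iterator.seek(start):
        // iteration begins at the first stored key >= start.
        for (Map.Entry<Integer, String> e : handle.tailMap(start, true).entrySet()) {
            int keyGroup = e.getKey();
            // Same filter as keyGroupRange.contains(keyGroup) in the quoted loop.
            if (keyGroup >= start && keyGroup <= end) {
                copied.add(keyGroup);
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        // Hypothetical old state handle holding key groups 64..127.
        TreeMap<Integer, String> handle = new TreeMap<>();
        for (int kg = 64; kg <= 127; kg++) {
            handle.put(kg, "value-" + kg);
        }
        // Hypothetical new backend range 32..95, so the handle's start
        // key group (64) is bigger than the seek target (32).
        List<Integer> copied = restore(handle, 32, 95);
        System.out.println("copied " + copied.size() + " key groups, first = "
                + copied.get(0) + ", last = " + copied.get(copied.size() - 1));
    }
}
```

In this model the overlap 64..95 is copied even though the seek target lies before the handle's first key. An exact-match lookup would behave differently, which is the distinction the sketch is meant to highlight.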