[
https://issues.apache.org/jira/browse/NIFI-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alaksiej Ščarbaty updated NIFI-16003:
-------------------------------------
Description:
h1. ConsumeKinesis legacy checkpoint migration can drop the migration table before the new checkpoint table is populated
h2. Affected component
_nifi-aws-kinesis_ bundle: _LegacyCheckpointMigrator_ class.
h2. Problem
_LegacyCheckpointMigrator.renameMigrationTable_ performs:
1. _deleteTable(checkpointTableName)_ — drop the legacy table.
2. {_}waitForTableDeleted(checkpointTableName){_}.
3. _createNewSchemaTable(checkpointTableName)_ — recreate with new schema, same
name.
4. {_}waitForTableActive(checkpointTableName){_}.
5. {_}copyCheckpointItems(migrationTableName → checkpointTableName){_}.
6. {_}deleteTable(migrationTableName){_}.
If step 5 throws, the legacy table is already gone (step 1) and the recreated
table (step 3) is empty or only partially populated, so the only surviving copy
of the migrated state is in the migration table. On restart,
_KinesisShardManager.ensureCheckpointTableExists_ sees _case NEW_ (the recreated
table has the new schema) and calls {_}cleanupLingeringMigration{_}, which
{*}deletes the migration table without rerunning the copy{*}. The "legacy
checkpoint table retains original data" comment does not hold in this case: the
legacy table was already deleted in step 1.
Result: silent loss of all per-shard checkpoint state; every shard restarts
from {_}INITIAL_STREAM_POSITION{_}.
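A minimal in-memory model of the sequence above may make the failure mode easier to follow. It is a sketch only: the class, the table names (_checkpoints_, _checkpoints_migration_) and the map-based store are hypothetical stand-ins, not the real _LegacyCheckpointMigrator_ or DynamoDB code, and the restart method models only the _case NEW_ branch as described in this issue.

```java
import java.util.HashMap;
import java.util.Map;

public class MigrationLossDemo {

    // Hypothetical in-memory stand-in for DynamoDB: table name -> (item key -> checkpoint).
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    // Steps 1-6 of renameMigrationTable as listed in the issue; failCopy simulates step 5 throwing.
    void renameMigrationTable(String checkpoint, String migration, boolean failCopy) {
        tables.remove(checkpoint);                     // steps 1-2: drop the legacy table
        tables.put(checkpoint, new HashMap<>());       // steps 3-4: recreate (same name, new schema)
        if (failCopy) {
            throw new IllegalStateException("copyCheckpointItems failed"); // step 5 throws
        }
        tables.get(checkpoint).putAll(tables.get(migration)); // step 5: copy items
        tables.remove(migration);                      // step 6: drop the migration table
    }

    // Current restart behaviour for case NEW: a lingering migration table is deleted
    // (cleanupLingeringMigration) without re-running the copy.
    void restartCaseNew(String checkpoint, String migration) {
        if (tables.containsKey(checkpoint) && tables.containsKey(migration)) {
            tables.remove(migration);
        }
    }

    // Runs the migration, then the restart path; returns what is left in the checkpoint table.
    public static Map<String, String> simulate(boolean failCopy) {
        MigrationLossDemo db = new MigrationLossDemo();
        db.tables.put("checkpoints", new HashMap<>(Map.of("shard-0", "seq-100")));           // legacy table
        db.tables.put("checkpoints_migration", new HashMap<>(Map.of("shard-0", "seq-100"))); // migrated copy
        try {
            db.renameMigrationTable("checkpoints", "checkpoints_migration", failCopy);
        } catch (IllegalStateException e) {
            // The processor fails here; the next start goes through the restart path.
        }
        db.restartCaseNew("checkpoints", "checkpoints_migration");
        return db.tables.getOrDefault("checkpoints", Map.of());
    }

    public static void main(String[] args) {
        System.out.println("copy succeeds: " + simulate(false));
        System.out.println("copy fails:    " + simulate(true));
    }
}
```

With the copy succeeding, the checkpoint survives; with step 5 failing, the restart path removes the only remaining copy and the checkpoint table ends up empty.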
h2. Observed trigger: DynamoDB request-router schema cache
After the original table has been recreated with a new key schema, writes to it
may fail with the following error, which indicates that some layer of DynamoDB
request processing still sees the old key schema:
{{DynamoDbException: One or more parameter values were invalid:}}
{{Missing the key leaseKey in the item (Status Code: 400)}}
The working hypothesis is that step 5 fails on a fraction of streams because the
same-name delete+recreate races against DynamoDB's request-router cache. Per
the [USENIX ATC 2022 paper|https://cdn.amazon.science/33/9d/b77f13fe49a798ece85cf3f9be6d/amazon-dynamodb-a-scalable-predictably-performant-and-fully-managed-nosql-database-service.pdf]
(section 6.6, Metadata availability), request routers cache the table key
schema. The metadata service updates immediately and _DescribeTable_ returns
{_}ACTIVE{_}, but a router may still hold the old key schema in its cache. The
first _PutItem_ then hits the stale cache entry and is validated against the old
schema.
_ValidationException_ is non-retryable, so step 5 terminates on the first item.
(Any error that aborts step 5, e.g. a transient failure that exhausts SDK
retries, produces the same outcome; the cache race is just the most reliable
trigger.)
Observed in the real world: 6 of 9 streams failed during a single upgrade.
Streams whose first put landed more than 50 ms after _ACTIVE_ succeeded; those
whose first put landed within ~15–30 ms failed and lost their state on restart.
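The suspected race can be modelled as a router that validates _PutItem_ against a cached key schema for a short window after the table turns _ACTIVE_. This is a hypothetical simulation, not DynamoDB behaviour: the 40 ms window is an arbitrary assumption chosen to sit between the observed failing (~15–30 ms) and succeeding (>50 ms) timings, and the new-schema key attributes (_streamName_, _shardId_) are assumed for illustration. Only _leaseKey_ comes from the error message above.

```java
import java.util.List;
import java.util.Set;

public class StaleRouterCacheDemo {

    // Stand-in for DynamoDB's ValidationException: a 400 error that is not retried.
    static final class ValidationException extends RuntimeException {
        ValidationException(String message) { super(message); }
    }

    // ASSUMPTION: illustrative staleness window; DynamoDB documents no such value.
    static final long STALE_WINDOW_MS = 40;

    static final Set<String> OLD_KEY_SCHEMA = Set.of("leaseKey");              // legacy table key
    static final Set<String> NEW_KEY_SCHEMA = Set.of("streamName", "shardId"); // ASSUMED new key

    // The router validates the item against the schema it has cached, not the live one.
    static void putItem(Set<String> itemAttributes, long msSinceActive) {
        Set<String> cachedSchema = msSinceActive < STALE_WINDOW_MS ? OLD_KEY_SCHEMA : NEW_KEY_SCHEMA;
        for (String key : cachedSchema) {
            if (!itemAttributes.contains(key)) {
                throw new ValidationException("Missing the key " + key + " in the item");
            }
        }
    }

    // Step 5 as a plain loop with no retry: the first ValidationException aborts the copy.
    // Each put is assumed to take ~1 ms. Returns the number of items copied.
    public static int copyCheckpointItems(List<Set<String>> items, long firstPutMsAfterActive) {
        int copied = 0;
        for (Set<String> item : items) {
            putItem(item, firstPutMsAfterActive + copied);
            copied++;
        }
        return copied;
    }

    public static void main(String[] args) {
        List<Set<String>> items = List.of(
                Set.of("streamName", "shardId", "checkpoint"),
                Set.of("streamName", "shardId", "checkpoint"));
        for (long delayMs : new long[] {20, 60}) {
            try {
                int copied = copyCheckpointItems(items, delayMs);
                System.out.println("first put at +" + delayMs + " ms: copied " + copied + " item(s)");
            } catch (ValidationException e) {
                System.out.println("first put at +" + delayMs + " ms: step 5 aborted: " + e.getMessage());
            }
        }
    }
}
```

The point of the model is the combination: a stale schema alone only fails one request, but because the error is non-retryable and the copy loop does not retry, a single early put is enough to abort the whole of step 5.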
h2. Proposed fix
In {_}KinesisShardManager.ensureCheckpointTableExists{_}, treat _case NEW_ with
the migration table still present as an {*}incomplete migration{*}. A clean
_renameMigrationTable_ run deletes the migration table as its last step, so its
presence shows that a prior attempt failed between steps 3 and 6.
Replace _cleanupLingeringMigration_ in this branch with a recovery that:
- Re-runs {_}copyCheckpointItems(migrationTableName, checkpointTableName){_}.
- Only after that deletes the lingering migration table.
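A sketch of the proposed recovery branch, again against a hypothetical in-memory store (the _SchemaState_ enum, the table maps and the table names are assumptions, not NiFi code). What it is meant to show is the ordering: the migration table is deleted only after _copyCheckpointItems_ returns normally, so a failed copy leaves it in place for the next attempt.

```java
import java.util.HashMap;
import java.util.Map;

public class IncompleteMigrationRecovery {

    // Hypothetical classification of the checkpoint table, mirroring "case NEW" in the issue.
    enum SchemaState { NOT_FOUND, LEGACY, NEW }

    // Hypothetical in-memory stand-in for DynamoDB: table name -> (item key -> checkpoint).
    final Map<String, Map<String, String>> tables = new HashMap<>();
    final Map<String, SchemaState> schemas = new HashMap<>();

    void copyCheckpointItems(String from, String to) {
        // In this model putAll overwrites items with the same key, so a re-run after a
        // partial copy does not create duplicates.
        tables.get(to).putAll(tables.get(from));
    }

    void deleteTable(String name) {
        tables.remove(name);
        schemas.remove(name);
    }

    void ensureCheckpointTableExists(String checkpoint, String migration) {
        SchemaState state = schemas.getOrDefault(checkpoint, SchemaState.NOT_FOUND);
        switch (state) {
            case NEW:
                if (tables.containsKey(migration)) {
                    // A clean renameMigrationTable run deletes the migration table last,
                    // so its presence means a previous attempt stopped between steps 3 and 6.
                    copyCheckpointItems(migration, checkpoint);
                    deleteTable(migration); // reached only if the copy did not throw
                }
                break;
            default:
                // NOT_FOUND / LEGACY handling is outside the scope of this sketch.
                break;
        }
    }

    // Builds the state left behind when step 5 failed: an empty NEW-schema table plus
    // the migration table still holding the migrated checkpoints.
    static IncompleteMigrationRecovery afterFailedStep5() {
        IncompleteMigrationRecovery db = new IncompleteMigrationRecovery();
        db.tables.put("checkpoints", new HashMap<>());
        db.schemas.put("checkpoints", SchemaState.NEW);
        db.tables.put("checkpoints_migration", new HashMap<>(Map.of("shard-0", "seq-100")));
        db.schemas.put("checkpoints_migration", SchemaState.NEW);
        return db;
    }

    public static void main(String[] args) {
        IncompleteMigrationRecovery db = afterFailedStep5();
        db.ensureCheckpointTableExists("checkpoints", "checkpoints_migration");
        System.out.println("checkpoints: " + db.tables.get("checkpoints"));
        System.out.println("migration table present: " + db.tables.containsKey("checkpoints_migration"));
    }
}
```

If the copy throws, _deleteTable_ is never reached, so the next restart sees the same state and retries the recovery instead of discarding the migrated checkpoints.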
h2. Impact
For the _TRIM_HORIZON_ initial stream position, total checkpoint loss can mean
reprocessing up to the stream's retention period of records (24 h by default)
and duplicates downstream.
For the _LATEST_ initial stream position, total checkpoint loss means that
records written while the processor was down are skipped, i.e. potential data
loss.
was:
h1. ConsumeKinesis legacy checkpoint migration can drop the migration table
before the new checkpoint table is populated
h2. Affected component
_nifi-aws-kinesis_ bundle: _LegacyCheckpointMigrator_ class.
h2. Problem
_LegacyCheckpointMigrator.renameMigrationTable_ performs:
1. _deleteTable(checkpointTableName)_ — drop the legacy table.
2. {_}waitForTableDeleted(checkpointTableName){_}.
3. _createNewSchemaTable(checkpointTableName)_ — recreate with new schema, same
name.
4. {_}waitForTableActive(checkpointTableName){_}.
5. {_}copyCheckpointItems(migrationTableName → checkpointTableName){_}.
6. {_}deleteTable(migrationTableName){_}.
If step 5 throws after step 3, the only surviving copy of the migrated state is
in the migration table. On restart,
_KinesisShardManager.ensureCheckpointTableExists_ sees _case NEW_ (recreated
table has new schema) and calls {_}cleanupLingeringMigration{_}, which
{*}deletes the migration table without rerunning the copy{*}. The "legacy
checkpoint table retains original data" comment is wrong here — the legacy
table was already deleted in step 1.
Result: silent loss of all per-shard checkpoint state; every shard restarts
from {_}INITIAL_STREAM_POSITION{_}.
h2. Observed trigger: DynamoDB request-router schema cache
After the original table has been recreated with a new key schema, writing
records to it might result in the following error, identifying that some layer
in the DynamoDB request processing still sees the old table schema.
{{DynamoDbException: One or more parameter values were invalid:}}
{{Missing the key leaseKey in the item (Status Code: 400)}}
The assumption is that step 5 fails on a fraction of streams because the
same-name delete+recreate races against DynamoDB's request-router cache. Per
the [USENIX ATC 2022
paper|https://cdn.amazon.science/33/9d/b77f13fe49a798ece85cf3f9be6d/amazon-dynamodb-a-scalable-predictably-performant-and-fully-managed-nosql-database-service.pdf]
(6.6 Metadata availability), routers cache table key schema. The metadata
service updates immediately and _DescribeTable_ returns {_}ACTIVE{_}, but
routers keep old key schema in the cache. The first _PutItem_ hits stale cache
and is validated against the old schema.
_ValidationException_ is non-retryable, so step 5 terminates on the first item.
(Any transient step-5 error reproduces the same outcome — the cache race is
just the most reliable trigger.)
Observed in real-world: 6 of 9 streams failed on a single upgrade. Streams
whose first put landed >50 ms after _ACTIVE_ succeeded; those landing within
~15–30 ms failed and lost state on restart.
h2. Proposed fix
In {_}KinesisShardManager.ensureCheckpointTableExists{_}, treat _case NEW_ +
migration-table-present as an {*}incomplete migration{*}. A clean
_renameMigrationTable_ run deletes the migration table at the end, so its
presence proves a prior attempt failed between steps 3 and 6.
Replace _cleanupLingeringMigration_ in this branch with a recovery that:
- Re-runs {_}copyCheckpointItems(migrationTableName, checkpointTableName){_}
(idempotent: items keyed on {_}streamName{_}+{_}shardId{_}).
- Only after that deletes the lingering migration table.
h2. Impact
For _TRIM_HORIZON_ initial checkpoint position, total checkpoint loss can mean
reprocessing up to 24 h of records and downstream duplicates.
For _LATEST_ initial checkpoint position, total checkpoint loss means a
potential loss of data.
> ConsumeKinesis can drop checkpoints after a failed migration
> ------------------------------------------------------------
>
> Key: NIFI-16003
> URL: https://issues.apache.org/jira/browse/NIFI-16003
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 2.9.0
> Reporter: Alaksiej Ščarbaty
> Assignee: Alaksiej Ščarbaty
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)