[ 
https://issues.apache.org/jira/browse/NIFI-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alaksiej Ščarbaty updated NIFI-16003:
-------------------------------------
    Description: 
h1. ConsumeKinesis legacy checkpoint migration can drop the migration table before the new checkpoint table is populated
h2. Affected component

_nifi-aws-kinesis_ bundle: _LegacyCheckpointMigrator_ class.
h2. Problem

_LegacyCheckpointMigrator.renameMigrationTable_ performs:

1. _deleteTable(checkpointTableName)_ — drop the legacy table.
2. {_}waitForTableDeleted(checkpointTableName){_}.
3. _createNewSchemaTable(checkpointTableName)_ — recreate with new schema, same name.
4. {_}waitForTableActive(checkpointTableName){_}.
5. {_}copyCheckpointItems(migrationTableName → checkpointTableName){_}.
6. {_}deleteTable(migrationTableName){_}.
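
To show where the failure window sits, here is a minimal in-memory Java sketch of the six steps. It assumes each table can be modeled as a map from item key to checkpoint. The class name, the table names, the item keys and the {{failCopy}} switch are hypothetical, and the real migrator talks to DynamoDB rather than to a map.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the six renameMigrationTable steps.
// Tables are plain maps (item key -> checkpoint); there is no DynamoDB client and
// no key schema, so the two wait steps are no-ops here.
class RenameMigrationSketch {

    // Table name -> items. Stand-in for DynamoDB; table names are made up.
    static final Map<String, Map<String, String>> TABLES = new HashMap<>();

    static void renameMigrationTable(String checkpointTable, String migrationTable, boolean failCopy) {
        TABLES.remove(checkpointTable);                     // 1. drop the legacy table
        // 2. waitForTableDeleted: nothing to wait for in memory
        TABLES.put(checkpointTable, new LinkedHashMap<>()); // 3. recreate under the same name, empty
        // 4. waitForTableActive: nothing to wait for in memory
        if (failCopy) {
            // 5. copyCheckpointItems fails (simulated, e.g. a rejected first PutItem)
            throw new IllegalStateException("simulated step-5 failure");
        }
        TABLES.get(checkpointTable).putAll(TABLES.get(migrationTable)); // 5. copy
        TABLES.remove(migrationTable);                      // 6. drop the migration table
    }

    public static void main(String[] args) {
        TABLES.put("checkpoints", new LinkedHashMap<>(Map.of("legacy-lease", "old-format")));
        TABLES.put("checkpoints_migration", new LinkedHashMap<>(Map.of("stream-a/shard-0", "seq-42")));
        try {
            renameMigrationTable("checkpoints", "checkpoints_migration", true);
        } catch (IllegalStateException e) {
            // Legacy data is gone and the recreated table is empty:
            // only the migration table still holds the migrated checkpoint.
            System.out.println(TABLES.get("checkpoints"));           // {}
            System.out.println(TABLES.get("checkpoints_migration")); // {stream-a/shard-0=seq-42}
        }
    }
}
```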

If step 5 throws, the table recreated in step 3 holds at most part of the 
migrated state, and the only complete copy is in the {{_migration}} table. On 
restart, _KinesisShardManager.ensureCheckpointTableExists_ sees _case NEW_ (the 
recreated table has the new schema) and calls {_}cleanupLingeringMigration{_}, 
which {*}deletes the migration table without rerunning the copy{*}. The code 
comment stating that the "legacy checkpoint table retains original data" is 
wrong in this case: the legacy table was already deleted in step 1.

Result: silent loss of all per-shard checkpoint state; every shard restarts 
from {_}INITIAL_STREAM_POSITION{_}.
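
A minimal sketch of the current restart path, under the same in-memory assumption. {{RestartPathSketch}}, the {{SchemaState}} enum and {{resumePosition}} are hypothetical stand-ins; the real _ensureCheckpointTableExists_ derives the schema case from the table's key schema.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the current restart path in ensureCheckpointTableExists.
// The real method inspects the table's key schema; the SchemaState argument stands
// in for that check. Enum, table names and item keys are made up.
class RestartPathSketch {

    enum SchemaState { NEW, LEGACY }

    static final Map<String, Map<String, String>> TABLES = new HashMap<>();

    static void ensureCheckpointTableExists(String checkpointTable, String migrationTable, SchemaState state) {
        if (state == SchemaState.NEW && TABLES.containsKey(migrationTable)) {
            // cleanupLingeringMigration: drops the table without copying its items
            TABLES.remove(migrationTable);
        }
        // LEGACY handling (running the migration) is omitted from this sketch.
    }

    // A shard resumes from its checkpoint if one exists, else from the initial position.
    static String resumePosition(String checkpointTable, String shardKey, String initialPosition) {
        return TABLES.getOrDefault(checkpointTable, Map.of()).getOrDefault(shardKey, initialPosition);
    }

    public static void main(String[] args) {
        // State left behind by a failed step 5: new-schema table empty,
        // checkpoints only in the migration table.
        TABLES.put("checkpoints", new LinkedHashMap<>());
        TABLES.put("checkpoints_migration", new LinkedHashMap<>(Map.of("stream-a/shard-0", "seq-42")));

        ensureCheckpointTableExists("checkpoints", "checkpoints_migration", SchemaState.NEW);

        System.out.println(TABLES.containsKey("checkpoints_migration"));                       // false
        System.out.println(resumePosition("checkpoints", "stream-a/shard-0", "TRIM_HORIZON")); // TRIM_HORIZON
    }
}
```
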
h2. Observed trigger: DynamoDB request-router schema cache

After the original table has been recreated with the new key schema, writes to 
it can fail with the following error, which indicates that some layer of 
DynamoDB request processing still sees the old table schema:

{{DynamoDbException: One or more parameter values were invalid:}}
{{Missing the key leaseKey in the item (Status Code: 400)}}

The working assumption is that step 5 fails on a fraction of streams because 
the same-name delete+recreate races against DynamoDB's request-router cache. 
Per the [USENIX ATC 2022 paper|https://cdn.amazon.science/33/9d/b77f13fe49a798ece85cf3f9be6d/amazon-dynamodb-a-scalable-predictably-performant-and-fully-managed-nosql-database-service.pdf]
 (Section 6.6, Metadata availability), request routers cache the table key 
schema. The metadata service updates immediately and _DescribeTable_ returns 
{_}ACTIVE{_}, but routers keep the old key schema in their cache. The first 
_PutItem_ hits the stale cache entry and is validated against the old schema.

_ValidationException_ is non-retryable, so step 5 aborts on the first item. 
(Any transient step-5 error produces the same outcome; the cache race is just 
the most reliable trigger.)
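
The hypothesis can be illustrated with a small simulation; it is not DynamoDB code. It assumes that a fixed number of requests right after _ACTIVE_ are validated against the cached legacy hash key {{leaseKey}} (the attribute named in the error above), and that the copy writes items one at a time without retrying validation errors. All class and attribute names other than {{leaseKey}} are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the stale request-router hypothesis, not DynamoDB code.
// The first `staleRequests` puts after ACTIVE are validated against the cached legacy
// hash key ("leaseKey", from the error message) instead of the new key.
// Attribute names other than leaseKey are made up for illustration.
class StaleRouterSketch {

    // Stand-in for the non-retryable 400 validation error.
    static class NonRetryableValidationException extends RuntimeException {
        NonRetryableValidationException(String message) { super(message); }
    }

    static class RecreatedTable {
        final List<Map<String, String>> items = new ArrayList<>();
        int staleRequests; // requests still routed with the cached legacy schema

        RecreatedTable(int staleRequests) { this.staleRequests = staleRequests; }

        void putItem(Map<String, String> item) {
            String requiredKey = staleRequests > 0 ? "leaseKey" : "streamName";
            if (staleRequests > 0) {
                staleRequests--;
            }
            if (!item.containsKey(requiredKey)) {
                throw new NonRetryableValidationException(
                        "One or more parameter values were invalid: Missing the key " + requiredKey + " in the item");
            }
            items.add(item);
        }
    }

    // Sequential copy with no retry: the first failure aborts the whole step.
    static int copyCheckpointItems(List<Map<String, String>> source, RecreatedTable target) {
        int copied = 0;
        for (Map<String, String> item : source) {
            target.putItem(item);
            copied++;
        }
        return copied;
    }

    public static void main(String[] args) {
        List<Map<String, String>> migrated = List.of(
                Map.of("streamName", "stream-a", "shardId", "shard-0", "sequenceNumber", "42"),
                Map.of("streamName", "stream-a", "shardId", "shard-1", "sequenceNumber", "17"));

        RecreatedTable early = new RecreatedTable(1); // first put races the cache
        try {
            copyCheckpointItems(migrated, early);
        } catch (NonRetryableValidationException e) {
            System.out.println("copied " + early.items.size() + ": " + e.getMessage());
        }

        RecreatedTable late = new RecreatedTable(0); // cache already refreshed
        System.out.println("copied " + copyCheckpointItems(migrated, late));
    }
}
```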

Observed in a real deployment: 6 of 9 streams failed during a single upgrade. 
Streams whose first put landed more than 50 ms after the table became _ACTIVE_ 
succeeded; those whose first put landed within ~15–30 ms failed and lost their 
state on restart.
h2. Proposed fix

In {_}KinesisShardManager.ensureCheckpointTableExists{_}, treat _case NEW_ with 
the migration table still present as an {*}incomplete migration{*}. A clean 
_renameMigrationTable_ run deletes the migration table as its last step, so its 
presence proves that a prior attempt failed between steps 3 and 6.

Replace _cleanupLingeringMigration_ in this branch with a recovery that:
 - Re-runs {_}copyCheckpointItems(migrationTableName, checkpointTableName){_} (idempotent: items are keyed on {_}streamName{_}+{_}shardId{_}).
 - Only after the copy succeeds, deletes the lingering migration table.
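
A minimal sketch of the proposed recovery, again assuming in-memory maps for the tables and a hypothetical {{SchemaState}} enum in place of the real schema check. Items are keyed on a made-up {{streamName/shardId}} string, so writing an item again overwrites it, which is what makes the re-run copy idempotent.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the proposed recovery for "case NEW + migration table present".
// Tables are in-memory maps keyed by "streamName/shardId"; SchemaState, the table
// names and the key format are made up for illustration.
class RecoverySketch {

    enum SchemaState { NEW, LEGACY }

    static final Map<String, Map<String, String>> TABLES = new HashMap<>();

    static String itemKey(String streamName, String shardId) {
        return streamName + "/" + shardId;
    }

    // Idempotent copy: put() overwrites an existing key, so re-running after a
    // partial copy neither duplicates nor loses items.
    static void copyCheckpointItems(String fromTable, String toTable) {
        Map<String, String> target = TABLES.get(toTable);
        TABLES.get(fromTable).forEach(target::put);
    }

    static void ensureCheckpointTableExists(String checkpointTable, String migrationTable, SchemaState state) {
        if (state == SchemaState.NEW && TABLES.containsKey(migrationTable)) {
            // A clean run drops the migration table last, so its presence means a
            // prior attempt stopped between steps 3 and 6: finish the copy first.
            copyCheckpointItems(migrationTable, checkpointTable);
            // Reached only if the copy did not throw.
            TABLES.remove(migrationTable);
        }
        // LEGACY handling (running the migration) is omitted from this sketch.
    }

    public static void main(String[] args) {
        Map<String, String> migrated = new LinkedHashMap<>();
        migrated.put(itemKey("stream-a", "shard-0"), "seq-42");
        migrated.put(itemKey("stream-a", "shard-1"), "seq-17");

        // Left behind by a step 5 that failed after copying one item.
        Map<String, String> partial = new LinkedHashMap<>();
        partial.put(itemKey("stream-a", "shard-0"), "seq-42");

        TABLES.put("checkpoints", partial);
        TABLES.put("checkpoints_migration", migrated);

        ensureCheckpointTableExists("checkpoints", "checkpoints_migration", SchemaState.NEW);

        System.out.println(TABLES.get("checkpoints"));                   // {stream-a/shard-0=seq-42, stream-a/shard-1=seq-17}
        System.out.println(TABLES.containsKey("checkpoints_migration")); // false
    }
}
```

If the copy throws in this sketch, the migration table is left in place, so the next restart takes the same branch and tries again instead of discarding the state.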

h2. Impact

For the _TRIM_HORIZON_ initial stream position, total checkpoint loss can mean 
reprocessing up to 24 h of records and producing downstream duplicates.

For the _LATEST_ initial stream position, total checkpoint loss means that 
records written between the last checkpoint and the restart are skipped, i.e. 
potential data loss.



> ConsumeKinesis can drop checkpoints after a failed migration
> ------------------------------------------------------------
>
>                 Key: NIFI-16003
>                 URL: https://issues.apache.org/jira/browse/NIFI-16003
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.9.0
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Alaksiej Ščarbaty
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
