Dennis-Mircea commented on PR #28083:
URL: https://github.com/apache/flink/pull/28083#issuecomment-4469943757

   > ## Checkpoint compatibility
   > State format changed completely: `SequenceGenerator`'s per-generator `ListState<Long>` → `NumberSequenceSource`'s internal state. Existing checkpoints with sequence fields can't restore.
   > 
   > If DataGen is production-used, need migration path. If test-only (likely), document it:
   > 
   > ```java
   > /**
   >  * DataGen is for testing/dev only. State compatibility not guaranteed.
   >  * Do not use in production with checkpoint recovery.
   >  */
   > ```
   > 
   > Add to release notes as breaking change.
   > 
   > ## Missing tests
   > ### Checkpoint restore
   > ```java
   > @Test
   > void testCheckpointRestore() {
   >     // Run sequence [0,100] to index 50, checkpoint, restore, verify continues from 51
   > }
   > ```
   > 
   > ### Edge cases
   > ```java
   > @Test
   > void testNumberOfRowsZero() {
   >     // 'number-of-rows' = '0', expect immediate finish
   > }
   > 
   > @Test
   > void testNumberOfRowsExceedsLongMax() {
   >     // 'number-of-rows' = '9223372036854775807', verify no overflow
   > }
   > 
   > @Test
   > void testSequenceStartEqualsEnd() {
   >     // start = end = 42, expect single row with value 42
   > }
   > 
   > @Test
   > void testMultipleSequenceFieldsDifferentRanges() {
   >     // field1: [0, 1000000], field2: [0, 10]
   >     // number-of-rows unset → expect stops at 11 (min wins)
   > }
   > 
   > @Test
   > void testSequenceTinyintOverflow() {
   >     // TINYINT sequence [0, 10], number-of-rows = 300
   >     // expect stops at 11, NOT wraps to -128
   > }
   > 
   > @Test
   > void testParallelismGreaterThanOne() {
   >     // parallelism = 4, number-of-rows = 100
   >     // verify each subtask gets ~25, total = 100, no duplicates
   > }
   > 
   > @Test
   > void testRateLimiterAccuracy() {
   >     // rows-per-second = 100, run for 1s, expect 95-105 rows (5% tolerance)
   > }
   > ```
   > 
   > ### Nullability
   > ```java
   > @Test  
   > void testNullableFieldsWithRandomGenerator() {
   >     // Random generator on nullable column, verify nulls generated
   > }
   > ```
   > 
   > Clarify:
   > 
   > 1. Is DataGen ever used in production?
   > 2. Does FLIP-27 have auto state migration?
   > 3. Should old state detection fail fast with clear error?
   
   Thanks for the review @featzhang! Responses below.
   
   ### 1. Checkpoint compatibility
   
   Correct that the state layouts are unrelated. There is no automatic bridge, 
and to my knowledge no FLIP-27 migration in Flink (Kafka, file, Kinesis, …) has 
preserved state from its legacy `SourceFunction` predecessor. That's the 
established pattern.
   
   DataGen is already documented as a testing/dev source (the 
`flink-connector-datagen` module is described as "DataGen source for testing"), 
so I'd rather not duplicate that warning on the class Javadoc.
   
   ### 2. Missing tests
   
   Several of the items are already covered after your previous comment:
   
   - **Multiple sequence fields, smallest range wins** → 
`testEffectiveCountIsCappedBySequenceWhenRowCountLarger` exercises the 
`min`-of-`getTotalCount()` semantic directly on `computeEffectiveCount`.
   - **Parallelism > 1** → `testWithParallelism` (the existing assertion was 
extended to cover the FLIP-27 wiring).
   - **Nullable random fields** → null-rate logic was preserved byte-for-byte 
in `RandomGeneratorFunction#nextWithNullRate`; `testSource` / 
`testVariableLengthDataGeneration` exercise nullable columns end-to-end.
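
   For readers unfamiliar with the null-rate semantic mentioned above, here is a minimal sketch of the idea. It is an illustration under assumptions, not the code in `RandomGeneratorFunction`: the class name, the method signature, and the `<` comparison are all hypothetical.

```java
import java.util.Random;
import java.util.function.Supplier;

/** Hypothetical illustration of a null-rate wrapper; not the connector's implementation. */
final class NullRateSketch {

    /**
     * Returns null with probability nullRate, otherwise delegates to the generator.
     * Assumption: nullRate is in [0, 1]; nextFloat() is in [0, 1), so 0 never
     * yields null and 1 always yields null.
     */
    static <T> T nextWithNullRate(Random random, float nullRate, Supplier<T> generator) {
        if (random.nextFloat() < nullRate) {
            return null;
        }
        return generator.get();
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int nulls = 0;
        for (int i = 0; i < 10_000; i++) {
            if (nextWithNullRate(random, 0.3f, () -> "value") == null) {
                nulls++;
            }
        }
        // Roughly 30% of the draws should be null.
        System.out.println("nulls out of 10000: " + nulls);
    }
}
```

   The point of the sketch is only that nullability is decided per value, independently of the source that drives row emission, which is why the source swap alone would not be expected to affect it.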
   
   I've added two of your suggestions in a follow-up commit:
   
   - `testNumberOfRowsZero` - asserts `computeEffectiveCount() == 0` and an 
empty result set.
   - `testSequenceStartEqualsEnd` - single-row sequence asserts 
`getTotalCount() == 1` and the exact emitted value.
   
   Declining the rest:
   
   - `testCheckpointRestore` - the restore path lives entirely in 
`NumberSequenceSource` and is covered by its own tests; this PR didn't change 
that surface.
   - `testNumberOfRowsExceedsLongMax` - `numberOfRows` is parsed as a `long`; 
there's no overflow path the connector controls.
   - `testSequenceTinyintOverflow` - `SequenceGeneratorFunction#map` computes 
`convert(start + idx)` where the source-level count is bounded by 
`getTotalCount() = end - start + 1`. With the cap in `computeEffectiveCount`, 
wrapping is unreachable; 
`testEffectiveCountIsCappedBySequenceWhenRowCountLarger` already pins this for 
`TINYINT` specifically.
   - `testRateLimiterAccuracy` - `RateLimiterStrategy.perSecond` is library 
code with its own tests.
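
   To make the capping argument concrete, here is a minimal, self-contained sketch of the arithmetic. It is not the connector code: `EffectiveCountSketch`, its method signatures, and the use of `Long.MAX_VALUE` for an unset `number-of-rows` are assumptions made for illustration; only the `min` semantic and `end - start + 1` come from the discussion above.

```java
/** Hypothetical stand-in for the count logic discussed above; names are illustrative. */
final class EffectiveCountSketch {

    /** Number of values in the inclusive range [start, end]; throws on long overflow. */
    static long totalCount(long start, long end) {
        return Math.addExact(Math.subtractExact(end, start), 1L);
    }

    /**
     * Minimum of the requested row count and every sequence's total count.
     * Assumption: a null numberOfRows models an unset option (treated as unbounded).
     */
    static long computeEffectiveCount(Long numberOfRows, long... sequenceTotalCounts) {
        long effective = numberOfRows == null ? Long.MAX_VALUE : numberOfRows;
        for (long count : sequenceTotalCounts) {
            effective = Math.min(effective, count);
        }
        return effective;
    }

    public static void main(String[] args) {
        // TINYINT sequence [0, 10] with number-of-rows = 300: capped at 11 rows.
        long capped = computeEffectiveCount(300L, totalCount(0, 10));
        System.out.println(capped);

        // The largest index ever mapped is capped - 1, so start + idx never passes end.
        long lastValue = 0 + (capped - 1);
        System.out.println(lastValue <= Byte.MAX_VALUE);

        // Two sequence fields, number-of-rows unset: the smaller range wins.
        System.out.println(computeEffectiveCount(null, totalCount(0, 1_000_000), totalCount(0, 10)));

        // Edge cases from the follow-up commit: zero rows, and start == end.
        System.out.println(computeEffectiveCount(0L, totalCount(0, 10)));
        System.out.println(computeEffectiveCount(null, totalCount(42, 42)));
    }
}
```

   Because the index handed to the mapper is always below the effective count, `start + idx` stays within `[start, end]`, which is the reason a TINYINT range cannot wrap in this model.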
   
   ### 3. Clarifications
   
   1. **DataGen in production?** No. It's a testing/dev source by design and by 
docs.
   2. **FLIP-27 state migration?** No automatic bridge from legacy 
`SourceFunction` state to FLIP-27 source state exists, here or anywhere else in 
Flink.
   3. **Old-state detection with a descriptive message?** The runtime already 
fails state assignment when an operator's checkpointed state can't be claimed 
by the new operator. Adding bespoke detection inside a test-only connector 
isn't worth the complexity.

