featzhang commented on code in PR #28083:
URL: https://github.com/apache/flink/pull/28083#discussion_r3235947871


##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java:
##########
@@ -62,16 +68,61 @@ public DataGenTableSource(
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-        boolean isBounded = numberOfRows != null;
-        return SourceFunctionProvider.of(createSource(), isBounded, 
parallelism);
+        TypeInformation<RowData> typeInfo = 
context.createTypeInformation(rowDataType);
+        return SourceProvider.of(createSource(typeInfo), parallelism);
     }
 
     @VisibleForTesting
-    public DataGeneratorSource<RowData> createSource() {
+    public DataGeneratorSource<RowData> createSource(TypeInformation<RowData> 
typeInfo) {
         return new DataGeneratorSource<>(
-                new RowDataGenerator(fieldGenerators, 
DataType.getFieldNames(rowDataType), 0),
-                rowsPerSecond,
-                numberOfRows);
+                buildRowGenerator(),
+                computeEffectiveCount(),
+                RateLimiterStrategy.perSecond(rowsPerSecond),
+                typeInfo);
+    }
+
+    /**
+     * Returns the per-field generator function that produces a single {@link 
RowData} for a given
+     * sequence index. Exposed for tests that exercise the generation logic 
directly.
+     */
+    @VisibleForTesting
+    public GeneratorFunction<Long, RowData> buildRowGenerator() {
+        return new RowDataGenerator(fieldGenerators, 
DataType.getFieldNames(rowDataType), 0);
+    }
+
+    /**
+     * Computes the bound passed to {@link DataGeneratorSource}. When {@code 
numberOfRows} is
+     * configured we honor it; otherwise we cap the source at the smallest 
sequence-field range to
+     * preserve the legacy "halt when any sequence field is exhausted" 
semantic, falling back to
+     * {@link Long#MAX_VALUE} when no sequence fields are present.
+     */
+    @VisibleForTesting
+    public long computeEffectiveCount() {
+        if (numberOfRows != null) {
+            return numberOfRows;
+        }

Review Comment:
   Behavior regression when `'number-of-rows'` is set together with one or more 
`kind=sequence` fields. Legacy `DataGeneratorSource#run` stops as soon as 
`generator.hasNext()` returns false, i.e. the effective row count is 
`min(numberOfRows, smallest_sequence_total)`. Here we short-circuit to 
`numberOfRows` and skip the `min` against `minSequenceCount`, so 
`NumberSequenceSource` is asked to emit indices past `end - start`, and 
`SequenceGeneratorFunction#map` happily computes `start + idx` beyond `end`. 
Two concrete consequences:
   
   1. The invariant declared in `SequenceGeneratorFunction`'s javadoc — *"the 
source-level count never exceeds `getTotalCount()`, so `start + idx` never 
wraps past `end`"* — is violated by this very factory call site.
   2. For narrowing variants (`byteGenerator` / `shortGenerator` / 
`intGenerator`), the overflow is silent: e.g. `'fields.f.kind'='sequence'`, 
`start=0`, `end=10` on a `TINYINT` column with `'number-of-rows'='300'` 
previously stopped at row 11; with this patch it keeps going and emits 
`(byte)128 = -128`, etc.
   
   Fix is the obvious one: when `hasSequence`, return `Math.min(numberOfRows, 
minSequenceCount)`. Worth a unit case in `DataGenTableSourceFactoryTest` 
covering `numberOfRows > sequence_total` to lock the legacy semantic in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to