Dennis-Mircea commented on code in PR #28083:
URL: https://github.com/apache/flink/pull/28083#discussion_r3243720300


##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java:
##########
@@ -62,16 +68,61 @@ public DataGenTableSource(
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-        boolean isBounded = numberOfRows != null;
-        return SourceFunctionProvider.of(createSource(), isBounded, parallelism);
+        TypeInformation<RowData> typeInfo = context.createTypeInformation(rowDataType);
+        return SourceProvider.of(createSource(typeInfo), parallelism);
     }
 
     @VisibleForTesting
-    public DataGeneratorSource<RowData> createSource() {
+    public DataGeneratorSource<RowData> createSource(TypeInformation<RowData> typeInfo) {
         return new DataGeneratorSource<>(
-                new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0),
-                rowsPerSecond,
-                numberOfRows);
+                buildRowGenerator(),
+                computeEffectiveCount(),
+                RateLimiterStrategy.perSecond(rowsPerSecond),
+                typeInfo);
+    }
+
+    /**
+     * Returns the per-field generator function that produces a single {@link RowData} for a given
+     * sequence index. Exposed for tests that exercise the generation logic directly.
+     */
+    @VisibleForTesting
+    public GeneratorFunction<Long, RowData> buildRowGenerator() {
+        return new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0);
+    }
+
+    /**
+     * Computes the bound passed to {@link DataGeneratorSource}. When {@code numberOfRows} is
+     * configured we honor it; otherwise we cap the source at the smallest sequence-field range to
+     * preserve the legacy "halt when any sequence field is exhausted" semantic, falling back to
+     * {@link Long#MAX_VALUE} when no sequence fields are present.
+     */
+    @VisibleForTesting
+    public long computeEffectiveCount() {
+        if (numberOfRows != null) {
+            return numberOfRows;
+        }

Review Comment:
   Good catch, thanks @featzhang! You're right: the legacy semantics required 
`Math.min(numberOfRows, minSequenceCount)`. Fixed in the next push, which also 
adds two tests to `DataGenTableSourceFactoryTest` covering both directions 
(row count wins; sequence wins, with a `TINYINT` overflow guard).
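For illustration, here is a minimal, self-contained sketch of the corrected bound: the smallest sequence-field range, further capped by `numberOfRows` when it is set, with `Long.MAX_VALUE` as the unbounded fallback. It assumes each sequence field is an inclusive `[start, end]` range of longs. `EffectiveCountSketch`, `SequenceRange`, and the saturating overflow handling are hypothetical names and choices for this example, not the code in the PR or any Flink API.

```java
import java.util.List;

/** Hypothetical sketch of the effective-count rule; not Flink's implementation. */
public class EffectiveCountSketch {

    /** Inclusive [start, end] range of one sequence field (hypothetical type). */
    public static final class SequenceRange {
        private final long start;
        private final long end;

        public SequenceRange(long start, long end) {
            if (end < start) {
                throw new IllegalArgumentException("end must be >= start");
            }
            this.start = start;
            this.end = end;
        }

        /** Number of values the sequence can emit; saturates at Long.MAX_VALUE on overflow. */
        long count() {
            try {
                return Math.addExact(Math.subtractExact(end, start), 1L);
            } catch (ArithmeticException e) {
                // e.g. a full BIGINT range does not fit in a long: treat it as unbounded
                return Long.MAX_VALUE;
            }
        }
    }

    /** min(numberOfRows, smallest sequence range), or Long.MAX_VALUE if neither caps the source. */
    public static long computeEffectiveCount(Long numberOfRows, List<SequenceRange> sequences) {
        long bound = Long.MAX_VALUE;
        for (SequenceRange seq : sequences) {
            bound = Math.min(bound, seq.count());
        }
        if (numberOfRows != null) {
            // Apply the row count as an additional cap instead of returning it unconditionally.
            bound = Math.min(bound, numberOfRows);
        }
        return bound;
    }

    public static void main(String[] args) {
        List<SequenceRange> tinySeq = List.of(new SequenceRange(0, 127));
        // Row count wins: 10 rows requested, the sequence could emit 128 values.
        System.out.println(computeEffectiveCount(10L, tinySeq));
        // Sequence wins: 1000 rows requested, but the sequence is exhausted after 128 values.
        System.out.println(computeEffectiveCount(1000L, tinySeq));
        // No row count and no sequence fields: effectively unbounded.
        System.out.println(computeEffectiveCount(null, List.of()));
    }
}
```

The key difference from the early-return version in the diff is that a configured `numberOfRows` only tightens the bound; it never lifts the cap imposed by a shorter sequence field.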



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.