This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ee2834040a6 [FLINK-25379][connectors] Support limit push down in DATAGEN connector
ee2834040a6 is described below

commit ee2834040a699e6b25fa96af9469253c31515ea1
Author: zhangchaoming <zhangchaom...@360.com>
AuthorDate: Mon Dec 20 15:07:32 2021 +0800

    [FLINK-25379][connectors] Support limit push down in DATAGEN connector
---
 .../datagen/table/DataGenTableSource.java          | 10 ++++++--
 .../stream/table/DataGeneratorConnectorITCase.java | 30 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
index 6cc75dd0e56..dbc8e7f0f70 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
@@ -37,13 +38,13 @@ import org.apache.flink.table.types.DataType;
  * in parallel. See {@link StatefulSequenceSource}.
  */
 @Internal
-public class DataGenTableSource implements ScanTableSource {
+public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
 
     private final DataGenerator<?>[] fieldGenerators;
     private final String tableName;
     private final DataType rowDataType;
     private final long rowsPerSecond;
-    private final Long numberOfRows;
+    private Long numberOfRows;
 
     public DataGenTableSource(
             DataGenerator<?>[] fieldGenerators,
@@ -87,4 +88,9 @@ public class DataGenTableSource implements ScanTableSource {
     public ChangelogMode getChangelogMode() {
         return ChangelogMode.insertOnly();
     }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.numberOfRows = limit;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
index 53957c8dab4..03fa923fe55 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.planner.runtime.stream.table;
 
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.utils.TestingTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Test;
 
@@ -72,4 +77,29 @@ public class DataGeneratorConnectorITCase extends BatchTestBase {
 
         assertThat(results).as("Unexpected number of results").hasSize(10);
     }
+
+    @Test
+    public void testLimitPushDown() throws Exception {
+        final TestingTableEnvironment env =
+                TestingTableEnvironment.create(
+                        EnvironmentSettings.newInstance().inStreamingMode().build(),
+                        null,
+                        new TableConfig());
+
+        env.executeSql(
+                "CREATE TABLE datagen_t (\n"
+                        + "    f0 CHAR(1)\n"
+                        + ") WITH ("
+                        + "    'connector' = 'datagen'"
+                        + ")");
+
+        final Table table = env.sqlQuery("select * from datagen_t limit 5");
+        assertThat(table.explain())
+                .contains(
+                        "table=[[default_catalog, default_database, datagen_t, limit=[5]]], fields=[f0]");
+
+        assertThat(CollectionUtil.iteratorToList(table.execute().collect()))
+                .as("Unexpected number of results")
+                .hasSize(5);
+    }
 }

Reply via email to