This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ee2834040a6 [FLINK-25379][connectors] Support limit push down in DATAGEN connector ee2834040a6 is described below commit ee2834040a699e6b25fa96af9469253c31515ea1 Author: zhangchaoming <zhangchaom...@360.com> AuthorDate: Mon Dec 20 15:07:32 2021 +0800 [FLINK-25379][connectors] Support limit push down in DATAGEN connector --- .../datagen/table/DataGenTableSource.java | 10 ++++++-- .../stream/table/DataGeneratorConnectorITCase.java | 30 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java index 6cc75dd0e56..dbc8e7f0f70 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java @@ -28,6 +28,7 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.types.DataType; @@ -37,13 +38,13 @@ import org.apache.flink.table.types.DataType; * in parallel. See {@link StatefulSequenceSource}. 
*/ @Internal -public class DataGenTableSource implements ScanTableSource { +public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown { private final DataGenerator<?>[] fieldGenerators; private final String tableName; private final DataType rowDataType; private final long rowsPerSecond; - private final Long numberOfRows; + private Long numberOfRows; public DataGenTableSource( DataGenerator<?>[] fieldGenerators, @@ -87,4 +88,9 @@ public class DataGenTableSource implements ScanTableSource { public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } + + @Override + public void applyLimit(long limit) { + this.numberOfRows = limit; + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java index 53957c8dab4..03fa923fe55 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java @@ -18,9 +18,14 @@ package org.apache.flink.table.planner.runtime.stream.table; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.table.planner.utils.TestingTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; import org.junit.Test; @@ -72,4 +77,29 @@ public class DataGeneratorConnectorITCase extends BatchTestBase { assertThat(results).as("Unexpected number of results").hasSize(10); } + + @Test + public void testLimitPushDown() throws 
Exception { + final TestingTableEnvironment env = + TestingTableEnvironment.create( + EnvironmentSettings.newInstance().inStreamingMode().build(), + null, + new TableConfig()); + + env.executeSql( + "CREATE TABLE datagen_t (\n" + + " f0 CHAR(1)\n" + + ") WITH (" + + " 'connector' = 'datagen'" + + ")"); + + final Table table = env.sqlQuery("select * from datagen_t limit 5"); + assertThat(table.explain()) + .contains( + "table=[[default_catalog, default_database, datagen_t, limit=[5]]], fields=[f0]"); + + assertThat(CollectionUtil.iteratorToList(table.execute().collect())) + .as("Unexpected number of results") + .hasSize(5); + } }