asfgit closed pull request #6621:  [FLINK-8686] [sql-client] Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index f16f1a5561a..8c4ba83c6dc 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -106,7 +106,7 @@ Alice, 1
 Greg, 1
 {% endhighlight %}
 
-Both result modes can be useful during the prototyping of SQL queries.
+Both result modes can be useful during the prototyping of SQL queries. In both modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured [maximum number of rows](sqlClient.html#configuration) (`max-table-result-rows`).
 
 <span class="label label-danger">Attention</span> Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
 
@@ -167,6 +167,7 @@ Every environment file is a regular [YAML file](http://yaml.org/). An example of
 tables:
   - name: MyTableSource
     type: source
+    update-mode: append
     connector:
       type: filesystem
       path: "/path/to/something.csv"
@@ -206,6 +207,8 @@ functions:
 execution:
   type: streaming                   # required: execution mode either 'batch' or 'streaming'
   result-mode: table                # required: either 'table' or 'changelog'
+  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
+                                    #   'table' mode (1000000 by default, smaller than 1 means unlimited)
   time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
   parallelism: 1                    # optional: Flink's parallelism (1 by default)
   periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
@@ -213,7 +216,7 @@ execution:
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
   restart-strategy:                 # optional: restart strategy
-    type: fallback                  #           "fallback" to global restart strategy by default
+    type: fallback                  #   "fallback" to global restart strategy by default
 
 # Deployment properties allow for describing the cluster to which table programs are submitted to.
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 302651a78aa..97e89fd6459 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -76,7 +76,9 @@ execution:
   # interval in ms for emitting periodic watermarks
   periodic-watermarks-interval: 200
   # 'changelog' or 'table' presentation of results
-  result-mode: changelog
+  result-mode: table
+  # maximum number of maintained rows in 'table' presentation of results
+  max-table-result-rows: 1000000
   # parallelism of the program
   parallelism: 1
   # maximum parallelism
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
index cf17933d977..d70af807a30 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
@@ -34,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
@@ -50,6 +51,7 @@
  */
 public class CliChangelogResultView extends CliResultView<CliChangelogResultView.ResultChangelogOperation> {
 
+       private static final int DEFAULT_MAX_ROW_COUNT = 1000;
        private static final int DEFAULT_REFRESH_INTERVAL = 0; // as fast as possible
        private static final int DEFAULT_REFRESH_INTERVAL_PLAIN = 3; // every 1s
        private static final int MIN_REFRESH_INTERVAL = 0; // every 100ms
@@ -66,7 +68,8 @@ public CliChangelogResultView(CliClient client, ResultDescriptor resultDescriptor
                        refreshInterval = DEFAULT_REFRESH_INTERVAL;
                }
                previousResults = null;
-               results = new ArrayList<>();
+               // rows are always appended at the tail and deleted from the head of the list
+               results = new LinkedList<>();
        }
 
        // --------------------------------------------------------------------------------------------
@@ -133,6 +136,13 @@ protected void refresh() {
                                        }
 
                                        // update results
+
+                                       // formatting and printing of rows is expensive in the current implementation,
+                                       // therefore we limit the maximum number of lines shown in changelog mode to
+                                       // keep the CLI responsive
+                                       if (results.size() >= DEFAULT_MAX_ROW_COUNT) {
+                                               results.remove(0);
+                                       }
                                        results.add(changeRow);
 
                                        scrolling++;
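
The change above caps the changelog view with simple FIFO eviction: once DEFAULT_MAX_ROW_COUNT (1000) rows are buffered, the oldest row is dropped before a new one is appended. A minimal standalone sketch of the same pattern, outside of Flink's classes (the class name and capacity handling here are illustrative, not part of the PR):

    import java.util.LinkedList;

    /** Keeps only the most recent {@code capacity} rows; the oldest row is evicted first. */
    final class BoundedChangelog {
        private final int capacity;
        private final LinkedList<String[]> rows = new LinkedList<>();

        BoundedChangelog(int capacity) {
            this.capacity = capacity;
        }

        void append(String[] row) {
            // rows are appended at the tail and evicted from the head; a LinkedList makes the
            // head removal O(1), whereas ArrayList.remove(0) would shift all remaining elements
            if (rows.size() >= capacity) {
                rows.removeFirst();
            }
            rows.add(row);
        }
    }

This is also why the PR swaps the backing ArrayList for a LinkedList: with an eviction on almost every refresh once the cap is hit, an ArrayList would pay O(n) per drop.
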
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index df42edd63b3..78f3a565554 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -28,7 +28,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.IntStream;
 
 import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
 
@@ -182,9 +181,8 @@ protected void init() {
        protected List<AttributedString> computeMainLines() {
                final List<AttributedString> lines = new ArrayList<>();
 
-               IntStream.range(0, results.size()).forEach(lineIdx -> {
-                       final String[] line = results.get(lineIdx);
-
+               int lineIdx = 0;
+               for (String[] line : results) {
                        final AttributedStringBuilder row = new AttributedStringBuilder();
 
                        // highlight selected row
@@ -192,7 +190,7 @@ protected void init() {
                                row.style(AttributedStyle.DEFAULT.inverse());
                        }
 
-                       IntStream.range(0, line.length).forEach(colIdx -> {
+                       for (int colIdx = 0; colIdx < line.length; colIdx++) {
                                final String col = line[colIdx];
                                final int columnWidth = computeColumnWidth(colIdx);
 
@@ -208,9 +206,11 @@ protected void init() {
                                } else {
                                        normalizeColumn(row, col, columnWidth);
                                }
-                       });
+                       }
                        lines.add(row.toAttributedString());
-               });
+
+                       lineIdx++;
+               }
 
                return lines;
        }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index b7c28938121..0518dfce74d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -92,6 +92,10 @@ public int getMaxParallelism() {
                return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
        }
 
+       public int getMaxTableResultRows() {
+               return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_TABLE_RESULT_ROWS, Integer.toString(1_000_000)));
+       }
+
        public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
                final String restartStrategy = properties.getOrDefault(
                        PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
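
The new getter follows the same getOrDefault-and-parse pattern as the surrounding execution properties, defaulting to 1,000,000 rows; combined with the constructor logic shown further below, any value smaller than 1 disables the limit. A standalone sketch of the combined behavior (class and method names here are illustrative, not Flink API):

    import java.util.Collections;
    import java.util.Map;

    final class ResultRowLimit {

        /** Parses 'max-table-result-rows'; defaults to 1,000,000, values < 1 mean unlimited. */
        static int resolve(Map<String, String> properties) {
            final int configured = Integer.parseInt(
                properties.getOrDefault("max-table-result-rows", Integer.toString(1_000_000)));
            // a non-positive limit is normalized to "no bound at all"
            return configured <= 0 ? Integer.MAX_VALUE : configured;
        }

        public static void main(String[] args) {
            System.out.println(resolve(Collections.emptyMap()));                                  // 1000000
            System.out.println(resolve(Collections.singletonMap("max-table-result-rows", "-1"))); // 2147483647
        }
    }
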
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 2a6b001d5f4..2067c5a971b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,8 @@ private PropertyStrings() {
 
        public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
 
+       public static final String EXECUTION_MAX_TABLE_RESULT_ROWS = "max-table-result-rows";
+
        public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
 
        public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index a54160f24b8..93fdd483d44 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -71,7 +71,12 @@ public ResultStore(Configuration flinkConfig) {
                        if (env.getExecution().isChangelogMode()) {
                                return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
                        } else {
-                               return new MaterializedCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+                               return new MaterializedCollectStreamResult<>(
+                                       outputType,
+                                       config,
+                                       gatewayAddress,
+                                       gatewayPort,
+                                       env.getExecution().getMaxTableResultRows());
                        }
 
                } else {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 45c4f7553c6..0beabe94d2c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.gateway.local.result;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,31 @@
  */
 public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
 
+       /** Maximum initial capacity of the materialized table. */
+       public static final int MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY = 1_000_000;
+
+       /** Maximum overcommitment of the materialized table. */
+       public static final int MATERIALIZED_TABLE_MAX_OVERCOMMIT = 1_000_000;
+
+       /** Factor for the initial capacity of the materialized table. */
+       public static final double MATERIALIZED_TABLE_CAPACITY_FACTOR = 0.05;
+
+       /** Factor for cleaning up deleted rows in the materialized table. */
+       public static final double MATERIALIZED_TABLE_OVERCOMMIT_FACTOR = 0.01;
+
+       /**
+        * Maximum number of materialized rows to be stored. After the count is reached, oldest
+        * rows are dropped.
+        */
+       private final int maxRowCount;
+
+       /** Threshold for cleaning up deleted rows in the materialized table. */
+       private final int overcommitThreshold;
+
+       /**
+        * Materialized table that is continuously updated by inserts and deletes. Deletes at
+        * the beginning are lazily cleaned up when the threshold is reached.
+        */
        private final List<Row> materializedTable;
 
        /**
@@ -47,26 +73,65 @@
         */
        private final Map<Row, Integer> rowPositionCache;
 
+       /** Current snapshot of the materialized table. */
        private final List<Row> snapshot;
 
+       /** Counter for deleted rows to be deleted at the beginning of the materialized table. */
+       private int validRowPosition;
+
+       /** Page count of the snapshot (always >= 1). */
        private int pageCount;
 
+       /** Page size of the snapshot (always >= 1). */
        private int pageSize;
 
+       /** Indicator that this is the last snapshot possible (EOS afterwards). */
        private boolean isLastSnapshot;
 
-       public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
-                       InetAddress gatewayAddress, int gatewayPort) {
+       @VisibleForTesting
+       public MaterializedCollectStreamResult(
+                       TypeInformation<Row> outputType,
+                       ExecutionConfig config,
+                       InetAddress gatewayAddress,
+                       int gatewayPort,
+                       int maxRowCount,
+                       int overcommitThreshold) {
                super(outputType, config, gatewayAddress, gatewayPort);
 
+               if (maxRowCount <= 0) {
+                       this.maxRowCount = Integer.MAX_VALUE;
+               } else {
+                       this.maxRowCount = maxRowCount;
+               }
+
+               this.overcommitThreshold = overcommitThreshold;
+
                // prepare for materialization
-               materializedTable = new ArrayList<>();
-               rowPositionCache = new HashMap<>();
+               final int initialCapacity = computeMaterializedTableCapacity(maxRowCount); // avoid frequent resizing
+               materializedTable = new ArrayList<>(initialCapacity);
+               rowPositionCache = new HashMap<>(initialCapacity);
                snapshot = new ArrayList<>();
+               validRowPosition = 0;
                isLastSnapshot = false;
                pageCount = 0;
        }
 
+       public MaterializedCollectStreamResult(
+                       TypeInformation<Row> outputType,
+                       ExecutionConfig config,
+                       InetAddress gatewayAddress,
+                       int gatewayPort,
+                       int maxRowCount) {
+
+               this(
+                       outputType,
+                       config,
+                       gatewayAddress,
+                       gatewayPort,
+                       maxRowCount,
+                       computeMaterializedTableOvercommit(maxRowCount));
+       }
+
        @Override
        public boolean isMaterialized() {
                return true;
@@ -74,6 +139,10 @@ public boolean isMaterialized() {
 
        @Override
        public TypedResult<Integer> snapshot(int pageSize) {
+               if (pageSize < 1) {
+                       throw new SqlExecutionException("Page size must be greater than 0.");
+               }
+
                synchronized (resultLock) {
                        // retrieval thread is dead and there are no results anymore
                        // or program failed
@@ -87,7 +156,9 @@ else if (!isRetrieving()) {
 
                        this.pageSize = pageSize;
                        snapshot.clear();
-                       snapshot.addAll(materializedTable);
+                       for (int i = validRowPosition; i < materializedTable.size(); i++) {
+                               snapshot.add(materializedTable.get(i));
+                       }
 
                        // at least one page
                        pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
@@ -112,31 +183,82 @@ else if (!isRetrieving()) {
        @Override
        protected void processRecord(Tuple2<Boolean, Row> change) {
                synchronized (resultLock) {
-                       final Row row = change.f1;
                        // insert
                        if (change.f0) {
-                               materializedTable.add(row);
-                               rowPositionCache.put(row, materializedTable.size() - 1);
+                               processInsert(change.f1);
                        }
                        // delete
                        else {
-                               // delete the newest record first to minimize per-page changes
-                               final Integer cachedPos = rowPositionCache.get(row);
-                               final int startSearchPos;
-                               if (cachedPos != null) {
-                                       startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
-                               } else {
-                                       startSearchPos = materializedTable.size() - 1;
-                               }
-
-                               for (int i = startSearchPos; i >= 0; i--) {
-                                       if (materializedTable.get(i).equals(row)) {
-                                               materializedTable.remove(i);
-                                               rowPositionCache.remove(row);
-                                               break;
-                                       }
-                               }
+                               processDelete(change.f1);
                        }
                }
        }
+
+       @VisibleForTesting
+       protected List<Row> getMaterializedTable() {
+               return materializedTable;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       private void processInsert(Row row) {
+               // limit the materialized table
+               if (materializedTable.size() - validRowPosition >= maxRowCount) {
+                       cleanUp();
+               }
+               materializedTable.add(row);
+               rowPositionCache.put(row, materializedTable.size() - 1);
+       }
+
+       private void processDelete(Row row) {
+               // delete the newest record first to minimize per-page changes
+               final Integer cachedPos = rowPositionCache.get(row);
+               final int startSearchPos;
+               if (cachedPos != null) {
+                       startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+               } else {
+                       startSearchPos = materializedTable.size() - 1;
+               }
+
+               for (int i = startSearchPos; i >= validRowPosition; i--) {
+                       if (materializedTable.get(i).equals(row)) {
+                               materializedTable.remove(i);
+                               rowPositionCache.remove(row);
+                               break;
+                       }
+               }
+       }
+
+       private void cleanUp() {
+               // invalidate row
+               final Row deleteRow = materializedTable.get(validRowPosition);
+               if (rowPositionCache.get(deleteRow) == validRowPosition) {
+                       // this row has no duplicates in the materialized table,
+                       // it can be removed from the cache
+                       rowPositionCache.remove(deleteRow);
+               }
+               materializedTable.set(validRowPosition, null);
+
+               validRowPosition++;
+
+               // perform clean up in batches
+               if (validRowPosition >= overcommitThreshold) {
+                       materializedTable.subList(0, validRowPosition).clear();
+                       // adjust all cached indexes
+                       rowPositionCache.replaceAll((k, v) -> v - validRowPosition);
+                       validRowPosition = 0;
+               }
+       }
+
+       private static int computeMaterializedTableCapacity(int maxRowCount) {
+               return Math.min(
+                       MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY,
+                       Math.max(1, (int) (maxRowCount * MATERIALIZED_TABLE_CAPACITY_FACTOR)));
+       }
+
+       private static int computeMaterializedTableOvercommit(int maxRowCount) {
+               return Math.min(
+                       MATERIALIZED_TABLE_MAX_OVERCOMMIT,
+                       (int) (maxRowCount * MATERIALIZED_TABLE_OVERCOMMIT_FACTOR));
+       }
 }
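
The clean-up above is deliberately lazy: a dropped head row is first only nulled out and counted via validRowPosition (O(1)), and the backing ArrayList is physically compacted only once overcommitThreshold invalidated rows have accumulated, amortizing the O(n) compaction over many drops. With the defaults, a limit of 1,000,000 rows yields an initial capacity of 50,000 (factor 0.05) and a compaction batch of 10,000 rows (factor 0.01). A condensed standalone sketch of the same amortization idea (identifiers are illustrative, not Flink API):

    import java.util.ArrayList;
    import java.util.List;

    /** A list whose logical head can be dropped in O(1); physical removal happens in batches. */
    final class LazyHeadList<T> {
        private final List<T> data = new ArrayList<>();
        private final int compactionThreshold;
        private int validFrom; // index of the first still-valid element

        LazyHeadList(int compactionThreshold) {
            this.compactionThreshold = compactionThreshold;
        }

        void add(T element) {
            data.add(element);
        }

        void dropHead() {
            data.set(validFrom, null); // invalidate lazily instead of shifting all elements
            validFrom++;
            if (validFrom >= compactionThreshold) {
                data.subList(0, validFrom).clear(); // one O(n) pass amortized over many drops
                validFrom = 0;
            }
        }

        int size() {
            return data.size() - validFrom;
        }
    }

The rowPositionCache.replaceAll step in the PR exists because the cached positions index into the physical list and therefore shift by validRowPosition whenever the prefix is cleared.
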
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 76648d08e1a..495f5e0d314 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -198,6 +198,7 @@ public void testGetSessionProperties() throws Exception {
                expectedProperties.put("execution.max-idle-state-retention", 
"0");
                expectedProperties.put("execution.min-idle-state-retention", 
"0");
                expectedProperties.put("execution.result-mode", "table");
+               expectedProperties.put("execution.max-table-result-rows", 
"100");
                expectedProperties.put("execution.restart-strategy.type", 
"failure-rate");
                
expectedProperties.put("execution.restart-strategy.max-failures-per-interval", 
"10");
                
expectedProperties.put("execution.restart-strategy.failure-rate-interval", 
"99000");
@@ -264,38 +265,47 @@ public void testStreamQueryExecutionChangelog() throws Exception {
        public void testStreamQueryExecutionTable() throws Exception {
                final URL url = getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
+
                final Map<String, String> replaceVars = new HashMap<>();
                replaceVars.put("$VAR_0", url.getPath());
                replaceVars.put("$VAR_1", "/");
                replaceVars.put("$VAR_2", "streaming");
                replaceVars.put("$VAR_3", "table");
                replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
 
-               final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
-               final SessionContext session = new SessionContext("test-session", new Environment());
+               final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1";
 
-               try {
-                       // start job and retrieval
-                       final ResultDescriptor desc = executor.executeQuery(
-                               session,
-                               "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("47,Hello World");
+               expectedResults.add("27,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("37,Hello World");
+               expectedResults.add("47,Hello World");
+               expectedResults.add("57,Hello World!!!!");
+
+               executeStreamQueryTable(replaceVars, query, expectedResults);
+       }
 
-                       assertTrue(desc.isMaterialized());
+       @Test(timeout = 30_000L)
+       public void testStreamQueryExecutionLimitedTable() throws Exception {
+               final URL url = getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
 
-                       final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_0", url.getPath());
+               replaceVars.put("$VAR_1", "/");
+               replaceVars.put("$VAR_2", "streaming");
+               replaceVars.put("$VAR_3", "table");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "1");
 
-                       final List<String> expectedResults = new ArrayList<>();
-                       expectedResults.add("47,Hello World");
-                       expectedResults.add("27,Hello World");
-                       expectedResults.add("37,Hello World");
-                       expectedResults.add("37,Hello World");
-                       expectedResults.add("47,Hello World");
-                       expectedResults.add("57,Hello World!!!!");
+               final String query = "SELECT COUNT(*), StringField1 FROM TableNumber1 GROUP BY StringField1";
 
-                       TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
-               } finally {
-                       executor.stop(session);
-               }
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("1,Hello World!!!!");
+
+               executeStreamQueryTable(replaceVars, query, expectedResults);
        }
 
        @Test(timeout = 30_000L)
@@ -375,6 +385,28 @@ public void testStreamQueryExecutionSink() throws Exception {
                }
        }
 
+       private void executeStreamQueryTable(
+                       Map<String, String> replaceVars,
+                       String query,
+                       List<String> expectedResults) throws Exception {
+
+               final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+               final SessionContext session = new SessionContext("test-session", new Environment());
+
+               try {
+                       // start job and retrieval
+                       final ResultDescriptor desc = executor.executeQuery(session, query);
+
+                       assertTrue(desc.isMaterialized());
+
+                       final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+
+                       TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+               } finally {
+                       executor.stop(session);
+               }
+       }
+
        private void verifySinkResult(String path) throws IOException {
                final List<String> actualResults = new ArrayList<>();
                TestBaseUtils.readAllResultLines(actualResults, path);
@@ -392,6 +424,7 @@ private void verifySinkResult(String path) throws IOException {
                final Map<String, String> replaceVars = new HashMap<>();
                replaceVars.put("$VAR_2", "batch");
                replaceVars.put("$VAR_UPDATE_MODE", "");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
                return new LocalExecutor(
                        EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
                        Collections.emptyList(),
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7e41ffe111..ba8c9245cc9 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -29,7 +29,9 @@
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,13 +44,14 @@
        public void testSnapshot() throws UnknownHostException {
                final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
 
-               TestMaterializedCollectStreamResult result = null;
+               TestMaterializedCollectStreamResult<?> result = null;
                try {
-                       result = new TestMaterializedCollectStreamResult(
+                       result = new TestMaterializedCollectStreamResult<>(
                                type,
                                new ExecutionConfig(),
                                InetAddress.getLocalHost(),
-                               0);
+                               0,
+                               Integer.MAX_VALUE);
 
                        result.isRetrieving = true;
 
@@ -85,11 +88,59 @@ public void testSnapshot() throws UnknownHostException {
                }
        }
 
+       @Test
+       public void testLimitedSnapshot() throws UnknownHostException {
+               final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+
+               TestMaterializedCollectStreamResult<?> result = null;
+               try {
+                       result = new TestMaterializedCollectStreamResult<>(
+                               type,
+                               new ExecutionConfig(),
+                               InetAddress.getLocalHost(),
+                               0,
+                               2,  // limit the materialized table to 2 rows
+                               3); // with 3 rows overcommitment
+
+                       result.isRetrieving = true;
+
+                       result.processRecord(Tuple2.of(true, Row.of("D", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+
+                       assertEquals(
+                               Arrays.asList(null, null, Row.of("B", 1), Row.of("A", 1)), // two over-committed rows
+                               result.getMaterializedTable());
+
+                       assertEquals(TypedResult.payload(2), result.snapshot(1));
+
+                       assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1));
+                       assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(2));
+
+                       result.processRecord(Tuple2.of(true, Row.of("C", 1)));
+
+                       assertEquals(
+                               Arrays.asList(Row.of("A", 1), Row.of("C", 1)), // limit clean up has taken place
+                               result.getMaterializedTable());
+
+                       result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+                       assertEquals(
+                               Collections.singletonList(Row.of("C", 1)), // regular clean up has taken place
+                               result.getMaterializedTable());
+               } finally {
+                       if (result != null) {
+                               result.close();
+                       }
+               }
+       }
+
        // --------------------------------------------------------------------------------------------
        // Helper classes
        // --------------------------------------------------------------------------------------------
 
-       private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult {
+       private static class TestMaterializedCollectStreamResult<T> extends MaterializedCollectStreamResult<T> {
 
                public boolean isRetrieving;
 
@@ -97,18 +148,42 @@ public TestMaterializedCollectStreamResult(
                                TypeInformation<Row> outputType,
                                ExecutionConfig config,
                                InetAddress gatewayAddress,
-                               int gatewayPort) {
+                               int gatewayPort,
+                               int maxRowCount,
+                               int overcommitThreshold) {
 
                        super(
                                outputType,
                                config,
                                gatewayAddress,
-                               gatewayPort);
+                               gatewayPort,
+                               maxRowCount,
+                               overcommitThreshold);
+               }
+
+               public TestMaterializedCollectStreamResult(
+                               TypeInformation<Row> outputType,
+                               ExecutionConfig config,
+                               InetAddress gatewayAddress,
+                               int gatewayPort,
+                               int maxRowCount) {
+
+                       super(
+                               outputType,
+                               config,
+                               gatewayAddress,
+                               gatewayPort,
+                               maxRowCount);
                }
 
                @Override
                protected boolean isRetrieving() {
                        return isRetrieving;
                }
+
+               @Override
+               public List<Row> getMaterializedTable() {
+                       return super.getMaterializedTable();
+               }
        }
 }
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index cd5257e611c..e9c6d5bf32d 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -123,6 +123,7 @@ execution:
   min-idle-state-retention: 0
   max-idle-state-retention: 0
   result-mode: "$VAR_3"
+  max-table-result-rows: "$VAR_MAX_ROWS"
   restart-strategy:
     type: failure-rate
     max-failures-per-interval: 10


 
