This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3a48f6dd2 [test/flink] Collect flink rows should always be with
timeout (#1622)
3a48f6dd2 is described below
commit 3a48f6dd22bed4e6bf3ebe009f34750228aa0cd7
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Sep 4 10:47:32 2025 +0800
[test/flink] Collect flink rows should always be with timeout (#1622)
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 7 +-
.../flink/source/FlinkTableSourceBatchITCase.java | 15 +--
.../fluss/flink/source/FlinkTableSourceITCase.java | 126 ++++++---------------
.../source/testutils/FlinkRowAssertionsUtils.java | 83 ++++++++++----
.../testutils/FlinkRowAssertionsUtilsTest.java | 76 +++++++++++++
.../apache/fluss/flink/utils/FlinkTestBase.java | 14 ---
6 files changed, 179 insertions(+), 142 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 048ba42bc..54fb906ae 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -66,6 +66,7 @@ import java.util.stream.Stream;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -210,11 +211,7 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
List<String> expectedRows =
expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList());
- List<String> actual = new ArrayList<>(expectedRows.size());
- for (int i = 0; i < expectedRows.size(); i++) {
- actual.add(rowIter.next().toString());
- }
- rowIter.close();
+ List<String> actual = collectRowsWithTimeout(rowIter,
expectedRows.size());
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
// check data with the same bucket key should be read in sequence.
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index a8353491e..7f3894f96 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -45,6 +45,7 @@ import java.util.Map;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -189,7 +190,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
// normal scan
String query = String.format("SELECT * FROM %s limit 2", tableName);
CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
- List<String> collected = assertAndCollectRecords(iterRows, 2);
+ List<String> collected = collectRowsWithTimeout(iterRows, 2);
List<String> expected =
Arrays.asList(
"+I[1, address1, name1]",
@@ -203,14 +204,14 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
// limit which is larger than all the data.
query = String.format("SELECT * FROM %s limit 10", tableName);
iterRows = tEnv.executeSql(query).collect();
- collected = assertAndCollectRecords(iterRows, 5);
+ collected = collectRowsWithTimeout(iterRows, 5);
assertThat(collected).isSubsetOf(expected);
assertThat(collected).hasSize(5);
// projection scan
query = String.format("SELECT id, name FROM %s limit 3", tableName);
iterRows = tEnv.executeSql(query).collect();
- collected = assertAndCollectRecords(iterRows, 3);
+ collected = collectRowsWithTimeout(iterRows, 3);
expected =
Arrays.asList(
"+I[1, name1]",
@@ -237,7 +238,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
// normal scan
String query = String.format("SELECT * FROM %s limit 2", tableName);
CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
- List<String> collected = assertAndCollectRecords(iterRows, 2);
+ List<String> collected = collectRowsWithTimeout(iterRows, 2);
List<String> expected =
Arrays.asList(
"+I[1, address1, name1]",
@@ -251,7 +252,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
// projection scan
query = String.format("SELECT id, name FROM %s limit 3", tableName);
iterRows = tEnv.executeSql(query).collect();
- collected = assertAndCollectRecords(iterRows, 3);
+ collected = collectRowsWithTimeout(iterRows, 3);
expected =
Arrays.asList(
"+I[1, name1]",
@@ -266,7 +267,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
String partitionTable = preparePartitionedLogTable();
query = String.format("SELECT id, name FROM %s limit 3",
partitionTable);
iterRows = tEnv.executeSql(query).collect();
- collected = assertAndCollectRecords(iterRows, 3);
+ collected = collectRowsWithTimeout(iterRows, 3);
assertThat(collected).isSubsetOf(expected);
assertThat(collected).hasSize(3);
}
@@ -286,7 +287,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
+ "fields=[count1$0])",
tableName));
CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
- List<String> collected = assertAndCollectRecords(iterRows, 1);
+ List<String> collected = collectRowsWithTimeout(iterRows, 1);
List<String> expected =
Collections.singletonList(String.format("+I[%s]", expectedRows));
assertThat(collected).isEqualTo(expected);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 24757cb05..c279c15b3 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -70,7 +70,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
@@ -205,16 +207,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
writeRows(conn, tablePath, rows, true);
List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3,
v3]");
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql("select * from non_pk_table_test").collect()) {
- int expectRecords = expected.size();
- List<String> actual = new ArrayList<>(expectRecords);
- for (int i = 0; i < expectRecords; i++) {
- String row = rowIter.next().toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
+ assertQueryResultExactOrder(tEnv, "select * from non_pk_table_test",
expected);
}
@ParameterizedTest
@@ -262,17 +255,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"+I[v8, 8000, 800]",
"+I[v9, 9000, 900]",
"+I[v10, 10000, 1000]");
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(query).collect()) {
- int expectRecords = expected.size();
- List<String> actual = new ArrayList<>(expectRecords);
- for (int i = 0; i < expectRecords; i++) {
- Row r = rowIter.next();
- String row = r.toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
+ assertQueryResultExactOrder(tEnv, query, expected);
}
@ParameterizedTest
@@ -331,22 +314,15 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"+I[v8, 8, 800]",
"+I[v9, 9, 900]",
"+I[v10, 10, 1000]");
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(query).collect()) {
- int expectRecords = expected.size();
- List<String> actual = new ArrayList<>(expectRecords);
- if (testPkLog) {
- // delay the write after collect job start,
- // to make sure reading from log instead of snapshot
- writeRows(conn, tablePath, rows, false);
- }
- for (int i = 0; i < expectRecords; i++) {
- Row r = rowIter.next();
- String row = r.toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
+ org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(query).collect();
+ if (testPkLog) {
+ // delay the write after collect job start,
+ // to make sure reading from log instead of snapshot
+ writeRows(conn, tablePath, rows, false);
}
+ int expectRecords = expected.size();
+ List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+ assertThat(actual).containsExactlyElementsOf(expected);
}
@Test
@@ -451,12 +427,12 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"+I[8, v8, 800, 8000]",
"+I[9, v9, 900, 9000]",
"+I[10, v10, 1000, 10000]");
- assertQueryResult(query, expected);
+ assertQueryResultExactOrder(tEnv, query, expected);
// 2. read kv table with scan.startup.mode='earliest'
options = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */";
query = "select a, b, c, d from " + tableName + options;
- assertQueryResult(query, expected);
+ assertQueryResultExactOrder(tEnv, query, expected);
// 3. read log table with scan.startup.mode='timestamp'
expected =
@@ -471,7 +447,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
" /*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' ='%d') */",
timestamp);
query = "select a, b, c, d from " + tableName + options;
- assertQueryResult(query, expected);
+ assertQueryResultExactOrder(tEnv, query, expected);
}
@Test
@@ -501,20 +477,13 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"-U[2, v2]",
"+U[2, v22]",
"+I[4, v4]");
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(query).collect()) {
- int expectRecords = 8;
- List<String> actual = new ArrayList<>(expectRecords);
- // delay to write after collect job start, to make sure reading
from log instead of
- // snapshot
- writeRows(conn, tablePath, rows2, false);
- for (int i = 0; i < expectRecords; i++) {
- Row r = rowIter.next();
- String row = r.toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
+ org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(query).collect();
+ int expectRecords = 8;
+ // delay to write after collect job start, to make sure reading from
log instead of
+ // snapshot
+ writeRows(conn, tablePath, rows2, false);
+ List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+ assertThat(actual).containsExactlyElementsOf(expected);
}
private static Stream<Arguments> readKvTableScanStartupModeArgs() {
@@ -595,17 +564,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"-U[2, v2]",
"+U[2, v22]",
"+I[4, v4]");
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(query).collect()) {
- int expectRecords = 10;
- List<String> actual = new ArrayList<>(expectRecords);
- for (int i = 0; i < expectRecords; i++) {
- Row r = rowIter.next();
- String row = r.toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
+ assertQueryResultExactOrder(tEnv, query, expected);
}
@ParameterizedTest
@@ -687,25 +646,20 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"the fetch timestamp %s is larger than the
current timestamp",
currentTimeMillis +
Duration.ofMinutes(5).toMillis()));
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
+ org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(
String.format(
"select * from timestamp_table /*+
OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */
",
currentTimeMillis))
- .collect()) {
- CLOCK.advanceTime(Duration.ofMillis(100L));
- // write second batch record.
- rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
- writeRows(conn, tablePath, rows, true);
- List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]",
"+I[6, v6]");
- int expectRecords = expected.size();
- List<String> actual = new ArrayList<>(expectRecords);
- for (int i = 0; i < expectRecords; i++) {
- String row = rowIter.next().toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
+ .collect();
+ CLOCK.advanceTime(Duration.ofMillis(100L));
+ // write second batch record.
+ rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
+ writeRows(conn, tablePath, rows, true);
+ List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6,
v6]");
+ int expectRecords = expected.size();
+ List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+ assertThat(actual).containsExactlyElementsOf(expected);
}
//
-------------------------------------------------------------------------------------
@@ -1319,20 +1273,6 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
"Fail to wait until all bucket finish snapshot");
}
- private void assertQueryResult(String query, List<String> expected) throws
Exception {
- try (org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(query).collect()) {
- int expectRecords = expected.size();
- List<String> actual = new ArrayList<>(expectRecords);
- for (int i = 0; i < expectRecords; i++) {
- Row r = rowIter.next();
- String row = r.toString();
- actual.add(row);
- }
- assertThat(actual).containsExactlyElementsOf(expected);
- }
- }
-
private GenericRow rowWithPartition(Object[] values, @Nullable String
partition) {
if (partition == null) {
return row(values);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 092220244..87c7e14c1 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -21,8 +21,12 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -62,29 +66,44 @@ public class FlinkRowAssertionsUtils {
}
}
- private static List<String> collectRowsWithTimeout(
+ /**
+ * Collects up to {@code expectedCount} rows from the iterator, converted with
+ * {@code toString()}.
+ *
+ * <p>Convenience overload: delegates to the three-argument variant with
+ * {@code closeIterator} set to {@code true}.
+ *
+ * @param iterator the result iterator to read from
+ * @param expectedCount the number of rows to wait for
+ * @return the string form of the collected rows, in arrival order
+ */
+ public static List<String> collectRowsWithTimeout(
+ CloseableIterator<Row> iterator, int expectedCount) {
+ return collectRowsWithTimeout(iterator, expectedCount, true);
+ }
+
+ public static List<String> collectRowsWithTimeout(
CloseableIterator<Row> iterator, int expectedCount, boolean
closeIterator) {
- List<String> actual = new ArrayList<>();
- long startTime = System.currentTimeMillis();
- int maxWaitTime = 60000; // 60 seconds
+ if (expectedCount < 0) {
+ throw new IllegalArgumentException(
+ "Expected count must be non-negative: " + expectedCount);
+ }
+ if (iterator == null) {
+ throw new IllegalArgumentException("Iterator cannot be null");
+ }
+ return collectRowsWithTimeout(
+ iterator,
+ expectedCount,
+ closeIterator,
+ // max wait 1 minute
+ Duration.ofMinutes(1));
+ }
+ protected static List<String> collectRowsWithTimeout(
+ CloseableIterator<Row> iterator,
+ int expectedCount,
+ boolean closeIterator,
+ Duration maxWaitTime) {
+ List<String> actual = new ArrayList<>();
+ long startTimeMs = System.currentTimeMillis();
+ long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis();
try {
for (int i = 0; i < expectedCount; i++) {
// Wait for next record with timeout
- while (!iterator.hasNext()) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime > maxWaitTime) {
- // Timeout reached - provide detailed failure info
- throw new AssertionError(
- String.format(
- "Timeout after waiting %d ms for Flink
job results. "
- + "Expected %d records but
only received %d. "
- + "This might indicate a job
hang or insufficient data generation.",
- elapsedTime, expectedCount,
actual.size()));
- }
- Thread.sleep(10);
+ if (!waitForNextWithTimeout(
+ iterator, deadlineTimeMs -
System.currentTimeMillis())) {
+ throw timeoutError(
+ System.currentTimeMillis() - startTimeMs,
expectedCount, actual.size());
}
-
if (iterator.hasNext()) {
actual.add(iterator.next().toString());
} else {
@@ -92,12 +111,7 @@ public class FlinkRowAssertionsUtils {
break;
}
}
-
return actual;
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Test interrupted while waiting for
Flink job results", e);
} catch (AssertionError e) {
// Re-throw our timeout assertion errors
throw e;
@@ -107,7 +121,7 @@ public class FlinkRowAssertionsUtils {
// Job completed normally - return what we have
return actual;
} else {
- long elapsedTime = System.currentTimeMillis() - startTime;
+ long elapsedTime = System.currentTimeMillis() - startTimeMs;
throw new RuntimeException(
String.format(
"Unexpected error after waiting %d ms for
Flink job results. "
@@ -126,6 +140,29 @@ public class FlinkRowAssertionsUtils {
}
}
+ /**
+ * Builds the {@link AssertionError} that reports a timeout while waiting for rows.
+ *
+ * <p>The error is only created here; the caller is responsible for throwing it.
+ *
+ * @param elapsedTime time already spent waiting, in milliseconds
+ * @param expectedCount number of records the caller asked for
+ * @param actualCount number of records received before the timeout
+ * @return an error whose message contains all three values
+ */
+ private static AssertionError timeoutError(
+ long elapsedTime, int expectedCount, int actualCount) {
+ return new AssertionError(
+ String.format(
+ "Timeout after waiting %d ms for Flink job results. "
+ + "Expected %d records but only received %d. "
+ + "This might indicate a job hang or
insufficient data generation.",
+ elapsedTime, expectedCount, actualCount));
+ }
+
+ /**
+  * Evaluates {@code iterator.hasNext()} on a background thread and waits at most
+  * {@code maxWaitTime} milliseconds for the answer, because {@code hasNext()} may block
+  * for as long as the job produces no rows.
+  *
+  * @param iterator the result iterator to poll
+  * @param maxWaitTime maximum time to wait for {@code hasNext()}, in milliseconds
+  * @return the value returned by {@code hasNext()}, or {@code false} if it did not return
+  *     within {@code maxWaitTime}
+  * @throws RuntimeException the unchecked exception thrown by {@code hasNext()} itself
+  *     (rethrown unwrapped), or a wrapper if the calling thread is interrupted or
+  *     {@code hasNext()} failed with a checked exception
+  */
+ private static boolean waitForNextWithTimeout(
+         CloseableIterator<Row> iterator, long maxWaitTime) {
+     CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(iterator::hasNext);
+     try {
+         return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
+     } catch (TimeoutException e) {
+         // Best effort only: CompletableFuture.cancel() does not interrupt the worker
+         // thread, so a hasNext() call that is still blocked keeps running in the
+         // background.
+         future.cancel(true);
+         return false;
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack can observe it.
+         Thread.currentThread().interrupt();
+         future.cancel(true);
+         throw new RuntimeException("Test interrupted while waiting for Flink job results", e);
+     } catch (java.util.concurrent.ExecutionException e) {
+         // Surface what hasNext() actually threw. Wrapping it again would bury the
+         // original exception two levels deep and hide its cause and message from
+         // callers that inspect them (e.g. isMiniClusterCompletionException).
+         Throwable cause = e.getCause() != null ? e.getCause() : e;
+         if (cause instanceof RuntimeException) {
+             throw (RuntimeException) cause;
+         }
+         if (cause instanceof Error) {
+             throw (Error) cause;
+         }
+         throw new RuntimeException("Error checking iterator.hasNext()", cause);
+     }
+ }
+
private static boolean isMiniClusterCompletionException(Exception e) {
return e.getCause() instanceof IllegalStateException
&& e.getMessage() != null
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java
new file mode 100644
index 000000000..0086adeb0
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.testutils;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * Test for {@link FlinkRowAssertionsUtils}.
+ *
+ * <p>Covers the timeout path of {@code collectRowsWithTimeout} using an iterator whose
+ * {@code hasNext()} never returns.
+ */
+class FlinkRowAssertionsUtilsTest {
+
+ /**
+ * Asks for 10 rows with a 1 second limit from an iterator that never yields and expects
+ * an {@link AssertionError} reporting that 0 of the 10 records were received.
+ */
+ @Test
+ void testCollectRowsWithTimeout() {
+ // collecting must fail with an AssertionError once the wait for rows times out
+ assertThatThrownBy(
+ () ->
+ collectRowsWithTimeout(
+ createBlockingHasNextIterator(),
+ 10,
+ true,
+ Duration.ofSeconds(1)))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining("Timeout after waiting")
+ .hasMessageContaining(
+ "ms for Flink job results. Expected 10 records but
only received 0.");
+ }
+
+ /**
+ * Creates an iterator that simulates a job producing no data: {@code hasNext()} sleeps
+ * in an endless loop, {@code close()} does nothing and {@code next()} returns
+ * {@code null}.
+ */
+ CloseableIterator<Row> createBlockingHasNextIterator() {
+ return new CloseableIterator<Row>() {
+ @Override
+ public void close() throws Exception {}
+
+ @SuppressWarnings("all")
+ @Override
+ public boolean hasNext() {
+ // simulate a blocking source: sleep forever unless the thread is interrupted
+ try {
+ while (true) {
+ Thread.sleep(1_000);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // NOTE(review): the utility invokes hasNext() via CompletableFuture.supplyAsync,
+ // so this fail() would be raised on the async thread, not the test thread -
+ // confirm it can actually fail the test.
+ fail("Thread sleeping for blocking hasNext() was
interrupted.");
+ }
+ // presumably only here to satisfy the compiler; fail() above is expected to throw
+ return true;
+ }
+
+ @Override
+ public Row next() {
+ return null;
+ }
+ };
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 914ad2239..d4ac97bf0 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -43,7 +43,6 @@ import org.apache.fluss.server.zk.data.TableAssignment;
import org.apache.fluss.types.DataTypes;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -61,7 +60,6 @@ import java.util.Set;
import static
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
-import static org.assertj.core.api.Assertions.assertThat;
/** A base class for testing with Fluss cluster prepared. */
public class FlinkTestBase extends AbstractTestBase {
@@ -182,18 +180,6 @@ public class FlinkTestBase extends AbstractTestBase {
return admin.getTableInfo(tablePath).get().getTableId();
}
- public static List<String> assertAndCollectRecords(
- org.apache.flink.util.CloseableIterator<Row> iterator, int
expectedNum)
- throws Exception {
- List<String> actual = new ArrayList<>(expectedNum);
- for (int i = 0; i < expectedNum; i++) {
- actual.add(iterator.next().toString());
- }
- assertThat(iterator.hasNext()).isFalse();
- iterator.close();
- return actual;
- }
-
protected void waitUntilSnapshot(long tableId, long snapshotId) {
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);