This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a0788b19e [typo] fix typos waitUtil-> waitUntil (#1402)
a0788b19e is described below
commit a0788b19e3d25a4afd8e8ce833297d275b080418
Author: xiaozhou <[email protected]>
AuthorDate: Mon Jul 28 17:23:11 2025 +0800
[typo] fix typos waitUtil-> waitUntil (#1402)
* fix typos waitUtil-> waitUntil
* fix typos waitUtil-> waitUntil
---
.../client/table/scanner/log/LogFetchBuffer.java | 2 +-
.../fluss/client/table/scanner/log/LogFetcher.java | 2 +-
.../client/admin/ClientToServerITCaseBase.java | 2 +-
.../fluss/client/admin/FlussAdminITCase.java | 6 ++--
.../security/acl/FlussAuthorizationITCase.java | 24 ++++++-------
.../client/table/FlussFailServerTableITCase.java | 2 +-
.../batch/KvSnapshotBatchScannerITCase.java | 8 ++---
.../client/table/scanner/log/LogFetcherTest.java | 2 +-
.../client/table/scanner/log/LogScannerITCase.java | 8 ++---
.../table/scanner/log/RemoteLogScannerITCase.java | 6 ++--
.../security/acl/FlinkAuthorizationITCase.java | 4 +--
.../fluss/flink/source/FlinkTableSourceITCase.java | 41 +++++++++++-----------
.../flink/source/reader/FlinkSourceReaderTest.java | 2 +-
.../FlussTableLakeSnapshotCommitterTest.java | 2 +-
.../flink/tiering/source/TieringTestBase.java | 6 ++--
.../alibaba/fluss/flink/utils/FlinkTestBase.java | 2 +-
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 2 +-
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 2 +-
.../testutils/FlinkPaimonTieringTestBase.java | 14 ++++----
.../lake/paimon/tiering/PaimonTieringITCase.java | 2 +-
.../com/alibaba/fluss/server/ServerITCaseBase.java | 2 +-
.../server/coordinator/StopReplicaITCase.java | 8 ++---
.../server/coordinator/TableManagerITCase.java | 8 ++---
.../TestingCompletedKvSnapshotCommitter.java | 2 +-
.../log/remote/CommitRemoteLogManifestITCase.java | 6 ++--
.../fluss/server/log/remote/RemoteLogITCase.java | 12 +++----
.../server/metadata/MetadataUpdateITCase.java | 6 ++--
.../replica/CommitLakeTableSnapshotITCase.java | 2 +-
.../server/replica/KvReplicaRestoreITCase.java | 10 +++---
.../fluss/server/replica/ReplicaManagerTest.java | 8 ++---
.../alibaba/fluss/server/replica/ReplicaTest.java | 18 +++++-----
.../replica/fetcher/RemoteLeaderEndpointTest.java | 4 +--
.../replica/fetcher/ReplicaFetcherITCase.java | 18 +++++-----
.../server/tablet/TabletServerFailOverITCase.java | 2 +-
.../fluss/server/tablet/TabletServiceITCase.java | 28 +++++++--------
.../server/testutils/FlussClusterExtension.java | 26 +++++++-------
.../fluss/testutils/common/CommonTestUtils.java | 26 +++++++-------
37 files changed, 163 insertions(+), 162 deletions(-)
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
index b7dcaac08..5441bd90e 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
@@ -175,7 +175,7 @@ public class LogFetchBuffer implements AutoCloseable {
* <li>The thread was interrupted
* </ol>
*
- * @param deadlineNanos the deadline time to wait util
+ * @param deadlineNanos the deadline time to wait until
* @return false if the waiting time detectably elapsed before return from
the method, else true
*/
boolean awaitNotEmpty(long deadlineNanos) throws InterruptedException {
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
index 3c5dc29f8..cc3cf9874 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
@@ -178,7 +178,7 @@ public class LogFetcher implements Closeable {
}
/**
- * @param deadlineNanos the deadline time to wait util
+ * @param deadlineNanos the deadline time to wait until
* @return false if the waiting time detectably elapsed before return from
the method, else true
*/
public boolean awaitNotEmpty(long deadlineNanos) {
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
index 3f7c26bd8..e9b9976b2 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
@@ -231,7 +231,7 @@ public abstract class ClientToServerITCaseBase {
public static void waitAllReplicasReady(long tableId, int
expectBucketCount) {
// retry until all replica ready.
for (int i = 0; i < expectBucketCount; i++) {
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(new
TableBucket(tableId, i));
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(new
TableBucket(tableId, i));
}
}
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 274dfbac5..0c29e3b26 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -613,7 +613,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
Map<Integer, CompletedSnapshot> expectedSnapshots = new
HashMap<>();
for (int bucket = 0; bucket < bucketNum; bucket++) {
CompletedSnapshot completedSnapshot =
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(
new TableBucket(tableId, bucket), 0);
expectedSnapshots.put(bucket, completedSnapshot);
}
@@ -628,9 +628,9 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
upsertWriter.flush();
TableBucket tb = new TableBucket(snapshots.getTableId(), 0);
- // wait util the snapshot finish
+ // wait until the snapshot finish
expectedSnapshots.put(
- tb.getBucket(),
FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tb, 1));
+ tb.getBucket(),
FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tb, 1));
// check snapshot
snapshots = admin.getLatestKvSnapshots(tablePath1).get();
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
index f3a87bc38..7ba60d702 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -335,7 +335,7 @@ public class FlussAuthorizationITCase {
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listDatabases().get()).isEqualTo(Collections.singletonList("fluss"));
aclBindings =
@@ -348,7 +348,7 @@ public class FlussAuthorizationITCase {
OperationType.ALL,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listDatabases().get())
.containsExactlyInAnyOrderElementsOf(
Lists.newArrayList("fluss",
DATA1_TABLE_PATH_PK.getDatabaseName()));
@@ -397,7 +397,7 @@ public class FlussAuthorizationITCase {
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
}
@@ -430,7 +430,7 @@ public class FlussAuthorizationITCase {
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings,
true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings,
true);
assertThat(guestGateway.metadata(metadataRequest).get().getTableMetadatasList())
.hasSize(1);
}
@@ -445,7 +445,7 @@ public class FlussAuthorizationITCase {
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(writeAclTable, descriptor, false).get();
TableInfo tableInfo = rootAdmin.getTableInfo(writeAclTable).get();
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableInfo.getTableId());
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableInfo.getTableId());
// create acl to allow guest write.
List<AclBinding> aclBindings =
Collections.singletonList(
@@ -457,7 +457,7 @@ public class FlussAuthorizationITCase {
OperationType.WRITE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
FlussConnection flussConnection = (FlussConnection) guestConn;
TabletServerGateway tabletServerGateway =
@@ -506,9 +506,9 @@ public class FlussAuthorizationITCase {
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(writeAclTable, descriptor, false).get();
rootAdmin.createTable(noWriteAclTable, descriptor, false).get();
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(
rootAdmin.getTableInfo(writeAclTable).get().getTableId());
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(
rootAdmin.getTableInfo(noWriteAclTable).get().getTableId());
// create acl to allow guest write for writeAclTable.
@@ -528,9 +528,9 @@ public class FlussAuthorizationITCase {
new AccessControlEntry(
guestPrincipal, "*", READ,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindingOfWriteAclTables).all().get();
-
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfWriteAclTables,
true);
+
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindingOfWriteAclTables,
true);
rootAdmin.createAcls(aclBindingOfNoWriteAclTables).all().get();
-
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfNoWriteAclTables,
true);
+
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindingOfNoWriteAclTables,
true);
// 1. Try to write data to noWriteAclTable. It should throw
AuthorizationException because
// of request writeId failed.
@@ -576,7 +576,7 @@ public class FlussAuthorizationITCase {
TableDescriptor descriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(DATA1_TABLE_PATH, descriptor, false).get();
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(
rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId());
// create acl to allow guest write.
List<AclBinding> aclBindings =
@@ -589,7 +589,7 @@ public class FlussAuthorizationITCase {
OperationType.WRITE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
try (Table table = guestConn.getTable(DATA1_TABLE_PATH)) {
AppendWriter appendWriter = table.newAppend().createWriter();
appendWriter.append(row(1, "a")).get();
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
index 017211cac..eb11d443c 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
@@ -50,7 +50,7 @@ class FlussFailServerTableITCase extends
ClientToServerITCaseBase {
void beforeEach() throws Exception {
// since we kill and start one tablet server in each test,
// we need to wait for metadata to be updated to servers
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
super.setup();
}
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index c16dd9645..c2dfec90f 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -101,7 +101,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
Map<TableBucket, List<InternalRow>> expectedRowByBuckets =
putRows(tableId, tablePath, 10);
// wait snapshot finish
- waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
+ waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
// test read snapshot
testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -110,7 +110,7 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
expectedRowByBuckets = putRows(tableId, tablePath, 20);
// wait snapshot finish
- waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
+ waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
// test read snapshot
testSnapshotRead(tablePath, expectedRowByBuckets);
@@ -166,9 +166,9 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
return function.bucketing(key, DEFAULT_BUCKET_NUM);
}
- private void waitUtilAllSnapshotFinished(Set<TableBucket> tableBuckets,
long snapshotId) {
+ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets,
long snapshotId) {
for (TableBucket tableBucket : tableBuckets) {
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket,
snapshotId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
}
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java
index ae0917c31..34131b7e8 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -64,7 +64,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
// We create table data1NonPkTablePath previously.
tableId = createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf,
rpcClient);
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogScannerITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogScannerITCase.java
index 8b87bd867..df26e34ea 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogScannerITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogScannerITCase.java
@@ -288,7 +288,7 @@ public class LogScannerITCase extends
ClientToServerITCaseBase {
String partitionName = null;
Long partitionId = null;
if (!isPartitioned) {
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
} else {
Map<String, Long> partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
@@ -297,7 +297,7 @@ public class LogScannerITCase extends
ClientToServerITCaseBase {
partitionNameAndIds.entrySet().iterator().next();
partitionName = partitionNameAndIdEntry.getKey();
partitionId = partitionNameAndIds.get(partitionName);
- FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId,
partitionId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId,
partitionId);
}
long firstStartTimestamp = System.currentTimeMillis();
@@ -390,14 +390,14 @@ public class LogScannerITCase extends
ClientToServerITCaseBase {
String partitionName = null;
Long partitionId = null;
if (!isPartitioned) {
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
} else {
Map<String, Long> partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
// just pick one partition
partitionName = partitionNameAndIds.keySet().iterator().next();
partitionId = partitionNameAndIds.get(partitionName);
- FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId,
partitionId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId,
partitionId);
}
int batchRecordSize = 10;
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogScannerITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogScannerITCase.java
index 6e1881911..f0a6d48fe 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogScannerITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogScannerITCase.java
@@ -96,7 +96,7 @@ public class RemoteLogScannerITCase {
appendWriter.append(row).get();
}
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(new
TableBucket(tableId, 0));
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(new
TableBucket(tableId, 0));
// test fetch.
LogScanner logScanner = table.newScan().createLogScanner();
@@ -147,7 +147,7 @@ public class RemoteLogScannerITCase {
}
}
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(new
TableBucket(tableId, 0));
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(new
TableBucket(tableId, 0));
// test fetch.
LogScanner logScanner = createLogScanner(table, new int[] {0, 2});
@@ -237,7 +237,7 @@ public class RemoteLogScannerITCase {
}
for (long id : partitionIdByNames.values()) {
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
new TableBucket(tableId, id, 0));
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 24497100a..8e2afa937 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -424,7 +424,7 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
String.format("%s:%s", guest.getType(),
guest.getName()),
operationType.name()))
.await();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(
Collections.singletonList(
new AclBinding(
resource,
@@ -443,7 +443,7 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
String.format("%s:%s", guest.getType(),
guest.getName()),
operationType.name()))
.await();
- FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(
Collections.singletonList(
new AclBinding(
resource,
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java
index 994577673..8eb6f7af9 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java
@@ -76,7 +76,7 @@ import static
com.alibaba.fluss.flink.utils.FlinkTestBase.writeRows;
import static com.alibaba.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
import static
com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -160,7 +160,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
- waitUtilAllBucketFinishSnapshot(admin, tablePath);
+ waitUntilAllBucketFinishSnapshot(admin, tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -179,7 +179,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
- waitUtilAllBucketFinishSnapshot(admin, tablePath);
+ waitUntilAllBucketFinishSnapshot(admin, tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -305,7 +305,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait snapshot before collect job start,
// to make sure reading from kv snapshot
writeRows(conn, tablePath, rows, false);
- waitUtilAllBucketFinishSnapshot(admin,
TablePath.of(DEFAULT_DB, tableName));
+ waitUntilAllBucketFinishSnapshot(admin,
TablePath.of(DEFAULT_DB, tableName));
}
} else {
writeRows(conn, tablePath, rows, true);
@@ -360,7 +360,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records
writeRows(conn, tablePath, rows, false);
- waitUtilAllBucketFinishSnapshot(admin, tablePath);
+ waitUntilAllBucketFinishSnapshot(admin, tablePath);
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
@@ -485,7 +485,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait generate snapshot.
writeRows(conn, tablePath, rows1, false);
- waitUtilAllBucketFinishSnapshot(admin, tablePath);
+ waitUntilAllBucketFinishSnapshot(admin, tablePath);
List<InternalRow> rows2 = Arrays.asList(row(1, "v11"), row(2, "v22"),
row(4, "v4"));
@@ -563,9 +563,10 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
// write records and wait generate snapshot.
writeRows(conn, tablePath, rows1, false);
if (partitionName == null) {
- waitUtilAllBucketFinishSnapshot(admin, tablePath);
+ waitUntilAllBucketFinishSnapshot(admin, tablePath);
} else {
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
Collections.singleton(partitionName));
+ waitUntilAllBucketFinishSnapshot(
+ admin, tablePath, Collections.singleton(partitionName));
}
CLOCK.advanceTime(Duration.ofMillis(100));
@@ -645,7 +646,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
List<String> expectedRowValues =
writeRowsToPartition(conn, tablePath,
partitionNameById.values());
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
partitionNameById.values());
+ waitUntilAllBucketFinishSnapshot(admin, tablePath,
partitionNameById.values());
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(String.format("select * from %s",
tableName)).collect();
@@ -991,7 +992,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
writeRowsToPartition(conn, tablePath, Arrays.asList("2025",
"2026")).stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
String plan = tEnv.explainSql("select * from partitioned_table where c
='2025'");
assertThat(plan)
@@ -1023,7 +1024,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUtilAllBucketFinishSnapshot(
+ waitUntilAllBucketFinishSnapshot(
admin, tablePath, Arrays.asList("2025$1", "2025$2", "2025$2"));
String plan = tEnv.explainSql("select * from multi_partitioned_table
where c ='2025'");
@@ -1046,7 +1047,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.stream()
.filter(s -> s.contains("2025"))
.collect(Collectors.toList());
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025$3", "2026$2"));
+ waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025$3", "2026$2"));
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
String plan2 =
@@ -1090,7 +1091,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
}
writeRows(conn, tablePath, rows, false);
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
String plan =
tEnv.explainSql(
@@ -1121,7 +1122,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
List<String> expectedRowValues =
writeRowsToPartition(conn, tablePath, Arrays.asList("2025",
"2026"));
- waitUtilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
+ waitUntilAllBucketFinishSnapshot(admin, tablePath,
Arrays.asList("2025", "2026"));
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from
partitioned_table_no_filter").collect();
@@ -1283,8 +1284,8 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
return tableName;
}
- private void waitUtilAllBucketFinishSnapshot(Admin admin, TablePath
tablePath) {
- waitUtil(
+ private void waitUntilAllBucketFinishSnapshot(Admin admin, TablePath
tablePath) {
+ waitUntil(
() -> {
KvSnapshots snapshots =
admin.getLatestKvSnapshots(tablePath).get();
for (int bucketId : snapshots.getBucketIds()) {
@@ -1295,12 +1296,12 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
return true;
},
Duration.ofMinutes(1),
- "Fail to wait util all bucket finish snapshot");
+ "Fail to wait until all bucket finish snapshot");
}
- private void waitUtilAllBucketFinishSnapshot(
+ private void waitUntilAllBucketFinishSnapshot(
Admin admin, TablePath tablePath, Collection<String> partitions) {
- waitUtil(
+ waitUntil(
() -> {
for (String partition : partitions) {
KvSnapshots snapshots =
@@ -1314,7 +1315,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
return true;
},
Duration.ofMinutes(1),
- "Fail to wait util all bucket finish snapshot");
+ "Fail to wait until all bucket finish snapshot");
}
private void assertQueryResult(String query, List<String> expected) throws
Exception {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
index 4338989fc..d29749853 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -65,7 +65,7 @@ class FlinkSourceReaderTest extends FlinkTestBase {
TableDescriptor tableDescriptor =
DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR;
long tableId = createTable(tablePath, tableDescriptor);
- // wait util partitions are created
+ // wait until partitions are created
ZooKeeperClient zooKeeperClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
Map<Long, String> partitionNameByIds =
waitUntilPartitions(zooKeeperClient, tablePath);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 68a5e1554..0d9d1b0e7 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -74,7 +74,7 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
List<String> partitions;
Map<String, Long> partitionNameAndIds = Collections.emptyMap();
if (!isPartitioned) {
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
partitions = Collections.singletonList(null);
} else {
partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TieringTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TieringTestBase.java
index afcd8f236..d99657e8d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TieringTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TieringTestBase.java
@@ -205,7 +205,7 @@ public class TieringTestBase extends AbstractTestBase {
throws Exception {
admin.createTable(tablePath, tableDescriptor, true).get();
long tableId = admin.getTableInfo(tablePath).get().getTableId();
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
return tableId;
}
@@ -228,7 +228,7 @@ public class TieringTestBase extends AbstractTestBase {
for (Map.Entry<String, Long> entry : partitionNameByIds.entrySet()) {
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
TableBucket tableBucket = new TableBucket(tableId,
entry.getValue(), i);
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket,
snapshotId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
}
@@ -236,7 +236,7 @@ public class TieringTestBase extends AbstractTestBase {
protected void waitUntilSnapshot(long tableId, @Nullable Long partitionId,
long snapshotId) {
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket,
snapshotId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/utils/FlinkTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/utils/FlinkTestBase.java
index b3df7eb3a..abbc95f12 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/utils/FlinkTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/utils/FlinkTestBase.java
@@ -197,7 +197,7 @@ public class FlinkTestBase extends AbstractTestBase {
protected void waitUntilSnapshot(long tableId, long snapshotId) {
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket,
snapshotId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 3b1d348a2..290a51d10 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -63,7 +63,7 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
List<Row> writtenRows = new ArrayList<>();
long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned,
writtenRows);
// wait until records has been synced
- waitUtilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// now, start to read the log table, which will read paimon
// may read fluss or not, depends on the log offset of paimon snapshot
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index cfc6f49f0..b71530a83 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -75,7 +75,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
// wait unit records have been synced
- waitUtilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// check the status of replica after synced
assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index f8a8641cd..b4788d870 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -69,7 +69,7 @@ import java.util.Optional;
import static
com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.assertj.core.api.Assertions.assertThat;
@@ -167,7 +167,7 @@ public class FlinkPaimonTieringTestBase {
protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
for (int i = 0; i < bucketNum; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket,
snapshotId);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
@@ -408,26 +408,26 @@ public class FlinkPaimonTieringTestBase {
});
}
- protected void waitUtilBucketSynced(
+ protected void waitUntilBucketSynced(
TablePath tablePath, long tableId, int bucketCount, boolean
isPartition) {
if (isPartition) {
Map<Long, String> partitionById = waitUntilPartitions(tablePath);
for (Long partitionId : partitionById.keySet()) {
for (int i = 0; i < bucketCount; i++) {
TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
- waitUtilBucketSynced(tableBucket);
+ waitUntilBucketSynced(tableBucket);
}
}
} else {
for (int i = 0; i < bucketCount; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);
- waitUtilBucketSynced(tableBucket);
+ waitUntilBucketSynced(tableBucket);
}
}
}
- protected void waitUtilBucketSynced(TableBucket tb) {
- waitUtil(
+ protected void waitUntilBucketSynced(TableBucket tb) {
+ waitUntil(
() -> {
Replica replica = getLeaderReplica(tb);
return replica.getLogTablet().getLakeTableSnapshotId() >=
0;
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 4309a6172..2ee3083ff 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -130,7 +130,7 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
partitionedTablePath, partitionedTableDescriptor,
partitionNameByIds);
long tableId = tableIdAndDescriptor.f0;
- // wait util synced to paimon
+ // wait until synced to paimon
for (Long partitionId : partitionNameByIds.keySet()) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
assertReplicaStatus(tableBucket, 3);
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java
index 88317d5c8..8db2525e4 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java
@@ -112,7 +112,7 @@ public abstract class ServerITCaseBase {
}
private void waitUntilServerStartup(TestProcessBuilder.TestProcess
process) {
- CommonTestUtils.waitUtil(
+ CommonTestUtils.waitUntil(
() ->
process.getProcessOutput().toString().contains(SERVER_STARTED_MARKER)
||
!process.getErrorOutput().toString().isEmpty(),
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/StopReplicaITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/StopReplicaITCase.java
index d2ced9a91..b85eac316 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/StopReplicaITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/StopReplicaITCase.java
@@ -75,13 +75,13 @@ public class StopReplicaITCase {
// wait until all the gateway has same metadata because the follower
fetcher manager need
// to get the leader address from server metadata while make follower.
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
long tableId =
RpcMessageTestUtils.createTable(
FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
List<Integer> isr = waitAndGetIsr(tb);
List<Path> tableDirs =
assertReplicaExistAndGetTableOrPartitionDirs(tb, isr, isPkTable);
@@ -99,7 +99,7 @@ public class StopReplicaITCase {
RpcMessageTestUtils.createTable(
FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
TableBucket tb1 = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb1);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb1);
isr = waitAndGetIsr(tb1);
tableDirs = assertReplicaExistAndGetTableOrPartitionDirs(tb1, isr,
isPkTable);
@@ -116,7 +116,7 @@ public class StopReplicaITCase {
RpcMessageTestUtils.createTable(
FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
TableBucket tb2 = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb2);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb2);
List<Integer> isr2 = waitAndGetIsr(tb2);
List<Path> tableDirs2 =
assertReplicaExistAndGetTableOrPartitionDirs(tb2, isr2, isPkTable);
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
index 03e79819b..e1f834d1f 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
@@ -375,7 +375,7 @@ class TableManagerITCase {
TableDescriptor tableDescriptor =
newPartitionedTable().withProperties(options);
adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
- // wait util partition is created
+ // wait until partition is created
Map<String, Long> partitions =
waitValue(
() -> {
@@ -462,11 +462,11 @@ class TableManagerITCase {
// retry until all replica ready.
int expectBucketCount =
tableDescriptor.getTableDistribution().get().getBucketCount().get();
for (int i = 0; i < expectBucketCount; i++) {
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(new
TableBucket(tableId, i));
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(new
TableBucket(tableId, i));
}
// retry to check metadata.
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
MetadataResponse metadataResponse =
gateway.metadata(newMetadataRequest(Collections.singletonList(tablePath))).get();
// should be no tablet server as we only create tablet service.
@@ -575,7 +575,7 @@ class TableManagerITCase {
for (long partitionId : partitionById.values()) {
for (int i = 0; i < expectBucketCount; i++) {
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
new TableBucket(tableId, partitionId, i));
}
}
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
index dc2fd99d9..15f277bfd 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
@@ -51,7 +51,7 @@ public class TestingCompletedKvSnapshotCommitter implements
CompletedKvSnapshotC
.put(snapshot.getSnapshotID(), bucketLeaderEpoch);
}
- public CompletedSnapshot waitUtilSnapshotComplete(
+ public CompletedSnapshot waitUntilSnapshotComplete(
TableBucket tableBucket, int snapshotIdToWait) {
return waitValue(
() -> {
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/CommitRemoteLogManifestITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/CommitRemoteLogManifestITCase.java
index ab6e111d8..292e9e572 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/CommitRemoteLogManifestITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/CommitRemoteLogManifestITCase.java
@@ -62,7 +62,7 @@ class CommitRemoteLogManifestITCase {
// find the tb whose leader is the server with large log tiering
interval.
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader =
Objects.requireNonNull(
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb).getLeaderId());
@@ -91,7 +91,7 @@ class CommitRemoteLogManifestITCase {
newProduceLogRequest(tableId, 0, -1,
genMemoryLogRecordsByObject(DATA1)))
.get();
for (int stopFollower : stopFollowers) {
- FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb,
stopFollower);
+ FLUSS_CLUSTER_EXTENSION.waitUntilReplicaShrinkFromIsr(tb,
stopFollower);
LogTablet stopfollowerLogTablet =
FLUSS_CLUSTER_EXTENSION
.waitAndGetFollowerReplica(tb, stopFollower)
@@ -106,7 +106,7 @@ class CommitRemoteLogManifestITCase {
.set(
ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ofMillis(1)));
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(tb);
// check only has two remote log segments for the stopped replicas
for (int stopFollower : stopFollowers) {
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java
index e17b0ec70..5f92d043d 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java
@@ -69,7 +69,7 @@ public class RemoteLogITCase {
long tableId =
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
return tb;
}
@@ -88,7 +88,7 @@ public class RemoteLogITCase {
0,
i * 10L);
}
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
new TableBucket(tb.getTableId(), 0));
}
@@ -161,7 +161,7 @@ public class RemoteLogITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
int follower;
for (int i = 0; true; i++) {
@@ -193,12 +193,12 @@ public class RemoteLogITCase {
i * 10L);
}
- FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb, follower);
- FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilReplicaShrinkFromIsr(tb, follower);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(tb);
// restart follower
FLUSS_CLUSTER_EXTENSION.startTabletServer(follower);
- FLUSS_CLUSTER_EXTENSION.waitUtilReplicaExpandToIsr(tb, follower);
+ FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower);
}
private static Configuration initConfig() {
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/MetadataUpdateITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/MetadataUpdateITCase.java
index f4341ec36..171e81790 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/MetadataUpdateITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/MetadataUpdateITCase.java
@@ -90,7 +90,7 @@ class MetadataUpdateITCase {
@Test
void testMetadataUpdateForServerStartAndStop() throws Exception {
// get metadata and check it
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
Map<Long, TableContext> expectedTablePathById = new HashMap<>();
// create non-partitioned table
@@ -171,7 +171,7 @@ class MetadataUpdateITCase {
@Test
void testMetadataUpdateForTableCreateAndDrop() throws Exception {
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
Map<Long, TableContext> expectedTablePathById = new HashMap<>();
assertUpdateMetadataEquals(
coordinatorServerNode, 3, expectedTablePathById,
Collections.emptyMap());
@@ -252,7 +252,7 @@ class MetadataUpdateITCase {
@Test
void testMetadataUpdateForPartitionCreateAndDrop() throws Exception {
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
Map<Long, TableContext> expectedTablePathById = new HashMap<>();
Map<Long, TableContext> expectedPartitionNameById = new HashMap<>();
assertUpdateMetadataEquals(
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index f07b46fa4..77bd44bae 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -84,7 +84,7 @@ class CommitLakeTableSnapshotITCase {
int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
for (int i = 0; i < 10; i++) {
leaderGateWay
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java
index 808fb6b02..3072e9c1f 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java
@@ -53,7 +53,7 @@ import static
com.alibaba.fluss.testutils.DataTestUtils.genKvRecordBatch;
import static com.alibaba.fluss.testutils.DataTestUtils.genKvRecords;
import static com.alibaba.fluss.testutils.DataTestUtils.getKeyValuePairs;
import static com.alibaba.fluss.testutils.DataTestUtils.toKvRecordBatch;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
/** The IT case for the restoring of kv replica. */
class KvReplicaRestoreITCase {
@@ -103,7 +103,7 @@ class KvReplicaRestoreITCase {
// wait for snapshot finish so that we can restore from snapshot
for (TableBucket tableBucket : tableBuckets) {
final long snapshot1Id = 0;
- waitUtil(
+ waitUntil(
() -> completedSnapshotHandleStore.get(tableBucket,
snapshot1Id).isPresent(),
Duration.ofMinutes(2),
"Fail to wait for the snapshot 0 for bucket " +
tableBucket);
@@ -128,7 +128,7 @@ class KvReplicaRestoreITCase {
// wait for the replica to restore in another server
AtomicInteger newLeaderServer = new AtomicInteger(-1);
- waitUtil(
+ waitUntil(
() -> {
int restoreServer =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
if (restoreServer != leaderServer) {
@@ -151,8 +151,8 @@ class KvReplicaRestoreITCase {
// once restore in another server, it should also restore the records
to kv
List<Tuple2<byte[], byte[]>> expectedKeyValues =
getKeyValuePairs(records);
- // wait util we can lookup the last record from the kv
- waitUtil(
+ // wait until we can lookup the last record from the kv
+ waitUntil(
() -> {
try {
PbLookupRespForBucket pbLookupRespForBucket =
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
index d3ec586b7..6dd2ce4f4 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java
@@ -1312,9 +1312,9 @@ class ReplicaManagerTest extends ReplicaTestBase {
});
List<CompletedSnapshot> completedSnapshots = new ArrayList<>();
- // wait util we get completed snapshots for all table buckets.
+ // wait until we get completed snapshots for all table buckets.
for (TableBucket tableBucket : tableBuckets) {
-
completedSnapshots.add(snapshotReporter.waitUtilSnapshotComplete(tableBucket,
0));
+
completedSnapshots.add(snapshotReporter.waitUntilSnapshotComplete(tableBucket,
0));
}
// check the snapshots for each table bucket
@@ -1350,9 +1350,9 @@ class ReplicaManagerTest extends ReplicaTestBase {
});
completedSnapshots.clear();
- // wait util we get completed snapshots for all table buckets.
+ // wait until we get completed snapshots for all table buckets.
for (TableBucket tableBucket : tableBuckets) {
-
completedSnapshots.add(snapshotReporter.waitUtilSnapshotComplete(tableBucket,
1));
+
completedSnapshots.add(snapshotReporter.waitUntilSnapshotComplete(tableBucket,
1));
}
// check the snapshots for each table bucket
for (int i = 0; i < tableBuckets.size(); i++) {
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
index 5d93e1923..eaf19158a 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
@@ -333,9 +333,9 @@ final class ReplicaTest extends ReplicaTestBase {
// trigger one snapshot,
scheduledExecutorService.triggerNonPeriodicScheduledTask();
- // wait util the snapshot 0 success
+ // wait until the snapshot 0 success
CompletedSnapshot completedSnapshot0 =
- kvSnapshotStore.waitUtilSnapshotComplete(tableBucket, 0);
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
// check snapshot
long expectedLogOffset = 4;
@@ -353,9 +353,9 @@ final class ReplicaTest extends ReplicaTestBase {
// trigger next checkpoint
scheduledExecutorService.triggerNonPeriodicScheduledTask();
- // wait util the snapshot 1 success
+ // wait until the snapshot 1 success
CompletedSnapshot completedSnapshot1 =
- kvSnapshotStore.waitUtilSnapshotComplete(tableBucket, 1);
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
// check snapshot
expectedLogOffset = 7;
@@ -394,9 +394,9 @@ final class ReplicaTest extends ReplicaTestBase {
// trigger another one snapshot,
scheduledExecutorService.triggerNonPeriodicScheduledTask();
- // wait util the snapshot 2 success
+ // wait until the snapshot 2 success
CompletedSnapshot completedSnapshot2 =
- kvSnapshotStore.waitUtilSnapshotComplete(tableBucket, 2);
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
expectedLogOffset = 10;
expectedKeyValues =
getKeyValuePairs(
@@ -434,7 +434,7 @@ final class ReplicaTest extends ReplicaTestBase {
int latestLeaderEpoch = 1;
int snapshot = 0;
makeKvReplicaAsLeader(kvReplica, latestLeaderEpoch);
- kvSnapshotStore.waitUtilSnapshotComplete(tableBucket, snapshot);
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, snapshot);
assertThat(kvSnapshotStore.getSnapshotLeaderEpoch(tableBucket,
snapshot))
.isEqualTo(latestLeaderEpoch);
}
@@ -474,8 +474,8 @@ final class ReplicaTest extends ReplicaTestBase {
// trigger one snapshot,
scheduledExecutorService.triggerNonPeriodicScheduledTask();
- // wait util the snapshot success
- kvSnapshotStore.waitUtilSnapshotComplete(tableBucket, 0);
+ // wait until the snapshot success
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
// write data again
putRecordsToLeader(
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
index 260f4df6f..ca1cb2757 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
@@ -71,12 +71,12 @@ public class RemoteLeaderEndpointTest {
// set bucket count to 1 to easy for debug.
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1, "a").build();
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
tableDescriptor);
int bucketId = 0;
TableBucket tb = new TableBucket(tableId, bucketId);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
index 9f402446b..da80d88af 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
@@ -98,13 +98,13 @@ public class ReplicaFetcherITCase {
// wait until all the gateway has same metadata because the follower
fetcher manager need
// to get the leader address from server metadata while make follower.
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
tableDescriptor);
int bucketId = 0;
TableBucket tb = new TableBucket(tableId, bucketId);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -140,7 +140,7 @@ public class ReplicaFetcherITCase {
ReplicaManager replicaManager =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(followId).getReplicaManager();
- // wait util follower highWaterMark equals leader.
+ // wait until follower highWaterMark equals leader.
retry(
Duration.ofMinutes(1),
() ->
@@ -172,7 +172,7 @@ public class ReplicaFetcherITCase {
void testPutKvNeedAck() throws Exception {
// wait until all the gateway has same metadata because the follower
fetcher manager need
// to get the leader address from server metadata while make follower.
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
long tableId =
createTable(
@@ -180,7 +180,7 @@ public class ReplicaFetcherITCase {
int bucketId = 0;
TableBucket tb = new TableBucket(tableId, bucketId);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -215,7 +215,7 @@ public class ReplicaFetcherITCase {
ReplicaManager replicaManager =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(followId).getReplicaManager();
- // wait util follower highWaterMark equals leader. So we can fetch
log from follower
+ // wait until follower highWaterMark equals leader. So we can
fetch log from follower
// before highWaterMark.
retry(
Duration.ofMinutes(1),
@@ -254,7 +254,7 @@ public class ReplicaFetcherITCase {
int bucketId = 0;
TableBucket tb = new TableBucket(tableId, bucketId);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
// let's kill a non leader server
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
@@ -286,7 +286,7 @@ public class ReplicaFetcherITCase {
CompletableFuture<PutKvResponse> putResponse =
leaderGateWay.putKv(newPutKvRequest(tableId, bucketId, -1,
kvRecords));
- // wait util the log has been written
+ // wait until the log has been written
Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
retry(
Duration.ofMinutes(1),
@@ -318,7 +318,7 @@ public class ReplicaFetcherITCase {
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
followerToStop, DATA1_TABLE_PATH, tb, newLeaderAndIsr,
Arrays.asList(0, 1, 2));
- // wait util the put future is done
+ // wait until the put future is done
putResponse.get();
// then we can check all the value
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServerFailOverITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServerFailOverITCase.java
index 43539ca58..02836864a 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServerFailOverITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServerFailOverITCase.java
@@ -86,7 +86,7 @@ class TabletServerFailOverITCase {
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath,
tableDescriptor);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java
index 9ca588062..2f6b4717e 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java
@@ -124,7 +124,7 @@ public class TabletServiceITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -175,7 +175,7 @@ public class TabletServiceITCase {
DATA1_TABLE_DESCRIPTOR.withReplicationFactor(3));
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -211,7 +211,7 @@ public class TabletServiceITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -318,7 +318,7 @@ public class TabletServiceITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -412,7 +412,7 @@ public class TabletServiceITCase {
.build());
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
@@ -432,7 +432,7 @@ public class TabletServiceITCase {
FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK,
DATA1_TABLE_DESCRIPTOR_PK);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -469,7 +469,7 @@ public class TabletServiceITCase {
FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK,
DATA1_TABLE_DESCRIPTOR_PK);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -520,7 +520,7 @@ public class TabletServiceITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket logTableBucket = new TableBucket(logTableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(logTableBucket);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(logTableBucket);
int logLeader =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(logTableBucket);
TabletServerGateway logLeaderGateWay =
@@ -559,7 +559,7 @@ public class TabletServiceITCase {
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath,
descriptor);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -635,7 +635,7 @@ public class TabletServiceITCase {
long logTableId =
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
tb = new TableBucket(logTableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay2 =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
@@ -659,7 +659,7 @@ public class TabletServiceITCase {
FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK,
DATA1_TABLE_DESCRIPTOR_PK);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -690,7 +690,7 @@ public class TabletServiceITCase {
long logTableId =
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket logTableBucket = new TableBucket(logTableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(logTableBucket);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(logTableBucket);
int logLeader =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(logTableBucket);
TabletServerGateway logLeaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(logLeader);
@@ -733,7 +733,7 @@ public class TabletServiceITCase {
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateWay =
@@ -819,7 +819,7 @@ public class TabletServiceITCase {
.build());
TableBucket tb = new TableBucket(tableId, 0);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
LeaderAndIsr originLeaderAndIsr =
FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb);
int leader = originLeaderAndIsr.leader();
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
index 0d92bb0a6..70ed70220 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
@@ -97,7 +97,7 @@ import static
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeStopBucke
import static
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toServerNode;
import static
com.alibaba.fluss.server.zk.ZooKeeperTestUtils.createZooKeeperClient;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
import static com.alibaba.fluss.utils.function.FunctionUtils.uncheckedFunction;
import static org.assertj.core.api.Assertions.assertThat;
@@ -213,7 +213,7 @@ public final class FlussClusterExtension
startTabletServers();
// wait coordinator knows all tablet servers to make cluster
// have enough replication factor when creating table.
- waitUtilAllGatewayHasSameMetadata();
+ waitUntilAllGatewayHasSameMetadata();
}
public void close() throws Exception {
@@ -492,7 +492,7 @@ public final class FlussClusterExtension
* make sure same server info not to make sure table metadata). This
method needs to be called
* in advance for those ITCase which need to get metadata from server.
*/
- public void waitUtilAllGatewayHasSameMetadata() {
+ public void waitUntilAllGatewayHasSameMetadata() {
for (AdminReadOnlyGateway gateway : collectAllRpcGateways()) {
retry(
Duration.ofMinutes(1),
@@ -520,7 +520,7 @@ public final class FlussClusterExtension
}
/** Wait until all the table assignments buckets are ready for table. */
- public void waitUtilTableReady(long tableId) {
+ public void waitUntilTableReady(long tableId) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -538,7 +538,7 @@ public final class FlussClusterExtension
* @param aclBindings aclBindings to be synchronized.
* @param exist whether aclBinding exist.
*/
- public void waitUtilAuthenticationSync(Collection<AclBinding> aclBindings,
boolean exist) {
+ public void waitUntilAuthenticationSync(Collection<AclBinding>
aclBindings, boolean exist) {
retry(
Duration.ofMinutes(1),
() -> {
@@ -566,7 +566,7 @@ public final class FlussClusterExtension
});
}
- public void waitUtilTablePartitionReady(long tableId, long partitionId) {
+ public void waitUntilTablePartitionReady(long tableId, long partitionId) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -600,7 +600,7 @@ public final class FlussClusterExtension
}
/** Wait until the input replica is kicked out of isr. */
- public void waitUtilReplicaShrinkFromIsr(TableBucket tableBucket, int
replicaId) {
+ public void waitUntilReplicaShrinkFromIsr(TableBucket tableBucket, int
replicaId) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -613,7 +613,7 @@ public final class FlussClusterExtension
}
/** Wait until the input replica is expended into isr. */
- public void waitUtilReplicaExpandToIsr(TableBucket tableBucket, int
replicaId) {
+ public void waitUntilReplicaExpandToIsr(TableBucket tableBucket, int
replicaId) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -626,7 +626,7 @@ public final class FlussClusterExtension
}
/** Wait until all the replicas are ready if we have multi replica for one
table bucket. */
- public void waitUtilAllReplicaReady(TableBucket tableBucket) {
+ public void waitUntilAllReplicaReady(TableBucket tableBucket) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -660,7 +660,7 @@ public final class FlussClusterExtension
* least one log segment has been copied to remote, but it does not ensure
that all log segments
* have been copied to remote.
*/
- public void waitUtilSomeLogSegmentsCopyToRemote(TableBucket tableBucket) {
+ public void waitUntilSomeLogSegmentsCopyToRemote(TableBucket tableBucket) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(2),
@@ -671,7 +671,7 @@ public final class FlussClusterExtension
});
}
- public CompletedSnapshot waitUtilSnapshotFinished(TableBucket tableBucket,
long snapshotId) {
+ public CompletedSnapshot waitUntilSnapshotFinished(TableBucket
tableBucket, long snapshotId) {
ZooKeeperClient zkClient = getZooKeeperClient();
return waitValue(
() -> {
@@ -763,7 +763,7 @@ public final class FlussClusterExtension
public Map<String, Long> waitUntilPartitionAllReady(TablePath tablePath) {
int preCreatePartitions =
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue();
- // wait util table partition is created
+ // wait until table partition is created
return waitUntilPartitionsCreated(tablePath, preCreatePartitions);
}
@@ -787,7 +787,7 @@ public final class FlussClusterExtension
}
public void waitUntilPartitionsDropped(TablePath tablePath, List<String>
droppedPartitions) {
- waitUtil(
+ waitUntil(
() -> {
Map<String, Long> partitions =
zooKeeperClient.getPartitionNameAndIds(tablePath);
diff --git
a/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/CommonTestUtils.java
b/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/CommonTestUtils.java
index 8bc090e77..9ba6c7946 100644
---
a/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/CommonTestUtils.java
+++
b/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/CommonTestUtils.java
@@ -44,11 +44,11 @@ public class CommonTestUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CommonTestUtils.class);
/**
- * Wait util the given condition is met or timeout.
+ * Wait until the given condition is met or timeout.
*
- * <p>Note: use {@code #waitUtil(ThrowingSupplier, Duration, String)} if
waiting test to reach a
- * condition and no assertion is needed. Otherwise, use {@link
#retry(Duration, Executable)} if
- * the there is assertion expected to succeed eventually.
+ * <p>Note: use {@code #waitUntil(ThrowingSupplier, Duration, String)} if
waiting test to reach
+ * a condition and no assertion is needed. Otherwise, use {@link
#retry(Duration, Executable)}
+ * if the there is assertion expected to succeed eventually.
*
* @param condition the condition to wait for.
* @param timeout the maximum time to wait for the condition to become
true.
@@ -56,7 +56,7 @@ public class CommonTestUtils {
* @param errorMsg the error message to include in the
<code>TimeoutException</code> if the
* condition was not met before timeout.
*/
- public static void waitUtil(
+ public static void waitUntil(
ThrowingSupplier<Boolean> condition,
Duration timeout,
Duration pause,
@@ -83,20 +83,20 @@ public class CommonTestUtils {
}
/**
- * Wait util the given condition is met or timeout.
+ * Wait until the given condition is met or timeout.
*
- * <p>Note: use {@code #waitUtil(ThrowingSupplier, Duration, String)} if
waiting test to reach a
- * condition and no assertion is needed. Otherwise, use {@link
#retry(Duration, Executable)} if
- * the there is assertion expected to succeed eventually.
+ * <p>Note: use {@code #waitUntil(ThrowingSupplier, Duration, String)} if
waiting test to reach
+ * a condition and no assertion is needed. Otherwise, use {@link
#retry(Duration, Executable)}
+ * if the there is assertion expected to succeed eventually.
*
* @param condition the condition to wait for.
* @param timeout the maximum time to wait for the condition to become
true.
* @param errorMsg the error message to include in the
<code>TimeoutException</code> if the
* condition was not met before timeout.
*/
- public static void waitUtil(
+ public static void waitUntil(
ThrowingSupplier<Boolean> condition, Duration timeout, String
errorMsg) {
- waitUtil(condition, timeout, Duration.ofMillis(1), errorMsg);
+ waitUntil(condition, timeout, Duration.ofMillis(1), errorMsg);
}
/**
@@ -110,7 +110,7 @@ public class CommonTestUtils {
public static <T> T waitValue(
ThrowingSupplier<Optional<T>> supplier, Duration timeout, String
errorMsg) {
AtomicReference<T> result = new AtomicReference<>();
- waitUtil(
+ waitUntil(
() -> {
Optional<T> opt = supplier.get();
if (opt.isPresent()) {
@@ -131,7 +131,7 @@ public class CommonTestUtils {
*
* <p>Note: use {@code retry(Duration, Executable)} if the assertion is
expected to succeed
* eventually. If waiting test to reach a condition and no assertion is
needed, use {@link
- * #waitUtil(ThrowingSupplier, Duration, String)} instead.
+ * #waitUntil(ThrowingSupplier, Duration, String)} instead.
*/
public static void retry(Duration timeout, Executable assertion) {
final long maxWaitMs = timeout.toMillis();