wuchong commented on code in PR #1666:
URL: https://github.com/apache/fluss/pull/1666#discussion_r2348627118
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -243,18 +273,21 @@ private TableOrPartitions
getTableOrPartitionsInFetchRequest(FetchLogRequest fet
return new TableOrPartitions(tableIdsInFetchRequest,
tablePartitionsInFetchRequest);
}
- private static class TableOrPartitions {
+ /** A helper class to hold table ids or table partitions. */
+ @VisibleForTesting
+ public static class TableOrPartitions {
private final @Nullable Set<Long> tableIds;
private final @Nullable Set<TablePartition> tablePartitions;
- private TableOrPartitions(
+ TableOrPartitions(
@Nullable Set<Long> tableIds, @Nullable Set<TablePartition>
tablePartitions) {
this.tableIds = tableIds;
this.tablePartitions = tablePartitions;
}
}
- private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions)
{
+ @VisibleForTesting
+ public void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
Review Comment:
Change them to package-private; they are only accessed from the same test class.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -243,18 +273,21 @@ private TableOrPartitions
getTableOrPartitionsInFetchRequest(FetchLogRequest fet
return new TableOrPartitions(tableIdsInFetchRequest,
tablePartitionsInFetchRequest);
}
- private static class TableOrPartitions {
+ /** A helper class to hold table ids or table partitions. */
+ @VisibleForTesting
+ public static class TableOrPartitions {
Review Comment:
Change them to package-private; they are only accessed from the same test class.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java:
##########
@@ -183,6 +188,37 @@ void testFetchWhenDestinationIsNullInMetadata() throws
Exception {
assertThat(records.get(tb0).size()).isEqualTo(10);
}
+ @Test
+ void testFetchWithInvalidTableOrPartitions() throws Exception {
+ MetadataUpdater metadataUpdater1 =
+ new MetadataUpdater(clientConf,
FLUSS_CLUSTER_EXTENSION.getRpcClient());
+ logFetcher =
+ new LogFetcher(
+ DATA1_TABLE_INFO,
+ null,
+ logScannerStatus,
+ clientConf,
+ metadataUpdater1,
+ TestingScannerMetricGroup.newInstance(),
+ new RemoteFileDownloader(1));
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<?> future =
+ executor.submit(
+ () -> {
+ // If this test blocked, please checking whether
it was blocked with
+ // the same reason as
https://github.com/apache/fluss/pull/1666
+ for (int i = 0; i < 1000; i++) {
+ logFetcher.sendFetches();
+ logFetcher.invalidTableOrPartitions(
+ new LogFetcher.TableOrPartitions(
+
Collections.singleton(tableId), null));
+ }
+ });
+
+ future.get(2, TimeUnit.MINUTES);
Review Comment:
Assert that the future is done at the end, and shut down the executor.
##########
fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java:
##########
@@ -63,9 +63,10 @@ public class MetadataUtils {
* add those table into cluster.
*/
public static Cluster sendMetadataRequestAndRebuildCluster(
- AdminReadOnlyGateway gateway, Set<TablePath> tablePaths)
+ AdminReadOnlyGateway gateway, Set<TablePath> tablePaths, long
metadataRequestTimeoutMs)
Review Comment:
Please remove the timeout parameter, and use the default 30s in the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]