This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 6832f05fd4b73ef3eeee583e24e9968ccbd0faa4 Author: Joao Boto <b...@boto.pro> AuthorDate: Thu Mar 11 19:03:43 2021 +0100 [BAHIR-243] Change KuduTestHarness with TestContainers --- flink-connector-kudu/pom.xml | 4 + .../connectors/kudu/batch/KuduInputFormatTest.java | 4 +- .../kudu/batch/KuduOutputFormatTest.java | 8 +- .../connectors/kudu/connector/KuduTestBase.java | 85 +++++++++++++++++----- .../connectors/kudu/streaming/KuduSinkTest.java | 11 ++- .../connectors/kudu/table/KuduCatalogTest.java | 20 ++--- .../kudu/table/KuduTableFactoryTest.java | 10 +-- .../kudu/table/KuduTableSourceITCase.java | 2 +- .../connectors/kudu/table/KuduTableSourceTest.java | 2 +- 9 files changed, 99 insertions(+), 47 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 2a2dde2..9ace382 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -157,6 +157,10 @@ <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + </dependency> </dependencies> <build> diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java index fb7d8e3..126f7fd 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java @@ -39,7 +39,7 @@ class KuduInputFormatTest extends KuduTestBase { @Test void testInvalidTableInfo() { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null)); } @@ -71,7 +71,7 @@ class KuduInputFormatTest extends KuduTestBase { } private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection)); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java index 22fa0a4..693e113 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java @@ -39,14 +39,14 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void testInvalidTableInfo() { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null)); } @Test void testNotTableExist() { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); @@ -55,7 +55,7 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void testOutputWithStrongConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); KuduWriterConfig writerConfig = KuduWriterConfig.Builder @@ -80,7 +80,7 @@ class KuduOutputFormatTest extends KuduTestBase { @Test void testOutputWithEventualConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); KuduWriterConfig writerConfig = KuduWriterConfig.Builder diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java index 36572c6..3a872c5 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java @@ -26,22 +26,21 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriter; import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper; import org.apache.flink.types.Row; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.CreateTableOptions; -import org.apache.kudu.client.KuduScanner; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.*; import org.apache.kudu.shaded.com.google.common.collect.Lists; -import org.apache.kudu.test.KuduTestHarness; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import org.testcontainers.shaded.com.google.common.io.Closer; +import org.testcontainers.shaded.com.google.common.net.HostAndPort; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -51,9 +50,19 @@ import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@ExtendWith(ExternalResourceSupport.class) public class KuduTestBase { + private static final String DOCKER_IMAGE = "apache/kudu:1.11.1"; + private static final Integer KUDU_MASTER_PORT = 7051; + private static final Integer KUDU_TSERVER_PORT = 7050; + private static final Integer NUMBER_OF_REPLICA = 3; + + private static GenericContainer<?> master; + private static List<GenericContainer<?>> tServers; + + private static String masterAddress; + private static KuduClient kuduClient; + private static final Object[][] booksTableData = { {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11}, {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22}, @@ -61,20 +70,60 @@ public class KuduTestBase { {1004, "A Cup of Java", "Kumar", 44.44, 44}, {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}}; - public static KuduTestHarness harness; public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"}; @BeforeAll public static void beforeClass() throws Exception { - harness = new KuduTestHarness(); - harness.before(); + Network network = Network.newNetwork(); + + ImmutableList.Builder<GenericContainer<?>> tServersBuilder = ImmutableList.builder(); + master = new GenericContainer<>(DOCKER_IMAGE) + .withExposedPorts(KUDU_MASTER_PORT, 8051) + .withCommand("master") + .withNetwork(network) + .withNetworkAliases("kudu-master"); + master.start(); + masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString(); + + for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) { + String instanceName = "kudu-tserver-" + instance; + GenericContainer<?> tableServer = new GenericContainer<>(DOCKER_IMAGE) + .withExposedPorts(KUDU_TSERVER_PORT) + .withCommand("tserver") + .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT) + .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName) + .withNetwork(network) + .withNetworkAliases(instanceName) + .dependsOn(master); + tableServer.start(); + tServersBuilder.add(tableServer); + } + tServers = tServersBuilder.build(); + + System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString()); + kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build(); } @AfterAll public static void afterClass() throws Exception { - harness.after(); + kuduClient.close(); + try (Closer closer = Closer.create()) { + closer.register(master::stop); + tServers.forEach(tabletServer -> closer.register(tabletServer::stop)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public String getMasterAddress() { + return masterAddress; } + public KuduClient getClient() { + return kuduClient; + } + + public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) { KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName); @@ -159,7 +208,7 @@ public class KuduTestBase { protected void setUpDatabase(KuduTableInfo tableInfo) { try { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); booksDataRow().forEach(row -> { @@ -177,7 +226,7 @@ public class KuduTestBase { protected void cleanDatabase(KuduTableInfo tableInfo) { try { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); kuduWriter.deleteTable(); @@ -188,7 +237,7 @@ public class KuduTestBase { } protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); KuduReader reader = new KuduReader(tableInfo, readerConfig); @@ -222,7 +271,7 @@ public class KuduTestBase { } protected void validateSingleKey(String tableName) throws Exception { - KuduTable kuduTable = harness.getClient().openTable(tableName); + KuduTable kuduTable = getClient().openTable(tableName); Schema schema = kuduTable.getSchema(); assertEquals(1, schema.getPrimaryKeyColumnCount()); @@ -231,7 +280,7 @@ public class KuduTestBase { assertTrue(schema.getColumn("first").isKey()); assertFalse(schema.getColumn("second").isKey()); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); List<RowResult> rows = new ArrayList<>(); scanner.forEach(rows::add); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java index 4d74fda..6791765 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java @@ -59,15 +59,14 @@ public class KuduSinkTest extends KuduTestBase { @Test void testInvalidTableInfo() { - harness.getClient(); - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT))); } @Test void testNotTableExist() { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); @@ -78,7 +77,7 @@ public class KuduSinkTest extends KuduTestBase { @Test void testOutputWithStrongConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); KuduWriterConfig writerConfig = KuduWriterConfig.Builder @@ -102,7 +101,7 @@ public class KuduSinkTest extends KuduTestBase { @Test void testOutputWithEventualConsistency() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); KuduWriterConfig writerConfig = KuduWriterConfig.Builder @@ -130,7 +129,7 @@ public class KuduSinkTest extends KuduTestBase { @Test void testSpeed() throws Exception { - String masterAddresses = harness.getMasterAddressesAsString(); + String masterAddresses = getMasterAddress(); KuduTableInfo tableInfo = KuduTableInfo .forTable("test_speed") diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java index 2bc8b12..1927631 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java @@ -65,7 +65,7 @@ public class KuduCatalogTest extends KuduTestBase { @BeforeEach public void init() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - catalog = new KuduCatalog(harness.getMasterAddressesAsString()); + catalog = new KuduCatalog(getMasterAddress()); tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env); tableEnv.registerCatalog("kudu", catalog); tableEnv.useCatalog("kudu"); @@ -92,7 +92,7 @@ public class KuduCatalogTest extends KuduTestBase { validateSingleKey("TestTable1R"); tableEnv.executeSql("DROP TABLE TestTable1R"); - assertFalse(harness.getClient().tableExists("TestTable1R")); + assertFalse(getClient().tableExists("TestTable1R")); } @Test @@ -160,7 +160,7 @@ public class KuduCatalogTest extends KuduTestBase { @Test public void dataStreamEndToEstTest() throws Exception { - KuduCatalog catalog = new KuduCatalog(harness.getMasterAddressesAsString()); + KuduCatalog catalog = new KuduCatalog(getMasterAddress()); // Creating table through catalog KuduTableFactory tableFactory = catalog.getKuduTableFactory(); @@ -189,7 +189,7 @@ public class KuduCatalogTest extends KuduTestBase { // Writing with simple sink StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); - KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build(); + KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(getMasterAddress()).build(); env.fromCollection(input).addSink( new KuduSink<>( writerConfig, @@ -228,10 +228,10 @@ public class KuduCatalogTest extends KuduTestBase { .getJobExecutionResult() .get(1, TimeUnit.MINUTES); - KuduTable kuduTable = harness.getClient().openTable("TestTableTsC"); + KuduTable kuduTable = getClient().openTable("TestTableTsC"); assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType()); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); List<RowResult> rows = new ArrayList<>(); scanner.forEach(rows::add); @@ -272,7 +272,7 @@ public class KuduCatalogTest extends KuduTestBase { } private void validateManyTypes(String tableName) throws Exception { - KuduTable kuduTable = harness.getClient().openTable(tableName); + KuduTable kuduTable = getClient().openTable(tableName); Schema schema = kuduTable.getSchema(); assertEquals(Type.STRING, schema.getColumn("first").getType()); @@ -286,7 +286,7 @@ public class KuduCatalogTest extends KuduTestBase { assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType()); assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType()); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); List<RowResult> rows = new ArrayList<>(); scanner.forEach(rows::add); @@ -304,7 +304,7 @@ public class KuduCatalogTest extends KuduTestBase { } private void validateMultiKey(String tableName) throws Exception { - KuduTable kuduTable = harness.getClient().openTable(tableName); + KuduTable kuduTable = getClient().openTable(tableName); Schema schema = kuduTable.getSchema(); assertEquals(2, schema.getPrimaryKeyColumnCount()); @@ -315,7 +315,7 @@ public class KuduCatalogTest extends KuduTestBase { assertFalse(schema.getColumn("third").isKey()); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); List<RowResult> rows = new ArrayList<>(); scanner.forEach(rows::add); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java index a8ec686..f6482da 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java @@ -61,7 +61,7 @@ public class KuduTableFactoryTest extends KuduTestBase { public void init() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env); - kuduMasters = harness.getMasterAddressesAsString(); + kuduMasters = getMasterAddress(); } @Test @@ -119,10 +119,10 @@ public class KuduTableFactoryTest extends KuduTestBase { .getJobExecutionResult() .get(1, TimeUnit.MINUTES); - KuduTable kuduTable = harness.getClient().openTable("TestTableTs"); + KuduTable kuduTable = getClient().openTable("TestTableTs"); assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType()); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); HashSet<Timestamp> results = new HashSet<>(); scanner.forEach(sc -> results.add(sc.getTimestamp("second"))); @@ -156,8 +156,8 @@ public class KuduTableFactoryTest extends KuduTestBase { .get(1, TimeUnit.MINUTES); // Validate that both insertions were into the same table - KuduTable kuduTable = harness.getClient().openTable("TestTable12"); - KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build(); + KuduTable kuduTable = getClient().openTable("TestTable12"); + KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build(); List<RowResult> rows = new ArrayList<>(); scanner.forEach(rows::add); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java index b83497c..2a043f9 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java @@ -41,7 +41,7 @@ public class KuduTableSourceITCase extends KuduTestBase { KuduTableInfo tableInfo = booksTableInfo("books", true); setUpDatabase(tableInfo); tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(); - catalog = new KuduCatalog(harness.getMasterAddressesAsString()); + catalog = new KuduCatalog(getMasterAddress()); tableEnv.registerCatalog("kudu", catalog); tableEnv.useCatalog("kudu"); } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java index e02a297..43734e4 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java @@ -67,7 +67,7 @@ public class KuduTableSourceTest extends KuduTestBase { public void init() { KuduTableInfo tableInfo = booksTableInfo("books", true); setUpDatabase(tableInfo); - catalog = new KuduCatalog(harness.getMasterAddressesAsString()); + catalog = new KuduCatalog(getMasterAddress()); ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books"); try { kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));