Repository: samza Updated Branches: refs/heads/master 7055ce670 -> 525e8e2d8
SAMZA-1774: Support table API in low level Code changes to support table in low level API. Author: Aditya Toomula <atoom...@linkedin.com> Reviewers: Srini P<spun...@linkedin.com> Closes #556 from atoomula/table1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/525e8e2d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/525e8e2d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/525e8e2d Branch: refs/heads/master Commit: 525e8e2d8205c7e25d6ba3a13ed22521490496e3 Parents: 7055ce6 Author: Aditya Toomula <atoom...@linkedin.com> Authored: Thu Jul 19 15:06:27 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Jul 19 15:06:27 2018 -0700 ---------------------------------------------------------------------- .../samza/table/TableDescriptorsProvider.java | 100 +++++++++++ .../org/apache/samza/table/TableProvider.java | 3 +- .../org/apache/samza/execution/JobNode.java | 41 +---- .../samza/table/TableConfigGenerator.java | 138 ++++++++++++++++ .../table/TestTableDescriptorsProvider.java | 164 +++++++++++++++++++ 5 files changed, 405 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java new file mode 100644 index 0000000..766a4b4 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table; + +import java.util.List; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.operators.TableDescriptor; + + +/** + * Provider to create a list of {@link TableDescriptor} objects to describe one or more Samza tables. This is the + * mechanism for providing table support for Samza low level API. + * + * Developers writing Samza jobs using Samza table(s) should describe the table(s) by implementing + * TableDescriptorsProvider. 
+ * + * Typical user code using Samza tables should look like the following: + * + * <pre> + * {@code + * public class SampleTableDescriptorsProvider implements TableDescriptorsProvider { + * private ReadableTable<String, Long> remoteTable; + * private ReadWriteTable<String, String> localTable; + * + * {@code @Override} + * public List<TableDescriptor> getTableDescriptors(Config config) { + * List<TableDescriptor> tableDescriptors = new ArrayList<>(); + * final TableReadFunction readRemoteTableFn = new MyStoreReadFunction(); + * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") + * .withReadFunction(readRemoteTableFn) + * .withSerde(KVSerde.of(new StringSerde(), new StringSerde()))); + * + * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1") + * .withBlockSize(4096) + * .withSerde(KVSerde.of(new LongSerde(), new StringSerde<>())) + * .withConfig("some-key", "some-value")); + * return tableDescriptors; + * } + * } + * } + * </pre> + * + * [TODO:SAMZA-1772] will complete the work of introducing low-level Table API. Until then, Table API in low-level + * could be used by generating configs from TableDescriptorsProvider (sample code below) through config rewriter. 
+ * + * <pre> + * {@code + * private Map<String, String> generateTableConfigs(Config config) { + * String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class"); + * if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) { + * // tableDescriptorsProviderClass is not configured + * return config; + * } + * + * try { + * if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) { + * LOG.warn("TableDescriptorsProvider class {} does not implement TableDescriptorsProvider.", + * tableDescriptorsProviderClassName); + * return config; + * } + * + * TableDescriptorsProvider tableDescriptorsProvider = + * Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class); + * List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config); + * return TableConfigGenerator.generateConfigsForTableDescs(tableDescs); + * } catch (Exception e) { + * throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s", + * tableDescriptorsProviderClassName), e); + * } + * } + * } + * </pre> + */ +@InterfaceStability.Unstable +public interface TableDescriptorsProvider { + /** + * Constructs instances of the table descriptors + * @param config + * @return list of table descriptors + */ + List<TableDescriptor> getTableDescriptors(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java index bbbe38a..8e60dad 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java @@ -47,8 +47,7 @@ public interface TableProvider 
{ /** * Generate any configuration for this table, the generated configuration * is used by Samza container to construct this table and any components - * necessary - * . + * necessary. * @param config the current configuration * @return configuration for this table */ http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index db44d9f..6507996 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -31,7 +31,6 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; @@ -46,12 +45,11 @@ import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.spec.StatefulOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.table.TableConfigGenerator; import org.apache.samza.util.MathUtil; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.system.StreamSpec; -import org.apache.samza.table.TableProvider; -import org.apache.samza.table.TableProviderFactory; import org.apache.samza.table.TableSpec; import org.apache.samza.util.Util; import org.slf4j.Logger; @@ -179,21 +177,7 @@ public class JobNode { // write serialized serde instances and stream serde configs to configs addSerdeConfigs(configs); - tables.forEach(tableSpec -> { - // Table provider factory - 
configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()), - tableSpec.getTableProviderFactoryClassName()); - - // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs() - - // Generate additional configuration - TableProviderFactory tableProviderFactory = - Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class); - TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec); - configs.putAll(tableProvider.generateConfig(configs)); - }); - - log.info("Job {} has generated configs {}", jobName, configs); + configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables)); String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); @@ -255,21 +239,11 @@ public class JobNode { } }); - // collect all key and msg serde instances for tables - Map<String, Serde> tableKeySerdes = new HashMap<>(); - Map<String, Serde> tableValueSerdes = new HashMap<>(); - tables.forEach(tableSpec -> { - tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde()); - tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde()); - }); - // for each unique stream or store serde instance, generate a unique name and serialize to config HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values()); serdes.addAll(streamMsgSerdes.values()); serdes.addAll(storeKeySerdes.values()); serdes.addAll(storeMsgSerdes.values()); - serdes.addAll(tableKeySerdes.values()); - serdes.addAll(tableValueSerdes.values()); SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); Base64.Encoder base64Encoder = Base64.getEncoder(); Map<Serde, String> serdeUUIDs = new HashMap<>(); @@ -303,17 +277,6 @@ public class JobNode { String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); }); - - // set key and msg serdes for tables to the serde names generated above - 
tableKeySerdes.forEach((tableId, serde) -> { - String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId); - configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); - }); - - tableValueSerdes.forEach((tableId, serde) -> { - String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId); - configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); - }); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java new file mode 100644 index 0000000..ac17c68 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.table; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.SerializerConfig; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.TableImpl; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerializableSerde; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper class to generate table configs. + */ +public class TableConfigGenerator { + private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class); + + /** + * Generate table configurations given a list of table descriptors + * @param tableDescriptors the list of tableDescriptors + * @return configuration for the tables + */ + static public Map<String, String> generateConfigsForTableDescs(List<TableDescriptor> tableDescriptors) { + return generateConfigsForTableSpecs(getTableSpecs(tableDescriptors)); + } + + /** + * Generate table configurations given a list of table specs + * @param tableSpecs the list of tableSpecs + * @return configuration for the tables + */ + static public Map<String, String> generateConfigsForTableSpecs(List<TableSpec> tableSpecs) { + Map<String, String> tableConfigs = new HashMap<>(); + + tableConfigs.putAll(generateTableKVSerdeConfigs(tableSpecs)); + + tableSpecs.forEach(tableSpec -> { + // Add table provider factory config + tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()), + tableSpec.getTableProviderFactoryClassName()); + + // Generate additional configuration + TableProviderFactory tableProviderFactory = + Util.getObj(tableSpec.getTableProviderFactoryClassName(), 
TableProviderFactory.class); + TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec); + tableConfigs.putAll(tableProvider.generateConfig(tableConfigs)); + }); + + LOG.info("TableConfigGenerator has generated configs {}", tableConfigs); + return tableConfigs; + } + + static private Map<String, String> generateTableKVSerdeConfigs(List<TableSpec> tableSpecs) { + Map<String, String> serdeConfigs = new HashMap<>(); + + // Collect key and msg serde instances for all the tables + Map<String, Serde> tableKeySerdes = new HashMap<>(); + Map<String, Serde> tableValueSerdes = new HashMap<>(); + HashSet<Serde> serdes = new HashSet<>(); + + tableSpecs.forEach(tableSpec -> { + tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde()); + tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde()); + }); + serdes.addAll(tableKeySerdes.values()); + serdes.addAll(tableValueSerdes.values()); + + // Generate serde names + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + Base64.Encoder base64Encoder = Base64.getEncoder(); + Map<Serde, String> serdeUUIDs = new HashMap<>(); + serdes.forEach(serde -> { + String serdeName = serdeUUIDs.computeIfAbsent(serde, + s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString()); + serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName), + base64Encoder.encodeToString(serializableSerde.toBytes(serde))); + }); + + // Set key and msg serdes for tables to the serde names generated above + tableKeySerdes.forEach((tableId, serde) -> { + String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId); + serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + tableValueSerdes.forEach((tableId, serde) -> { + String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId); + serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); + }); + + return serdeConfigs; + 
} + + static private List<TableSpec> getTableSpecs(List<TableDescriptor> tableDescs) { + Map<TableSpec, TableImpl> tableSpecs = new LinkedHashMap<>(); + + tableDescs.forEach(tableDesc -> { + TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); + + if (tableSpecs.containsKey(tableSpec)) { + throw new IllegalStateException( + String.format("getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId())); + } + tableSpecs.put(tableSpec, new TableImpl(tableSpec)); + }); + return new ArrayList<>(tableSpecs.keySet()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java new file mode 100644 index 0000000..3ed29ca --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JavaStorageConfig; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.LongSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; +import org.apache.samza.storage.kv.RocksDbTableDescriptor; +import org.apache.samza.storage.kv.RocksDbTableProviderFactory; +import org.apache.samza.table.TableConfigGenerator; +import org.apache.samza.table.TableDescriptorsProvider; +import org.apache.samza.table.remote.RemoteTableDescriptor; +import org.apache.samza.table.remote.RemoteTableProviderFactory; +import org.apache.samza.table.remote.TableReadFunction; + +import org.apache.samza.util.RateLimiter; +import org.apache.samza.util.Util; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +/** + * Table descriptors provider tests for both remote and local tables + */ +public class TestTableDescriptorsProvider { + + @Test + public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception { + Map<String, String> configs = new HashMap<>(); + String tableRewriterName = "tableRewriter"; + Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs)); + Assert.assertTrue(resultConfig.size() == 0); + } + + @Test + public void testWithNonTableDescriptorsProviderClass() throws Exception { + Map<String, String> configs = new HashMap<>(); + String tableRewriterName = "tableRewriter"; + 
configs.put("tables.descriptors.provider.class", MySampleNonTableDescriptorsProvider.class.getName()); + Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs)); + Assert.assertTrue(resultConfig.size() == 1); + JavaTableConfig tableConfig = new JavaTableConfig(resultConfig); + Assert.assertTrue(tableConfig.getTableIds().size() == 0); + } + + @Test + public void testWithTableDescriptorsProviderClass() throws Exception { + Map<String, String> configs = new HashMap<>(); + String tableRewriterName = "tableRewriter"; + configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName()); + Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs)); + Assert.assertTrue(resultConfig.size() == 17); + + String localTableId = "local-table-1"; + String remoteTableId = "remote-table-1"; + + JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig); + Assert.assertTrue(storageConfig.getStoreNames().size() == 1); + Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId); + Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId), + RocksDbKeyValueStorageEngineFactory.class.getName()); + Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde")); + Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde")); + Config storeConfig = resultConfig.subset("stores." 
+ localTableId + ".", true); + Assert.assertTrue(storeConfig.size() == 4); + Assert.assertEquals(storeConfig.getInt("rocksdb.block.size.bytes"), 4096); + + JavaTableConfig tableConfig = new JavaTableConfig(resultConfig); + Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), + RocksDbTableProviderFactory.class.getName()); + Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), + RemoteTableProviderFactory.class.getName()); + Assert.assertTrue(tableConfig.getKeySerde(localTableId).startsWith("StringSerde")); + Assert.assertTrue(tableConfig.getValueSerde(localTableId).startsWith("StringSerde")); + Assert.assertTrue(tableConfig.getKeySerde(remoteTableId).startsWith("StringSerde")); + Assert.assertTrue(tableConfig.getValueSerde(remoteTableId).startsWith("LongSerde")); + Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), RocksDbTableProviderFactory.class.getName()); + Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName()); + } + + public static class MySampleNonTableDescriptorsProvider { + } + + public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider { + @Override + public List<TableDescriptor> getTableDescriptors(Config config) { + List<TableDescriptor> tableDescriptors = new ArrayList<>(); + final RateLimiter readRateLimiter = mock(RateLimiter.class); + final TableReadFunction readRemoteTable = (TableReadFunction) key -> null; + + tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") + .withReadFunction(readRemoteTable) + .withRateLimiter(readRateLimiter, null, null) + .withSerde(KVSerde.of(new StringSerde(), new LongSerde()))); + tableDescriptors.add(new RocksDbTableDescriptor("local-table-1") + .withBlockSize(4096) + .withSerde(KVSerde.of(new StringSerde(), new StringSerde()))); + return tableDescriptors; + } + } + + /** + * A sample config rewriter to generate table configs. 
It instantiates the configured tableDescriptorsProvider class + * which implements {@link TableDescriptorsProvider} and generates the table configs. + */ + public static class MySampleTableConfigRewriter implements ConfigRewriter { + + @Override + public Config rewrite(String name, Config config) { + String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class"); + if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) { + // tableDescriptorsProviderClass is not configured + return config; + } + + try { + if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) { + // The configured class does not implement TableDescriptorsProvider. + return config; + } + + TableDescriptorsProvider tableDescriptorsProvider = + Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class); + List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config); + return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(tableDescs))); + } catch (Exception e) { + throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s", + tableDescriptorsProviderClassName), e); + } + } + } +}