Repository: samza Updated Branches: refs/heads/master fc7a4dce7 -> 493053dff
SAMZA-1812: Added configuration for changelog to local store backed tables Currently, changelog for tables can be supported implicitly by adding user-defined configuration; to be more user friendly, we want to expose it through the table API. This is applicable to the RocksDB and in-memory table implementations. - By default, changelog is enabled with auto-generated name <job-name><job-id>-table<table-id> - Added ability to disable changelog - Added ability to set the changelog stream name (auto-generated by default) - Added validation of the changelog stream name, modifying it to conform to the Kafka topic name spec - Added configuration for replication factor - Disable changelog when the user enables side inputs Author: Wei Song <[email protected]> Reviewers: Bharath Kumarasubramanian <[email protected]> Closes #611 from weisong44/fix-changelog and squashes the following commits: 010bdfd0 [Wei Song] Updated based on review comments 9a58d566 [Wei Song] Merge branch 'master' into fix-changelog f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 2a4e85fa [Wei Song] Updated based on review comments eae48aee [Wei Song] Updated based on review comments 25fe2ebc [Wei Song] Updated based on review comments 56fb7f27 [Wei Song] Merge branch 'master' into fix-changelog 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 721d261b [Wei Song] Added more unit tests to TestRocksDbTableDescriptor 83b7c2d5 [Wei Song] Merge branch 'master' into fix-changelog 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' 9ec54788 [Wei Song] SAMZA-1812: Added configuration for changelog to local store backed tables aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/493053df Tree: 
http://git-wip-us.apache.org/repos/asf/samza/tree/493053df Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/493053df Branch: refs/heads/master Commit: 493053dff158582d9135f17bf28396e9c65e7e32 Parents: fc7a4dc Author: Wei Song <[email protected]> Authored: Thu Aug 23 16:33:16 2018 -0700 Committer: Wei Song <[email protected]> Committed: Thu Aug 23 16:33:16 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/table/TableProvider.java | 12 +- .../org/apache/samza/execution/JobNode.java | 3 +- .../samza/operators/BaseTableDescriptor.java | 2 +- .../apache/samza/operators/StreamGraphSpec.java | 30 ++-- .../samza/table/TableConfigGenerator.java | 11 +- .../table/caching/CachingTableDescriptor.java | 3 +- .../table/caching/CachingTableProvider.java | 56 ++----- .../guava/GuavaCacheTableDescriptor.java | 3 +- .../caching/guava/GuavaCacheTableProvider.java | 44 +----- .../samza/table/remote/RemoteReadableTable.java | 5 + .../table/remote/RemoteTableDescriptor.java | 3 +- .../samza/table/remote/RemoteTableProvider.java | 43 +----- .../samza/table/utils/BaseTableProvider.java | 76 ++++++++++ .../org/apache/samza/config/StorageConfig.scala | 1 + .../samza/operators/TestStreamGraphSpec.java | 17 ++- .../kv/inmemory/InMemoryTableDescriptor.java | 3 + .../kv/inmemory/InMemoryTableProvider.java | 21 ++- .../kv/inmemory/TestInMemoryTableProvider.java | 9 +- .../storage/kv/RocksDbTableDescriptor.java | 7 +- .../samza/storage/kv/RocksDbTableProvider.java | 21 ++- .../storage/kv/TestRocksDbTableDescriptor.java | 15 ++ .../storage/kv/TestRocksDbTableProvider.java | 9 +- .../kv/BaseLocalStoreBackedTableDescriptor.java | 68 +++++++++ .../kv/BaseLocalStoreBackedTableProvider.java | 55 ++++--- .../kv/LocalStoreBackedReadableTable.java | 1 + .../TestBaseLocalStoreBackedTableProvider.java | 149 +++++++++++++++++++ .../TestLocalBaseStoreBackedTableProvider.java | 85 ----------- .../sql/testutil/TestIOResolverFactory.java | 15 +- 
.../table/TestTableDescriptorsProvider.java | 10 +- 29 files changed, 481 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-api/src/main/java/org/apache/samza/table/TableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java index 8e60dad..99446e4 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java @@ -21,10 +21,10 @@ package org.apache.samza.table; import java.util.Map; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.task.TaskContext; - /** * A table provider provides the implementation for a table. It ensures a table is * properly constructed and also manages its lifecycle. @@ -47,11 +47,15 @@ public interface TableProvider { /** * Generate any configuration for this table, the generated configuration * is used by Samza container to construct this table and any components - * necessary. - * @param config the current configuration + * necessary. Instead of manipulating the input parameters, this method + * should return the generated configuration. 
+ * + * @param jobConfig the job config + * @param generatedConfig config generated by earlier processing, but has + * not yet been merged to job config * @return configuration for this table */ - Map<String, String> generateConfig(Map<String, String> config); + Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig); /** * Shutdown the underlying table http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index dba47e1..2b279ef 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -129,6 +129,7 @@ public class JobNode { public JobConfig generateConfig(String executionPlanJson) { Map<String, String> configs = new HashMap<>(); configs.put(JobConfig.JOB_NAME(), jobName); + configs.put(JobConfig.JOB_ID(), jobId); final List<String> inputs = new ArrayList<>(); final List<String> broadcasts = new ArrayList<>(); @@ -177,7 +178,7 @@ public class JobNode { // write serialized serde instances and stream serde configs to configs addSerdeConfigs(configs); - configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables)); + configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs), tables)); // Add side inputs to the inputs and mark the stream as bootstrap tables.forEach(tableSpec -> { http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java 
b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java index b875c2e..f81f3b8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java @@ -45,7 +45,7 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, /** * Constructs a table descriptor instance - * @param tableId Id of the table + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } */ protected BaseTableDescriptor(String tableId) { this.tableId = tableId; http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java index a187b94..7dcd32e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java @@ -53,7 +53,8 @@ import com.google.common.base.Preconditions; */ public class StreamGraphSpec implements StreamGraph { private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class); - private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); + public static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); + public static final Pattern TABLE_ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); // We use a LHM for deterministic order in initializing and closing operators. 
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); @@ -86,8 +87,8 @@ public class StreamGraphSpec implements StreamGraph { @Override public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) { - Preconditions.checkState(isValidId(streamId), - "streamId must be non-empty and must not contain spaces or special characters: " + streamId); + Preconditions.checkState(isValidStreamId(streamId), String.format( + "streamId %s doesn't confirm to pattern %s", streamId, StreamGraphSpec.STREAM_ID_PATTERN)); Preconditions.checkNotNull(serde, "serde must not be null for an input stream."); Preconditions.checkState(!inputOperators.containsKey(streamId), "getInputStream must not be called multiple times with the same streamId: " + streamId); @@ -117,8 +118,8 @@ public class StreamGraphSpec implements StreamGraph { @Override public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) { - Preconditions.checkState(isValidId(streamId), - "streamId must be non-empty and must not contain spaces or special characters: " + streamId); + Preconditions.checkState(isValidStreamId(streamId), String.format( + "streamId %s doesn't confirm to pattern %s", streamId, StreamGraphSpec.STREAM_ID_PATTERN)); Preconditions.checkNotNull(serde, "serde must not be null for an output stream."); Preconditions.checkState(!outputStreams.containsKey(streamId), "getOutputStream must not be called multiple times with the same streamId: " + streamId); @@ -145,12 +146,11 @@ public class StreamGraphSpec implements StreamGraph { @Override public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) { + Preconditions.checkState(isValidTableId(tableDesc.getTableId()), String.format( + "tableId %s doesn't confirm to pattern %s", tableDesc.getTableId(), TABLE_ID_PATTERN.toString())); TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); - if (tables.containsKey(tableSpec)) { - throw new IllegalStateException(String.format( - 
"getTable() invoked multiple times with the same tableId: %s", - tableDesc.getTableId())); - } + Preconditions.checkState(!tables.containsKey(tableSpec), String.format( + "getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId())); tables.put(tableSpec, new TableImpl(tableSpec)); return tables.get(tableSpec); } @@ -178,7 +178,7 @@ public class StreamGraphSpec implements StreamGraph { * @return the unique ID for the next operator in the graph */ public String getNextOpId(OpCode opCode, String userDefinedId) { - if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) { + if (StringUtils.isNotBlank(userDefinedId) && !STREAM_ID_PATTERN.matcher(userDefinedId).matches()) { throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId); } @@ -267,8 +267,12 @@ public class StreamGraphSpec implements StreamGraph { return Collections.unmodifiableMap(tables); } - private boolean isValidId(String id) { - return StringUtils.isNotBlank(id) && ID_PATTERN.matcher(id).matches(); + public static boolean isValidStreamId(String id) { + return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches(); + } + + public static boolean isValidTableId(String id) { + return StringUtils.isNotBlank(id) && TABLE_ID_PATTERN.matcher(id).matches(); } private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java index 3b87eff..085131c 100644 --- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java @@ -28,6 +28,7 @@ 
import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.operators.BaseTableDescriptor; @@ -48,19 +49,21 @@ public class TableConfigGenerator { /** * Generate table configurations given a list of table descriptors + * @param config the job configuration * @param tableDescriptors the list of tableDescriptors * @return configuration for the tables */ - static public Map<String, String> generateConfigsForTableDescs(List<TableDescriptor> tableDescriptors) { - return generateConfigsForTableSpecs(getTableSpecs(tableDescriptors)); + static public Map<String, String> generateConfigsForTableDescs(Config config, List<TableDescriptor> tableDescriptors) { + return generateConfigsForTableSpecs(config, getTableSpecs(tableDescriptors)); } /** * Generate table configurations given a list of table specs + * @param config the job configuration * @param tableSpecs the list of tableSpecs * @return configuration for the tables */ - static public Map<String, String> generateConfigsForTableSpecs(List<TableSpec> tableSpecs) { + static public Map<String, String> generateConfigsForTableSpecs(Config config, List<TableSpec> tableSpecs) { Map<String, String> tableConfigs = new HashMap<>(); tableConfigs.putAll(generateTableKVSerdeConfigs(tableSpecs)); @@ -74,7 +77,7 @@ public class TableConfigGenerator { TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class); TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec); - tableConfigs.putAll(tableProvider.generateConfig(tableConfigs)); + tableConfigs.putAll(tableProvider.generateConfig(config, tableConfigs)); }); LOG.info("TableConfigGenerator has generated configs {}", tableConfigs); 
http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java index 21463c2..a1accd8 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java @@ -45,8 +45,7 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach private boolean isWriteAround; /** - * Constructs a table descriptor instance - * @param tableId Id of the table + * {@inheritDoc} */ public CachingTableDescriptor(String tableId) { super(tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java index a7d65bc..d5f7767 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java @@ -20,30 +20,22 @@ package org.apache.samza.table.caching; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; -import org.apache.samza.table.TableProvider; import 
org.apache.samza.table.TableSpec; import org.apache.samza.table.caching.guava.GuavaCacheTable; -import org.apache.samza.task.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.samza.table.utils.BaseTableProvider; import com.google.common.cache.CacheBuilder; /** * Table provider for {@link CachingTable}. */ -public class CachingTableProvider implements TableProvider { - private static final Logger LOG = LoggerFactory.getLogger(CachingTableProvider.class); +public class CachingTableProvider extends BaseTableProvider { public static final String REAL_TABLE_ID = "realTableId"; public static final String CACHE_TABLE_ID = "cacheTableId"; @@ -52,30 +44,19 @@ public class CachingTableProvider implements TableProvider { public static final String CACHE_SIZE = "cacheSize"; public static final String WRITE_AROUND = "writeAround"; - private final TableSpec cachingTableSpec; - // Store the cache instances created by default private final List<ReadWriteTable> defaultCaches = new ArrayList<>(); - private SamzaContainerContext containerContext; - private TaskContext taskContext; - public CachingTableProvider(TableSpec tableSpec) { - this.cachingTableSpec = tableSpec; - } - - @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - this.taskContext = taskContext; - this.containerContext = containerContext; + super(tableSpec); } @Override public Table getTable() { - String realTableId = cachingTableSpec.getConfig().get(REAL_TABLE_ID); + String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID); ReadableTable table = (ReadableTable) taskContext.getTable(realTableId); - String cacheTableId = cachingTableSpec.getConfig().get(CACHE_TABLE_ID); + String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID); ReadWriteTable cache; if (cacheTableId != null) { @@ -85,36 +66,21 @@ public class CachingTableProvider implements TableProvider { defaultCaches.add(cache); } - boolean isWriteAround = 
Boolean.parseBoolean(cachingTableSpec.getConfig().get(WRITE_AROUND)); - CachingTable cachingTable = new CachingTable(cachingTableSpec.getId(), table, cache, isWriteAround); + boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND)); + CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround); cachingTable.init(containerContext, taskContext); return cachingTable; } @Override - public Map<String, String> generateConfig(Map<String, String> config) { - Map<String, String> tableConfig = new HashMap<>(); - - // Insert table_id prefix to config entries - cachingTableSpec.getConfig().forEach((k, v) -> { - String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, cachingTableSpec.getId()) + "." + k; - tableConfig.put(realKey, v); - }); - - LOG.info("Generated configuration for table " + cachingTableSpec.getId()); - - return tableConfig; - } - - @Override public void close() { defaultCaches.forEach(c -> c.close()); } private ReadWriteTable createDefaultCacheTable(String tableId) { - long readTtlMs = Long.parseLong(cachingTableSpec.getConfig().getOrDefault(READ_TTL_MS, "-1")); - long writeTtlMs = Long.parseLong(cachingTableSpec.getConfig().getOrDefault(WRITE_TTL_MS, "-1")); - long cacheSize = Long.parseLong(cachingTableSpec.getConfig().getOrDefault(CACHE_SIZE, "-1")); + long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(READ_TTL_MS, "-1")); + long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(WRITE_TTL_MS, "-1")); + long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CACHE_SIZE, "-1")); CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); if (readTtlMs != -1) { @@ -127,7 +93,7 @@ public class CachingTableProvider implements TableProvider { cacheBuilder.maximumSize(cacheSize); } - LOG.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, maxSize=%d", + logger.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, 
maxSize=%d", readTtlMs, writeTtlMs, cacheSize)); GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build()); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java index ce125c0..4a05013 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java @@ -39,8 +39,7 @@ public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, G private Cache<K, V> cache; /** - * Constructs a table descriptor instance - * @param tableId Id of the table + * {@inheritDoc} */ public GuavaCacheTableDescriptor(String tableId) { super(tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java index 1ba26c7..1513249 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java @@ -20,19 +20,12 @@ package org.apache.samza.table.caching.guava; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.samza.config.JavaTableConfig; -import 
org.apache.samza.container.SamzaContainerContext; import org.apache.samza.table.Table; -import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; -import org.apache.samza.task.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; @@ -40,53 +33,26 @@ import com.google.common.cache.Cache; /** * Table provider for {@link GuavaCacheTable}. */ -public class GuavaCacheTableProvider implements TableProvider { - private static final Logger LOG = LoggerFactory.getLogger(GuavaCacheTableProvider.class); +public class GuavaCacheTableProvider extends BaseTableProvider { public static final String GUAVA_CACHE = "guavaCache"; - private final TableSpec guavaCacheTableSpec; - - private SamzaContainerContext containerContext; - private TaskContext taskContext; - private List<GuavaCacheTable> guavaTables = new ArrayList<>(); public GuavaCacheTableProvider(TableSpec tableSpec) { - this.guavaCacheTableSpec = tableSpec; - } - - @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - this.taskContext = taskContext; - this.containerContext = containerContext; + super(tableSpec); } @Override public Table getTable() { - Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, guavaCacheTableSpec.getConfig().get(GUAVA_CACHE)); - GuavaCacheTable table = new GuavaCacheTable(guavaCacheTableSpec.getId(), guavaCache); + Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE)); + GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache); table.init(containerContext, taskContext); guavaTables.add(table); return table; } @Override - public Map<String, String> generateConfig(Map<String, String> config) { - Map<String, String> tableConfig = new HashMap<>(); - - // Insert table_id prefix to config entries - 
guavaCacheTableSpec.getConfig().forEach((k, v) -> { - String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, guavaCacheTableSpec.getId()) + "." + k; - tableConfig.put(realKey, v); - }); - - LOG.info("Generated configuration for table " + guavaCacheTableSpec.getId()); - - return tableConfig; - } - - @Override public void close() { guavaTables.forEach(t -> t.close()); } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index 24edbce..3186fee 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -169,6 +169,7 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { /** * Execute an async request given a table key + * @param rateLimiter helper for rate limiting * @param key key of the table record * @param method method to be executed * @param timer latency metric to be updated @@ -199,6 +200,7 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { /** * Execute an async request given a table record (key+value) + * @param rateLimiter helper for rate limiting * @param key key of the table record * @param value value of the table record * @param method method to be executed @@ -229,9 +231,11 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { /** * Execute an async request given a collection of table keys + * @param rateLimiter helper for rate limiting * @param keys collection of keys * @param method method to be executed * @param timer latency metric to be updated + * @param <T> return type * @return CompletableFuture of the operation */ protected 
<T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, @@ -258,6 +262,7 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { /** * Execute an async request given a collection of table records + * @param rateLimiter helper for rate limiting * @param records list of records * @param method method to be executed * @param timer latency metric to be updated http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java index e405096..a8d419d 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java @@ -75,8 +75,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot private int asyncCallbackPoolSize = -1; /** - * Construct a table descriptor instance - * @param tableId Id of the table + * {@inheritDoc} */ public RemoteTableDescriptor(String tableId) { super(tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index f09c6fd..6c5d9b3 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -20,23 +20,17 @@ package org.apache.samza.table.remote; import java.util.ArrayList; -import java.util.HashMap; import 
java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.table.Table; -import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; -import org.apache.samza.task.TaskContext; import org.apache.samza.util.RateLimiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG; @@ -45,8 +39,7 @@ import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG; /** * Provide for remote table instances */ -public class RemoteTableProvider implements TableProvider { - private static final Logger LOG = LoggerFactory.getLogger(RemoteTableProvider.class); +public class RemoteTableProvider extends BaseTableProvider { static final String READ_FN = "io.read.func"; static final String WRITE_FN = "io.write.func"; @@ -55,11 +48,8 @@ public class RemoteTableProvider implements TableProvider { static final String WRITE_CREDIT_FN = "io.write.credit.func"; static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size"; - private final TableSpec tableSpec; private final boolean readOnly; private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>(); - private SamzaContainerContext containerContext; - private TaskContext taskContext; /** * Map of tableId -> executor service for async table IO and callbacks. 
The same executors @@ -70,7 +60,7 @@ public class RemoteTableProvider implements TableProvider { private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>(); public RemoteTableProvider(TableSpec tableSpec) { - this.tableSpec = tableSpec; + super(tableSpec); this.readOnly = !tableSpec.getConfig().containsKey(WRITE_FN); } @@ -78,15 +68,6 @@ public class RemoteTableProvider implements TableProvider { * {@inheritDoc} */ @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - this.containerContext = containerContext; - this.taskContext = taskContext; - } - - /** - * {@inheritDoc} - */ - @Override public Table getTable() { RemoteReadableTable table; String tableId = tableSpec.getId(); @@ -148,24 +129,6 @@ public class RemoteTableProvider implements TableProvider { * {@inheritDoc} */ @Override - public Map<String, String> generateConfig(Map<String, String> config) { - Map<String, String> tableConfig = new HashMap<>(); - - // Insert table_id prefix to config entries - tableSpec.getConfig().forEach((k, v) -> { - String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." 
+ k; - tableConfig.put(realKey, v); - }); - - LOG.info("Generated configuration for table " + tableSpec.getId()); - - return tableConfig; - } - - /** - * {@inheritDoc} - */ - @Override public void close() { tables.forEach(t -> t.close()); tableExecutors.values().forEach(e -> e.shutdown()); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java new file mode 100644 index 0000000..960e2a4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table.utils; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableSpec; +import org.apache.samza.task.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for all table provider implementations. + */ +abstract public class BaseTableProvider implements TableProvider { + + final protected Logger logger = LoggerFactory.getLogger(getClass()); + + final protected TableSpec tableSpec; + + protected SamzaContainerContext containerContext; + protected TaskContext taskContext; + + public BaseTableProvider(TableSpec tableSpec) { + this.tableSpec = tableSpec; + } + + /** + * {@inheritDoc} + */ + @Override + public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + this.containerContext = containerContext; + this.taskContext = taskContext; + } + + /** + * {@inheritDoc} + */ + @Override + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { + Map<String, String> tableConfig = new HashMap<>(); + + // Insert table_id prefix to config entries + tableSpec.getConfig().forEach((k, v) -> { + String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." 
+ k; + tableConfig.put(realKey, v); + }); + + logger.info("Generated configuration for table " + tableSpec.getId()); + + return tableConfig; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index 42b6130..1577e62 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -31,6 +31,7 @@ object StorageConfig { val MSG_SERDE = "stores.%s.msg.serde" val CHANGELOG_STREAM = "stores.%s.changelog" val CHANGELOG_SYSTEM = "job.changelog.system" + val CHANGELOG_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms" val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1) val ACCESSLOG_STREAM_SUFFIX = "access-log" http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java index dfb4b70..109c138 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java @@ -384,7 +384,7 @@ public class TestStreamGraphSpec { public void testGetIntermediateStreamWithDefaultKeyValueSerde() { Config mockConfig = mock(Config.class); String streamId = "streamId"; - + StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); KVSerde mockKVSerde = mock(KVSerde.class); @@ 
-409,7 +409,7 @@ public class TestStreamGraphSpec { public void testGetIntermediateStreamWithDefaultDefaultSerde() { Config mockConfig = mock(Config.class); String streamId = "streamId"; - + StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = graphSpec.getIntermediateStream(streamId, null); @@ -498,7 +498,7 @@ public class TestStreamGraphSpec { String testStreamId1 = "test-stream-1"; String testStreamId2 = "test-stream-2"; String testStreamId3 = "test-stream-3"; - + graphSpec.getInputStream("test-stream-1"); graphSpec.getInputStream("test-stream-2"); graphSpec.getInputStream("test-stream-3"); @@ -516,8 +516,19 @@ public class TestStreamGraphSpec { StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); + when(mockTableDescriptor.getTableId()).thenReturn("t1"); when(mockTableDescriptor.getTableSpec()).thenReturn( new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>())); assertNotNull(graphSpec.getTable(mockTableDescriptor)); } + + @Test(expected = IllegalStateException.class) + public void testGetTableWithBadId() { + Config mockConfig = mock(Config.class); + StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); + + BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); + when(mockTableDescriptor.getTableId()).thenReturn("my.table"); + graphSpec.getTable(mockTableDescriptor); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java index 2a9532b..8328417 100644 --- 
a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java @@ -33,6 +33,9 @@ import org.apache.samza.table.TableSpec; */ public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> { + /** + * {@inheritDoc} + */ public InMemoryTableDescriptor(String tableId) { super(tableId); } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java index c1c2f1c..46406e5 100644 --- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java @@ -21,6 +21,7 @@ package org.apache.samza.storage.kv.inmemory; import java.util.HashMap; import java.util.Map; +import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.storage.kv.BaseLocalStoreBackedTableProvider; @@ -37,7 +38,7 @@ public class InMemoryTableProvider extends BaseLocalStoreBackedTableProvider { } @Override - public Map<String, String> generateConfig(Map<String, String> config) { + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { Map<String, String> tableConfig = new HashMap<>(); @@ -47,15 +48,19 @@ public class InMemoryTableProvider extends BaseLocalStoreBackedTableProvider { InMemoryKeyValueStorageEngineFactory.class.getName()); // Common store configuration - 
tableConfig.putAll(generateCommonStoreConfig(config)); + tableConfig.putAll(generateCommonStoreConfig(jobConfig, generatedConfig)); // Rest of the configuration - tableSpec.getConfig().forEach((k, v) -> { - String realKey = k.startsWith("inmemory.") ? - String.format("stores.%s", tableSpec.getId()) + "." + k.substring("inmemory.".length()) - : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k; - tableConfig.put(realKey, v); - }); + tableSpec.getConfig().entrySet().stream() + .filter(e -> !e.getKey().startsWith("internal.")) + .forEach(e -> { + String k = e.getKey(); + String v = e.getValue(); + String realKey = k.startsWith("inmemory.") + ? String.format("stores.%s", tableSpec.getId()) + "." + k.substring("inmemory.".length()) + : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k; + tableConfig.put(realKey, v); + }); logger.info("Generated configuration for table " + tableSpec.getId()); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java index 76b7a66..2145b68 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -45,12 +46,12 @@ public class TestInMemoryTableProvider { 
TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()), "my-table-provider-factory", tableSpecConfig); - Map<String, String> config = new HashMap<>(); - config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); - config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); + Map<String, String> generatedConfig = new HashMap<>(); + generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); + generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); TableProvider tableProvider = new InMemoryTableProvider(tableSpec); - Map<String, String> tableConfig = tableProvider.generateConfig(config); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig); Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java index 33bfc84..9b81605 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java @@ -44,8 +44,8 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes"; static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num"; - protected Integer writeBatchSize; - protected Integer objectCacheSize; + private Integer 
writeBatchSize; + private Integer objectCacheSize; private Integer cacheSize; private Integer writeBufferSize; private Integer blockSize; @@ -56,6 +56,9 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr private String compressionType; private String compactionStyle; + /** + * {@inheritDoc} + */ public RocksDbTableDescriptor(String tableId) { super(tableId); } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java index dce7cc0..df60a5a 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.table.TableSpec; @@ -37,7 +38,7 @@ public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider { } @Override - public Map<String, String> generateConfig(Map<String, String> config) { + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { Map<String, String> tableConfig = new HashMap<>(); @@ -47,15 +48,19 @@ public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider { RocksDbKeyValueStorageEngineFactory.class.getName()); // Common store configuration - tableConfig.putAll(generateCommonStoreConfig(config)); + tableConfig.putAll(generateCommonStoreConfig(jobConfig, generatedConfig)); // Rest of the configuration - 
tableSpec.getConfig().forEach((k, v) -> { - String realKey = k.startsWith("rocksdb.") ? - String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length()) - : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k; - tableConfig.put(realKey, v); - }); + tableSpec.getConfig().entrySet().stream() + .filter(e -> !e.getKey().startsWith("internal.")) + .forEach(e -> { + String k = e.getKey(); + String v = e.getValue(); + String realKey = k.startsWith("rocksdb.") + ? String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length()) + : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k; + tableConfig.put(realKey, v); + }); // Enable host affinity tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true"); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java index 49fe6eb..50f0920 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java @@ -79,6 +79,21 @@ public class TestRocksDbTableDescriptor { Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION)); Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE)); Assert.assertEquals("xyz", getConfig(tableSpec, "abc")); + Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG)); + } + + @Test + public void testTableSpecWithChangelogEnabled() { + + TableSpec tableSpec = new 
RocksDbTableDescriptor<Integer, String>("1") + .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + .withChangelogStream("changelog-$tream") + .withChangelogReplicationFactor(10) + .getTableSpec(); + + Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR)); + Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM)); + Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG)); } private String getConfig(TableSpec tableSpec, String key) { http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java index beda5da..8ce061c 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -46,12 +47,12 @@ public class TestRocksDbTableProvider { TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()), "my-table-provider-factory", tableSpecConfig); - Map<String, String> config = new HashMap<>(); - config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); - config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); + 
Map<String, String> generatedConfig = new HashMap<>(); + generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); + generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); TableProvider tableProvider = new RocksDbTableProvider(tableSpec); - Map<String, String> tableConfig = tableProvider.generateConfig(config); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig); Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java index 2d05f95..c46f9e1 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java @@ -36,11 +36,20 @@ import org.apache.samza.storage.SideInputsProcessor; */ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>> extends BaseTableDescriptor<K, V, D> { + + static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog"; + static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream"; + static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor"; + protected List<String> sideInputs; protected SideInputsProcessor sideInputsProcessor; + protected boolean enableChangelog; + protected String changelogStream; + protected Integer 
changelogReplicationFactor; /** * Constructs a table descriptor instance + * * @param tableId Id of the table */ public BaseLocalStoreBackedTableDescriptor(String tableId) { @@ -49,6 +58,10 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo public D withSideInputs(List<String> sideInputs) { this.sideInputs = sideInputs; + // Disable changelog + this.enableChangelog = false; + this.changelogStream = null; + this.changelogReplicationFactor = null; return (D) this; } @@ -57,9 +70,56 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo return (D) this; } + /** + * Enable changelog for this table, by default changelog is disabled. When the + * changelog stream name is not specified, it is automatically generated in + * the format { @literal [job-name]-[job-id]-table-[table-id] }. + * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide + * + * @return this table descriptor instance + */ + public D withChangelogEnabled() { + this.enableChangelog = true; + return (D) this; + } + + /** + * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide + * + * @param changelogStream changelog stream name + * @return this table descriptor instance + */ + public D withChangelogStream(String changelogStream) { + this.enableChangelog = true; + this.changelogStream = changelogStream; + return (D) this; + } + + /** + * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide + * + * @param replicationFactor replication factor + * @return this table descriptor instance + */ + public D withChangelogReplicationFactor(int replicationFactor) { + this.enableChangelog = true; + this.changelogReplicationFactor = replicationFactor; + return (D) this; + } + @Override protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) { super.generateTableSpecConfig(tableSpecConfig); + + tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, 
String.valueOf(enableChangelog)); + if (enableChangelog) { + if (changelogStream != null) { + tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream); + } + if (changelogReplicationFactor != null) { + tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor)); + } + } } /** @@ -72,6 +132,14 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo String.format("Invalid side input configuration for table: %s. " + "Both side inputs and the processor must be provided", tableId)); } + if (!enableChangelog) { + Preconditions.checkState(changelogStream == null, + String.format("Invalid changelog configuration for table: %s. Changelog " + + "must be enabled, when changelog stream name is provided", tableId)); + Preconditions.checkState(changelogReplicationFactor == null, + String.format("Invalid changelog configuration for table: %s. Changelog " + + "must be enabled, when changelog replication factor is provided", tableId)); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java index cacfe95..16da035 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -22,20 +22,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaTableConfig; +import 
org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; -import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.task.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -47,26 +49,18 @@ import com.google.common.base.Preconditions; * the table provider will not manage the lifecycle of the backing * stores. */ -abstract public class BaseLocalStoreBackedTableProvider implements TableProvider { - - protected final Logger logger = LoggerFactory.getLogger(getClass()); - - protected final TableSpec tableSpec; +abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider { protected KeyValueStore kvStore; - protected SamzaContainerContext containerContext; - - protected TaskContext taskContext; - public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) { - this.tableSpec = tableSpec; + super(tableSpec); } @Override public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - this.containerContext = containerContext; - this.taskContext = taskContext; + + super.init(containerContext, taskContext); Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables."); @@ -90,14 +84,14 @@ abstract public class BaseLocalStoreBackedTableProvider implements TableProvider return table; } - protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) { + protected Map<String, String> generateCommonStoreConfig(Config jobConfig, Map<String, String> generatedConfig) { Map<String, String> storeConfig = new 
HashMap<>(); // We assume the configuration for serde are already generated for this table, // so we simply carry them over to store configuration. // - JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config)); + JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(generatedConfig)); String keySerde = tableConfig.getKeySerde(tableSpec.getId()); storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde); @@ -107,13 +101,38 @@ abstract public class BaseLocalStoreBackedTableProvider implements TableProvider List<String> sideInputs = tableSpec.getSideInputs(); if (sideInputs != null && !sideInputs.isEmpty()) { + sideInputs.forEach(si -> Preconditions.checkState(StreamGraphSpec.isValidStreamId(si), String.format( + "Side input stream %s doesn't confirm to pattern %s", si, StreamGraphSpec.STREAM_ID_PATTERN))); String formattedSideInputs = String.join(",", sideInputs); - storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs); storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()), SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor())); } + // Changelog configuration + Boolean enableChangelog = Boolean.valueOf( + tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG)); + if (enableChangelog) { + String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM); + if (StringUtils.isEmpty(changelogStream)) { + changelogStream = String.format("%s-%s-table-%s", + jobConfig.get(JobConfig.JOB_NAME()), + jobConfig.get(JobConfig.JOB_ID(), "1"), + tableSpec.getId()); + } + + Preconditions.checkState(StreamGraphSpec.isValidStreamId(changelogStream), String.format( + "Changelog stream %s doesn't confirm to pattern %s", changelogStream, StreamGraphSpec.STREAM_ID_PATTERN)); + 
storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream); + + String changelogReplicationFactor = tableSpec.getConfig().get( + BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR); + if (changelogReplicationFactor != null) { + storeConfig.put(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), tableSpec.getId()), + changelogReplicationFactor); + } + } + return storeConfig; } http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java index 1c59eb6..d0629c4 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java @@ -44,6 +44,7 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> /** * Constructs an instance of {@link LocalStoreBackedReadableTable} + * @param tableId the table Id * @param kvStore the backing store */ public LocalStoreBackedReadableTable(String tableId, KeyValueStore<K, V> kvStore) { http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java new file mode 100644 index 0000000..2b0166c --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java @@ -0,0 +1,149 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import java.util.HashMap; +import java.util.Map; +import junit.framework.Assert; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.storage.StorageEngine; +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableSpec; +import org.apache.samza.task.TaskContext; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + + +public class TestBaseLocalStoreBackedTableProvider { + + @Test + public void testInit() { + StorageEngine store = mock(KeyValueStorageEngine.class); + SamzaContainerContext containerContext = mock(SamzaContainerContext.class); + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.getStore(any())).thenReturn(store); + when(taskContext.getMetricsRegistry()).thenReturn(new 
NoOpMetricsRegistry()); + + TableSpec tableSpec = mock(TableSpec.class); + when(tableSpec.getId()).thenReturn("t1"); + + TableProvider tableProvider = createTableProvider(tableSpec); + tableProvider.init(containerContext, taskContext); + Assert.assertNotNull(tableProvider.getTable()); + } + + @Test(expected = SamzaException.class) + public void testInitFail() { + TableSpec tableSpec = mock(TableSpec.class); + when(tableSpec.getId()).thenReturn("t1"); + TableProvider tableProvider = createTableProvider(tableSpec); + Assert.assertNotNull(tableProvider.getTable()); + } + + @Test + public void testGenerateCommonStoreConfig() { + Map<String, String> generatedConfig = new HashMap<>(); + generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); + generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1"); + + TableSpec tableSpec = mock(TableSpec.class); + when(tableSpec.getId()).thenReturn("t1"); + + TableProvider tableProvider = createTableProvider(tableSpec); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig); + Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); + Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); + } + + @Test + public void testChangelogDisabled() { + TableSpec tableSpec = createTableDescriptor("t1") + .getTableSpec(); + + TableProvider tableProvider = createTableProvider(tableSpec); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig()); + Assert.assertEquals(2, tableConfig.size()); + Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))); + } + + @Test + public void testChangelogEnabled() { + TableSpec tableSpec = createTableDescriptor("t1") + .withChangelogEnabled() + .getTableSpec(); + + Map<String, String> jobConfig = new HashMap<>(); + jobConfig.put(JobConfig.JOB_NAME(), "test-job"); 
+ jobConfig.put(JobConfig.JOB_ID(), "10"); + + TableProvider tableProvider = createTableProvider(tableSpec); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(jobConfig), new MapConfig()); + Assert.assertEquals(3, tableConfig.size()); + Assert.assertEquals("test-job-10-table-t1", String.format( + tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")))); + } + + @Test + public void testChangelogEnabledWithCustomParameters() { + TableSpec tableSpec = createTableDescriptor("t1") + .withChangelogStream("my-stream") + .withChangelogReplicationFactor(100) + .getTableSpec(); + + TableProvider tableProvider = createTableProvider(tableSpec); + Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig()); + Assert.assertEquals(4, tableConfig.size()); + Assert.assertEquals("my-stream", String.format( + tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")))); + Assert.assertEquals("100", String.format( + tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), "t1")))); + } + + private TableProvider createTableProvider(TableSpec tableSpec) { + return new BaseLocalStoreBackedTableProvider(tableSpec) { + @Override + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { + return generateCommonStoreConfig(jobConfig, generatedConfig); + } + }; + } + + private BaseLocalStoreBackedTableDescriptor createTableDescriptor(String tableId) { + return new BaseLocalStoreBackedTableDescriptor(tableId) { + @Override + public TableSpec getTableSpec() { + validate(); + Map<String, String> tableSpecConfig = new HashMap<>(); + generateTableSpecConfig(tableSpecConfig); + return new TableSpec(tableId, serde, null, tableSpecConfig, + sideInputs, sideInputsProcessor); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java 
---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java deleted file mode 100644 index 56818b5..0000000 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.storage.kv; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.samza.SamzaException; -import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.config.StorageConfig; -import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.storage.StorageEngine; -import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; -import org.apache.samza.util.NoOpMetricsRegistry; -import org.junit.Before; -import org.junit.Test; - -import junit.framework.Assert; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestLocalBaseStoreBackedTableProvider { - - private BaseLocalStoreBackedTableProvider tableProvider; - - @Before - public void prepare() { - TableSpec tableSpec = mock(TableSpec.class); - when(tableSpec.getId()).thenReturn("t1"); - tableProvider = new BaseLocalStoreBackedTableProvider(tableSpec) { - @Override - public Map<String, String> generateConfig(Map<String, String> config) { - return generateCommonStoreConfig(config); - } - }; - } - - @Test - public void testInit() { - StorageEngine store = mock(KeyValueStorageEngine.class); - SamzaContainerContext containerContext = mock(SamzaContainerContext.class); - TaskContext taskContext = mock(TaskContext.class); - when(taskContext.getStore(any())).thenReturn(store); - when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); - tableProvider.init(containerContext, taskContext); - Assert.assertNotNull(tableProvider.getTable()); - } - - @Test(expected = SamzaException.class) - public void testInitFail() { - Assert.assertNotNull(tableProvider.getTable()); - } - - @Test - public void testGenerateCommonStoreConfig() { - Map<String, String> config = new HashMap<>(); - config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1"); - config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), 
"vs1"); - - Map<String, String> tableConfig = tableProvider.generateConfig(config); - Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); - Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1"))); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index 574076e..bd61afd 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang.NotImplementedException; import org.apache.samza.config.Config; -import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.JsonSerdeV2; @@ -44,7 +43,7 @@ import org.apache.samza.table.Table; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableProviderFactory; import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; +import org.apache.samza.table.utils.BaseTableProvider; public class TestIOResolverFactory implements SqlIOResolverFactory { @@ -156,9 +155,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { } } - static class TestTableProvider implements TableProvider { - @Override - public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + static class TestTableProvider extends BaseTableProvider { + + public TestTableProvider() { + super(null); } @Override @@ -167,7 +167,7 @@ 
public class TestIOResolverFactory implements SqlIOResolverFactory { } @Override - public Map<String, String> generateConfig(Map<String, String> config) { + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { return new HashMap<>(); } @@ -201,7 +201,8 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { if (isSink) { tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size()); } else { - tableDescriptor = new RocksDbTableDescriptor("InputTable-" + ioName) + String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-"); + tableDescriptor = new RocksDbTableDescriptor(tableId) .withSerde(KVSerde.of( new JsonSerdeV2<>(SamzaSqlCompositeKey.class), new JsonSerdeV2<>(SamzaSqlRelMessage.class))); http://git-wip-us.apache.org/repos/asf/samza/blob/493053df/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 38cc47c..41b6509 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -29,6 +29,7 @@ import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.KVSerde; @@ -42,7 +43,6 @@ import org.apache.samza.table.TableDescriptorsProvider; import org.apache.samza.table.remote.RemoteTableDescriptor; import 
org.apache.samza.table.remote.RemoteTableProviderFactory; import org.apache.samza.table.remote.TableReadFunction; - import org.apache.samza.util.RateLimiter; import org.apache.samza.util.Util; import org.junit.Assert; @@ -79,6 +79,8 @@ public class TestTableDescriptorsProvider { public void testWithTableDescriptorsProviderClass() throws Exception { Map<String, String> configs = new HashMap<>(); String tableRewriterName = "tableRewriter"; + String jobName = "test-job"; + configs.put(JobConfig.JOB_NAME(), jobName); configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName()); Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs)); Assert.assertNotNull(resultConfig); @@ -95,8 +97,8 @@ public class TestTableDescriptorsProvider { Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde")); Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde")); Config storeConfig = resultConfig.subset("stores." 
+ localTableId + ".", true); - Assert.assertTrue(storeConfig.size() == 4); - Assert.assertEquals(storeConfig.getInt("rocksdb.block.size.bytes"), 4096); + Assert.assertEquals(4, storeConfig.size()); + Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes")); JavaTableConfig tableConfig = new JavaTableConfig(resultConfig); Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), @@ -155,7 +157,7 @@ public class TestTableDescriptorsProvider { TableDescriptorsProvider tableDescriptorsProvider = Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class); List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config); - return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(tableDescs))); + return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(config, tableDescs))); } catch (Exception e) { throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s", tableDescriptorsProviderClassName), e);
