Repository: samza Updated Branches: refs/heads/master b842626fd -> cfbb9c6eb
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 8d4098f..2999800 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -32,7 +32,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = { val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs")) - val jobId = config.getJobId.getOrElse("1") + val jobId = config.getJobId val kafkaConfig = new KafkaConfig(config) val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse( http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java index 3fa66e5..6cebc28 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java @@ -31,7 +31,6 @@ import org.apache.samza.SamzaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; -import scala.runtime.AbstractFunction0; /** @@ -126,11 +125,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { } String jobName = (String) jobNameOption.get(); - Option jobIdOption = jobConfig.getJobId(); - String jobId = "1"; - if (! jobIdOption.isEmpty()) { - jobId = (String) jobIdOption.get(); - } + String jobId = jobConfig.getJobId(); return String.format("%s-%s", jobName, jobId); } @@ -156,11 +151,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { } String jobName = (String) jobNameOption.get(); - Option jobIdOption = jobConfig.getJobId(); - String jobId = "1"; - if (! jobIdOption.isEmpty()) { - jobId = (String) jobIdOption.get(); - } + String jobId = jobConfig.getJobId(); return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"), jobId.replaceAll("\\W", "_")); http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 601ffa2..2d09301 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -40,7 +40,7 @@ object KafkaUtil extends Logging { def getClientId(id: String, config: Config): String = getClientId( id, config.getName.getOrElse(throw new ConfigException("Missing job name.")), - config.getJobId.getOrElse("1")) + config.getJobId) def getClientId(id: String, jobName: String, jobId: String): String = "%s-%s-%s" format http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java index 07f4f55..8231905 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -89,10 +89,15 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide Map<String, String> storeConfig = new HashMap<>(); - // We assume the configuration for serde are already generated for this table, - // so we simply carry them over to store configuration. - // - JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(generatedConfig)); + // serde configurations for tables are generated at top level by JobNodeConfigurationGenerator and are included + // in the global jobConfig. generatedConfig has all table specific configuration generated from TableSpec, such + // as TableProviderFactory, sideInputs, etc. + // Merge the global jobConfig and generatedConfig to get full access to configuration needed to create local + // store configuration + Map<String, String> mergedConfigMap = new HashMap<>(jobConfig); + mergedConfigMap.putAll(generatedConfig); + JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap)); + JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig); String keySerde = tableConfig.getKeySerde(tableSpec.getId()); storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde); @@ -116,9 +121,7 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide if (enableChangelog) { String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM); if (StringUtils.isEmpty(changelogStream)) { - changelogStream = String.format("%s-%s-table-%s", - jobConfig.get(JobConfig.JOB_NAME()), - jobConfig.get(JobConfig.JOB_ID(), "1"), + changelogStream = String.format("%s-%s-table-%s", mergedJobConfig.getName().get(), mergedJobConfig.getJobId(), tableSpec.getId()); } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 5c4ba3b..6d6613c 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -31,9 +31,9 @@ import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.samza.SamzaException; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.TaskApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -57,10 +57,7 @@ import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.inmemory.InMemorySystemFactory; import org.apache.samza.task.AsyncStreamTask; -import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTask; -import org.apache.samza.task.StreamTaskFactory; -import org.apache.samza.task.TaskFactory; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.apache.samza.test.framework.system.InMemorySystemDescriptor; @@ -86,8 +83,7 @@ public class TestRunner { public static final String JOB_NAME = "samza-test"; private Map<String, String> configs; - private Class taskClass; - private StreamApplication app; + private SamzaApplication app; /* * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor} * provides an isolated state to run with in memory system @@ -112,7 +108,7 @@ public class TestRunner { this(); Preconditions.checkNotNull(taskClass); configs.put(TaskConfig.TASK_CLASS(), taskClass.getName()); - this.taskClass = taskClass; + this.app = new LegacyTaskApplication(taskClass.getName()); } /** @@ -159,6 +155,17 @@ public class TestRunner { } /** + * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. + * @param config configs for the application + * @return this {@link TestRunner} + */ + public TestRunner addConfigs(Map<String, String> config, String configPrefix) { + Preconditions.checkNotNull(config); + config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value)); + return this; + } + + /** * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already * exisiting in {@code configs} * @param key key of the config @@ -168,7 +175,7 @@ public class TestRunner { public TestRunner addOverrideConfig(String key, String value) { Preconditions.checkNotNull(key); Preconditions.checkNotNull(value); - String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME); + String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); configs.put(String.format("%s%s", configKeyPrefix, key), value); return this; } @@ -192,6 +199,10 @@ public class TestRunner { return this; } + private String getJobNameAndId() { + return String.format("%s-%s", JOB_NAME, configs.getOrDefault(JobConfig.JOB_ID(), "1")); + } + /** * Adds the provided input stream with mock data to the test application. * @param descriptor describes the stream that is supposed to be input to Samza application @@ -243,11 +254,10 @@ public class TestRunner { * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode */ public void run(Duration timeout) { - Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null), + Preconditions.checkState(app != null, "TestRunner should run for Low Level Task api or High Level Application Api"); Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive"); - SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> appDesc.setTaskFactory(createTaskFactory()) : app; - final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, new MapConfig(configs)); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); runner.run(); boolean timedOut = !runner.waitForFinish(timeout); Assert.assertFalse("Timed out waiting for application to finish", timedOut); @@ -326,28 +336,6 @@ public class TestRunner { entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList()))); } - private TaskFactory createTaskFactory() { - if (StreamTask.class.isAssignableFrom(taskClass)) { - return (StreamTaskFactory) () -> { - try { - return (StreamTask) taskClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new SamzaException(String.format("Failed to instantiate StreamTask class %s", taskClass.getName()), e); - } - }; - } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) { - return (AsyncStreamTaskFactory) () -> { - try { - return (AsyncStreamTask) taskClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new SamzaException(String.format("Failed to instantiate AsyncStreamTask class %s", taskClass.getName()), e); - } - }; - } - throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask " - + "or AsyncStreamTask", taskClass.getName())); - } - /** * Creates an in memory stream with {@link InMemorySystemFactory} and feeds its partition with stream of messages * @param partitonData key of the map represents partitionId and value represents @@ -367,7 +355,7 @@ public class TestRunner { InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); imsd.withInMemoryScope(this.inMemoryScope); addConfigs(descriptor.toConfig()); - addConfigs(descriptor.getSystemDescriptor().toConfig()); + addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId())); StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); @@ -381,7 +369,7 @@ public class TestRunner { producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value)); }); producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null, - new EndOfStreamMessage(null))); + new EndOfStreamMessage(null))); }); } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java index 92b23ef..e6e423f 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java @@ -29,7 +29,6 @@ import org.apache.samza.serializers.Serde; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.inmemory.InMemorySystemFactory; import org.apache.samza.config.JavaSystemConfig; -import org.apache.samza.test.framework.TestRunner; /** @@ -60,9 +59,6 @@ public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDes * </ol> * **/ - private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; - private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default"; - private String inMemoryScope; /** @@ -106,11 +102,7 @@ public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDes public Map<String, String> toConfig() { HashMap<String, String> configs = new HashMap<>(super.toConfig()); configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope); - configs.put(String.format(CONFIG_OVERRIDE_PREFIX + JavaSystemConfig.SYSTEM_FACTORY_FORMAT, TestRunner.JOB_NAME, getSystemName()), - FACTORY_CLASS_NAME); - configs.put( - String.format(CONFIG_OVERRIDE_PREFIX + DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, TestRunner.JOB_NAME, - getSystemName()), SystemStreamMetadata.OffsetType.OLDEST.toString()); + configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME); return configs; } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index d123cee..6186ca7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -96,8 +96,6 @@ public class TestTableDescriptorsProvider { Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId); Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId), RocksDbKeyValueStorageEngineFactory.class.getName()); - Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde")); - Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde")); Config storeConfig = resultConfig.subset("stores." + localTableId + ".", true); Assert.assertEquals(4, storeConfig.size()); Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes")); @@ -107,10 +105,6 @@ public class TestTableDescriptorsProvider { RocksDbTableProviderFactory.class.getName()); Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName()); - Assert.assertTrue(tableConfig.getKeySerde(localTableId).startsWith("StringSerde")); - Assert.assertTrue(tableConfig.getValueSerde(localTableId).startsWith("StringSerde")); - Assert.assertTrue(tableConfig.getKeySerde(remoteTableId).startsWith("StringSerde")); - Assert.assertTrue(tableConfig.getValueSerde(remoteTableId).startsWith("LongSerde")); Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), RocksDbTableProviderFactory.class.getName()); Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName()); } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index b30b896..4adb93a 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -76,7 +76,7 @@ public class YarnJobValidationTool { this.config = config; this.client = client; String name = this.config.getName().get(); - String jobId = this.config.getJobId().nonEmpty()? this.config.getJobId().get() : "1"; + String jobId = this.config.getJobId(); this.jobName = name + "_" + jobId; this.validator = validator; } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index d335448..1d72a88 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -67,7 +67,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { } envMapWithJavaHome }), - Some("%s_%s" format(config.getName.get, config.getJobId.getOrElse(1))) + Some("%s_%s" format(config.getName.get, config.getJobId)) ) } catch { case e: Throwable => @@ -169,7 +169,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { // Get by name config.getName match { case Some(jobName) => - val applicationName = "%s_%s" format(jobName, config.getJobId.getOrElse(1)) + val applicationName = "%s_%s" format(jobName, config.getJobId) logger.info("Fetching status from YARN for application name %s" format applicationName) val applicationIds = client.getActiveApplicationIds(applicationName)