Repository: samza
Updated Branches:
  refs/heads/master b842626fd -> cfbb9c6eb


http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 8d4098f..2999800 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -32,7 +32,7 @@ class KafkaCheckpointManagerFactory extends 
CheckpointManagerFactory with Loggin
 
   def getCheckpointManager(config: Config, registry: MetricsRegistry): 
CheckpointManager = {
     val jobName = config.getName.getOrElse(throw new SamzaException("Missing 
job name in configs"))
-    val jobId = config.getJobId.getOrElse("1")
+    val jobId = config.getJobId
 
     val kafkaConfig = new KafkaConfig(config)
     val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse(

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
index 3fa66e5..6cebc28 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -31,7 +31,6 @@ import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
-import scala.runtime.AbstractFunction0;
 
 
 /**
@@ -126,11 +125,7 @@ public class KafkaConsumerConfig extends HashMap<String, 
Object> {
     }
     String jobName = (String) jobNameOption.get();
 
-    Option jobIdOption = jobConfig.getJobId();
-    String jobId = "1";
-    if (! jobIdOption.isEmpty()) {
-      jobId = (String) jobIdOption.get();
-    }
+    String jobId = jobConfig.getJobId();
 
     return String.format("%s-%s", jobName, jobId);
   }
@@ -156,11 +151,7 @@ public class KafkaConsumerConfig extends HashMap<String, 
Object> {
     }
     String jobName = (String) jobNameOption.get();
 
-    Option jobIdOption = jobConfig.getJobId();
-    String jobId = "1";
-    if (! jobIdOption.isEmpty()) {
-      jobId = (String) jobIdOption.get();
-    }
+    String jobId = jobConfig.getJobId();
 
     return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), 
jobName.replaceAll("\\W", "_"),
         jobId.replaceAll("\\W", "_"));

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala 
b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 601ffa2..2d09301 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -40,7 +40,7 @@ object KafkaUtil extends Logging {
   def getClientId(id: String, config: Config): String = getClientId(
     id,
     config.getName.getOrElse(throw new ConfigException("Missing job name.")),
-    config.getJobId.getOrElse("1"))
+    config.getJobId)
 
   def getClientId(id: String, jobName: String, jobId: String): String =
     "%s-%s-%s" format

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 07f4f55..8231905 100644
--- 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -89,10 +89,15 @@ abstract public class BaseLocalStoreBackedTableProvider 
extends BaseTableProvide
 
     Map<String, String> storeConfig = new HashMap<>();
 
-    // We assume the configuration for serde are already generated for this 
table,
-    // so we simply carry them over to store configuration.
-    //
-    JavaTableConfig tableConfig = new JavaTableConfig(new 
MapConfig(generatedConfig));
+    // serde configurations for tables are generated at top level by 
JobNodeConfigurationGenerator and are included
+    // in the global jobConfig. generatedConfig has all table specific 
configuration generated from TableSpec, such
+    // as TableProviderFactory, sideInputs, etc.
+    // Merge the global jobConfig and generatedConfig to get full access to 
configuration needed to create local
+    // store configuration
+    Map<String, String> mergedConfigMap = new HashMap<>(jobConfig);
+    mergedConfigMap.putAll(generatedConfig);
+    JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap));
+    JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig);
 
     String keySerde = tableConfig.getKeySerde(tableSpec.getId());
     storeConfig.put(String.format(StorageConfig.KEY_SERDE(), 
tableSpec.getId()), keySerde);
@@ -116,9 +121,7 @@ abstract public class BaseLocalStoreBackedTableProvider 
extends BaseTableProvide
     if (enableChangelog) {
       String changelogStream = 
tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM);
       if (StringUtils.isEmpty(changelogStream)) {
-        changelogStream = String.format("%s-%s-table-%s",
-            jobConfig.get(JobConfig.JOB_NAME()),
-            jobConfig.get(JobConfig.JOB_ID(), "1"),
+        changelogStream = String.format("%s-%s-table-%s", 
mergedJobConfig.getName().get(), mergedJobConfig.getJobId(),
             tableSpec.getId());
       }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 5c4ba3b..6d6613c 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -31,9 +31,9 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -57,10 +57,7 @@ import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.task.AsyncStreamTask;
-import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskFactory;
 import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
@@ -86,8 +83,7 @@ public class TestRunner {
   public static final String JOB_NAME = "samza-test";
 
   private Map<String, String> configs;
-  private Class taskClass;
-  private StreamApplication app;
+  private SamzaApplication app;
   /*
    * inMemoryScope is a unique global key per TestRunner, this key when 
configured with {@link InMemorySystemDescriptor}
    * provides an isolated state to run with in memory system
@@ -112,7 +108,7 @@ public class TestRunner {
     this();
     Preconditions.checkNotNull(taskClass);
     configs.put(TaskConfig.TASK_CLASS(), taskClass.getName());
-    this.taskClass = taskClass;
+    this.app = new LegacyTaskApplication(taskClass.getName());
   }
 
   /**
@@ -159,6 +155,17 @@ public class TestRunner {
   }
 
   /**
+   * Only adds a config from {@code config} to samza job {@code configs} if 
they dont exist in it.
+   * @param config configs for the application
+   * @return this {@link TestRunner}
+   */
+  public TestRunner addConfigs(Map<String, String> config, String 
configPrefix) {
+    Preconditions.checkNotNull(config);
+    config.forEach((key, value) -> 
this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value));
+    return this;
+  }
+
+  /**
    * Adds a config to {@code configs} if its not already present. Overrides a 
config value for which key is already
    * exisiting in {@code configs}
    * @param key key of the config
@@ -168,7 +175,7 @@ public class TestRunner {
   public TestRunner addOverrideConfig(String key, String value) {
     Preconditions.checkNotNull(key);
     Preconditions.checkNotNull(value);
-    String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), 
JOB_NAME);
+    String configKeyPrefix = 
String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
     configs.put(String.format("%s%s", configKeyPrefix, key), value);
     return this;
   }
@@ -192,6 +199,10 @@ public class TestRunner {
     return this;
   }
 
+  private String getJobNameAndId() {
+    return String.format("%s-%s", JOB_NAME, 
configs.getOrDefault(JobConfig.JOB_ID(), "1"));
+  }
+
   /**
    * Adds the provided input stream with mock data to the test application.
    * @param descriptor describes the stream that is supposed to be input to 
Samza application
@@ -243,11 +254,10 @@ public class TestRunner {
    * @throws SamzaException if Samza job fails with exception and returns 
UnsuccessfulFinish as the statuscode
    */
   public void run(Duration timeout) {
-    Preconditions.checkState((app == null && taskClass != null) || (app != 
null && taskClass == null),
+    Preconditions.checkState(app != null,
         "TestRunner should run for Low Level Task api or High Level 
Application Api");
     Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), 
"Timeouts should be positive");
-    SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> 
appDesc.setTaskFactory(createTaskFactory()) : app;
-    final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, 
new MapConfig(configs));
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new 
MapConfig(configs));
     runner.run();
     boolean timedOut = !runner.waitForFinish(timeout);
     Assert.assertFalse("Timed out waiting for application to finish", 
timedOut);
@@ -326,28 +336,6 @@ public class TestRunner {
             entry -> entry.getValue().stream().map(e -> (StreamMessageType) 
e.getMessage()).collect(Collectors.toList())));
   }
 
-  private TaskFactory createTaskFactory() {
-    if (StreamTask.class.isAssignableFrom(taskClass)) {
-      return (StreamTaskFactory) () -> {
-        try {
-          return (StreamTask) taskClass.newInstance();
-        } catch (InstantiationException | IllegalAccessException e) {
-          throw new SamzaException(String.format("Failed to instantiate 
StreamTask class %s", taskClass.getName()), e);
-        }
-      };
-    } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) {
-      return (AsyncStreamTaskFactory) () -> {
-        try {
-          return (AsyncStreamTask) taskClass.newInstance();
-        } catch (InstantiationException | IllegalAccessException e) {
-          throw new SamzaException(String.format("Failed to instantiate 
AsyncStreamTask class %s", taskClass.getName()), e);
-        }
-      };
-    }
-    throw new SamzaException(String.format("Not supported task.class %s. 
task.class has to implement either StreamTask "
-        + "or AsyncStreamTask", taskClass.getName()));
-  }
-
   /**
    * Creates an in memory stream with {@link InMemorySystemFactory} and feeds 
its partition with stream of messages
    * @param partitonData key of the map represents partitionId and value 
represents
@@ -367,7 +355,7 @@ public class TestRunner {
     InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) 
descriptor.getSystemDescriptor();
     imsd.withInMemoryScope(this.inMemoryScope);
     addConfigs(descriptor.toConfig());
-    addConfigs(descriptor.getSystemDescriptor().toConfig());
+    addConfigs(descriptor.getSystemDescriptor().toConfig(), 
String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()));
     StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, 
systemName, partitonData.size());
     SystemFactory factory = new InMemorySystemFactory();
     Config config = new MapConfig(descriptor.toConfig(), 
descriptor.getSystemDescriptor().toConfig());
@@ -381,7 +369,7 @@ public class TestRunner {
             producer.send(systemName, new OutgoingMessageEnvelope(sysStream, 
Integer.valueOf(partitionId), key, value));
           });
         producer.send(systemName, new OutgoingMessageEnvelope(sysStream, 
Integer.valueOf(partitionId), null,
-          new EndOfStreamMessage(null)));
+            new EndOfStreamMessage(null)));
       });
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
 
b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
index 92b23ef..e6e423f 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.test.framework.TestRunner;
 
 
 /**
@@ -60,9 +59,6 @@ public class InMemorySystemDescriptor extends 
SystemDescriptor<InMemorySystemDes
    * </ol>
    *
    **/
-  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s.";
-  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = 
"systems.%s.default.stream.samza.offset.default";
-
   private String inMemoryScope;
 
   /**
@@ -106,11 +102,7 @@ public class InMemorySystemDescriptor extends 
SystemDescriptor<InMemorySystemDes
   public Map<String, String> toConfig() {
     HashMap<String, String> configs = new HashMap<>(super.toConfig());
     configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
-    configs.put(String.format(CONFIG_OVERRIDE_PREFIX + 
JavaSystemConfig.SYSTEM_FACTORY_FORMAT, TestRunner.JOB_NAME, getSystemName()),
-        FACTORY_CLASS_NAME);
-    configs.put(
-        String.format(CONFIG_OVERRIDE_PREFIX + 
DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, TestRunner.JOB_NAME,
-            getSystemName()), 
SystemStreamMetadata.OffsetType.OLDEST.toString());
+    configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, 
getSystemName()), FACTORY_CLASS_NAME);
     return configs;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index d123cee..6186ca7 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -96,8 +96,6 @@ public class TestTableDescriptorsProvider {
     Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId);
     Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId),
         RocksDbKeyValueStorageEngineFactory.class.getName());
-    
Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde"));
-    
Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde"));
     Config storeConfig = resultConfig.subset("stores." + localTableId + ".", 
true);
     Assert.assertEquals(4, storeConfig.size());
     Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes"));
@@ -107,10 +105,6 @@ public class TestTableDescriptorsProvider {
         RocksDbTableProviderFactory.class.getName());
     Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId),
         RemoteTableProviderFactory.class.getName());
-    
Assert.assertTrue(tableConfig.getKeySerde(localTableId).startsWith("StringSerde"));
-    
Assert.assertTrue(tableConfig.getValueSerde(localTableId).startsWith("StringSerde"));
-    
Assert.assertTrue(tableConfig.getKeySerde(remoteTableId).startsWith("StringSerde"));
-    
Assert.assertTrue(tableConfig.getValueSerde(remoteTableId).startsWith("LongSerde"));
     Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), 
RocksDbTableProviderFactory.class.getName());
     Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), 
RemoteTableProviderFactory.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index b30b896..4adb93a 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -76,7 +76,7 @@ public class YarnJobValidationTool {
     this.config = config;
     this.client = client;
     String name = this.config.getName().get();
-    String jobId = this.config.getJobId().nonEmpty()? 
this.config.getJobId().get() : "1";
+    String jobId = this.config.getJobId();
     this.jobName =  name + "_" + jobId;
     this.validator = validator;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index d335448..1d72a88 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -67,7 +67,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
           }
           envMapWithJavaHome
         }),
-        Some("%s_%s" format(config.getName.get, config.getJobId.getOrElse(1)))
+        Some("%s_%s" format(config.getName.get, config.getJobId))
       )
     } catch {
       case e: Throwable =>
@@ -169,7 +169,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
         // Get by name
         config.getName match {
           case Some(jobName) =>
-            val applicationName = "%s_%s" format(jobName, 
config.getJobId.getOrElse(1))
+            val applicationName = "%s_%s" format(jobName, config.getJobId)
             logger.info("Fetching status from YARN for application name %s" 
format applicationName)
             val applicationIds = 
client.getActiveApplicationIds(applicationName)
 

Reply via email to