http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 47705ee..af556f5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -19,45 +19,26 @@
 
 package org.apache.samza.execution;
 
-import java.util.ArrayList;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.UUID;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.spec.StatefulOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.util.StreamUtil;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-
-
 /**
  * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
  * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
@@ -65,64 +46,71 @@ import com.google.common.base.Joiner;
  */
 public class JobNode {
   private static final Logger log = LoggerFactory.getLogger(JobNode.class);
-  private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
 
   private final String jobName;
   private final String jobId;
-  private final String id;
-  private final OperatorSpecGraph specGraph;
-  private final List<StreamEdge> inEdges = new ArrayList<>();
-  private final List<StreamEdge> outEdges = new ArrayList<>();
-  private final List<TableSpec> tables = new ArrayList<>();
+  private final String jobNameAndId;
   private final Config config;
-
-  JobNode(String jobName, String jobId, OperatorSpecGraph specGraph, Config config) {
+  private final JobNodeConfigurationGenerator configGenerator;
+  // The following maps (i.e. inEdges and outEdges) uses the streamId as the key
+  private final Map<String, StreamEdge> inEdges = new HashMap<>();
+  private final Map<String, StreamEdge> outEdges = new HashMap<>();
+  // Similarly, tables uses tableId as the key
+  private final Map<String, TableSpec> tables = new HashMap<>();
+  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+
+  JobNode(String jobName, String jobId, Config config, ApplicationDescriptorImpl appDesc,
+      JobNodeConfigurationGenerator configureGenerator) {
     this.jobName = jobName;
     this.jobId = jobId;
-    this.id = createId(jobName, jobId);
-    this.specGraph = specGraph;
+    this.jobNameAndId = createJobNameAndId(jobName, jobId);
     this.config = config;
+    this.appDesc = appDesc;
+    this.configGenerator = configureGenerator;
   }
 
-  public static Config mergeJobConfig(Config fullConfig, Config generatedConfig) {
-    return new JobConfig(Util.rewriteConfig(extractScopedConfig(
-        fullConfig, generatedConfig, String.format(JobConfig.CONFIG_JOB_PREFIX(), new JobConfig(fullConfig).getName().get()))));
-  }
-
-  public OperatorSpecGraph getSpecGraph() {
-    return this.specGraph;
+  static String createJobNameAndId(String jobName, String jobId) {
+    return String.format("%s-%s", jobName, jobId);
   }
 
-  public  String getId() {
-    return id;
+  String getJobNameAndId() {
+    return jobNameAndId;
   }
 
-  public String getJobName() {
+  String getJobName() {
     return jobName;
   }
 
-  public String getJobId() {
+  String getJobId() {
     return jobId;
   }
 
+  Config getConfig() {
+    return config;
+  }
+
   void addInEdge(StreamEdge in) {
-    inEdges.add(in);
+    inEdges.put(in.getStreamSpec().getId(), in);
   }
 
   void addOutEdge(StreamEdge out) {
-    outEdges.add(out);
+    outEdges.put(out.getStreamSpec().getId(), out);
   }
 
-  List<StreamEdge> getInEdges() {
+  void addTable(TableSpec tableSpec) {
+    tables.put(tableSpec.getId(), tableSpec);
+  }
+
+  Map<String, StreamEdge> getInEdges() {
     return inEdges;
   }
 
-  List<StreamEdge> getOutEdges() {
+  Map<String, StreamEdge> getOutEdges() {
     return outEdges;
   }
 
-  void addTable(TableSpec tableSpec) {
-    tables.add(tableSpec);
+  Map<String, TableSpec> getTables() {
+    return tables;
   }
 
   /**
@@ -130,250 +118,65 @@ public class JobNode {
    * @param executionPlanJson JSON representation of the execution plan
    * @return config of the job
    */
-  public JobConfig generateConfig(String executionPlanJson) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(JobConfig.JOB_NAME(), jobName);
-    configs.put(JobConfig.JOB_ID(), jobId);
+  JobConfig generateConfig(String executionPlanJson) {
+    return configGenerator.generateJobConfig(this, executionPlanJson);
+  }
 
-    final List<String> inputs = new ArrayList<>();
-    final List<String> broadcasts = new ArrayList<>();
-    for (StreamEdge inEdge : inEdges) {
-      String formattedSystemStream = inEdge.getName();
-      if (inEdge.isBroadcast()) {
-        broadcasts.add(formattedSystemStream + "#0");
-      } else {
-        inputs.add(formattedSystemStream);
-      }
+  KV<Serde, Serde> getInputSerdes(String streamId) {
+    if (!inEdges.containsKey(streamId)) {
+      return null;
     }
+    return appDesc.getStreamSerdes(streamId);
+  }
 
-    if (!broadcasts.isEmpty()) {
-      // TODO: remove this once we support defining broadcast input stream in high-level
-      // task.broadcast.input should be generated by the planner in the future.
-      final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
-      if (StringUtils.isNoneEmpty(taskBroadcasts)) {
-        broadcasts.add(taskBroadcasts);
-      }
-      configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcasts));
+  KV<Serde, Serde> getOutputSerde(String streamId) {
+    if (!outEdges.containsKey(streamId)) {
+      return null;
     }
+    return appDesc.getStreamSerdes(streamId);
+  }
 
-    // set triggering interval if a window or join is defined
-    if (specGraph.hasWindowOrJoins()) {
-      if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
-        long triggerInterval = computeTriggerInterval();
-        log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName);
+  Collection<OperatorSpec> getReachableOperators() {
+    Set<OperatorSpec> inputOperatorsInJobNode = inEdges.values().stream().map(inEdge ->
+        appDesc.getInputOperators().get(inEdge.getStreamSpec().getId())).filter(Objects::nonNull).collect(Collectors.toSet());
+    Set<OperatorSpec> reachableOperators = new HashSet<>();
+    findReachableOperators(inputOperatorsInJobNode, reachableOperators);
+    return reachableOperators;
+  }
 
-        configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
-      }
+  // get all next operators consuming from the input {@code streamId}
+  Set<String> getNextOperatorIds(String streamId) {
+    if (!appDesc.getInputOperators().containsKey(streamId) || !inEdges.containsKey(streamId)) {
+      return new HashSet<>();
     }
+    return appDesc.getInputOperators().get(streamId).getRegisteredOperatorSpecs().stream()
+        .map(op -> op.getOpId()).collect(Collectors.toSet());
+  }
 
-    specGraph.getAllOperatorSpecs().forEach(opSpec -> {
-        if (opSpec instanceof StatefulOperatorSpec) {
-          ((StatefulOperatorSpec) opSpec).getStoreDescriptors()
-              .forEach(sd -> configs.putAll(sd.getStorageConfigs()));
-          // store key and message serdes are configured separately in #addSerdeConfigs
-        }
-      });
-
-    configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
-
-    // write input/output streams to configs
-    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
-
-    // write serialized serde instances and stream serde configs to configs
-    addSerdeConfigs(configs);
-
-    configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs), tables));
-
-    // Add side inputs to the inputs and mark the stream as bootstrap
-    tables.forEach(tableSpec -> {
-        List<String> sideInputs = tableSpec.getSideInputs();
-        if (sideInputs != null && !sideInputs.isEmpty()) {
-          sideInputs.stream()
-              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
-              .forEach(systemStream -> {
-                  inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
-                  configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
-                      systemStream.getSystem(), systemStream.getStream()), "true");
-                });
-        }
-      });
-
-    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
-
-    log.info("Job {} has generated configs {}", jobName, configs);
-
-    String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), jobName);
-
-    // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
-    Map<String, String> allowedConfigs = new HashMap<>(config);
-    if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
-      log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "
-          + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
-      allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
+  InputOperatorSpec getInputOperator(String inputStreamId) {
+    if (!inEdges.containsKey(inputStreamId)) {
+      return null;
     }
-
-    log.debug("Job {} has allowed configs {}", jobName, allowedConfigs);
-    return new JobConfig(
-        Util.rewriteConfig(
-            extractScopedConfig(new MapConfig(allowedConfigs), new MapConfig(configs), configPrefix)));
+    return appDesc.getInputOperators().get(inputStreamId);
   }
 
-  /**
-   * Serializes the {@link Serde} instances for operators, adds them to the provided config, and
-   * sets the serde configuration for the input/output/intermediate streams appropriately.
-   *
-   * We try to preserve the number of Serde instances before and after serialization. However we don't
-   * guarantee that references shared between these serdes instances (e.g. an Jackson ObjectMapper shared
-   * between two json serdes) are shared after deserialization too.
-   *
-   * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass
-   * from the same output/input stream so that we can maintain reference sharing relationships.
-   *
-   * @param configs the configs to add serialized serde instances and stream serde configs to
-   */
-  void addSerdeConfigs(Map<String, String> configs) {
-    // collect all key and msg serde instances for streams
-    Map<String, Serde> streamKeySerdes = new HashMap<>();
-    Map<String, Serde> streamMsgSerdes = new HashMap<>();
-    Map<String, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
-    inEdges.forEach(edge -> {
-        String streamId = edge.getStreamSpec().getId();
-        InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
-        Serde keySerde = inputOperatorSpec.getKeySerde();
-        if (keySerde != null) {
-          streamKeySerdes.put(streamId, keySerde);
-        }
-        Serde valueSerde = inputOperatorSpec.getValueSerde();
-        if (valueSerde != null) {
-          streamMsgSerdes.put(streamId, valueSerde);
-        }
-      });
-    Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
-    outEdges.forEach(edge -> {
-        String streamId = edge.getStreamSpec().getId();
-        OutputStreamImpl outputStream = outputStreams.get(streamId);
-        Serde keySerde = outputStream.getKeySerde();
-        if (keySerde != null) {
-          streamKeySerdes.put(streamId, keySerde);
-        }
-        Serde valueSerde = outputStream.getValueSerde();
-        if (valueSerde != null) {
-          streamMsgSerdes.put(streamId, valueSerde);
-        }
-      });
-
-    // collect all key and msg serde instances for stores
-    Map<String, Serde> storeKeySerdes = new HashMap<>();
-    Map<String, Serde> storeMsgSerdes = new HashMap<>();
-    specGraph.getAllOperatorSpecs().forEach(opSpec -> {
-        if (opSpec instanceof StatefulOperatorSpec) {
-          ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
-              storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
-              storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
-            });
-        }
-      });
-
-    // for each unique stream or store serde instance, generate a unique name and serialize to config
-    HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
-    serdes.addAll(streamMsgSerdes.values());
-    serdes.addAll(storeKeySerdes.values());
-    serdes.addAll(storeMsgSerdes.values());
-    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
-    Base64.Encoder base64Encoder = Base64.getEncoder();
-    Map<Serde, String> serdeUUIDs = new HashMap<>();
-    serdes.forEach(serde -> {
-        String serdeName = serdeUUIDs.computeIfAbsent(serde,
-            s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
-        configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
-            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
-      });
-
-    // set key and msg serdes for streams to the serde names generated above
-    streamKeySerdes.forEach((streamId, serde) -> {
-        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
-        String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
-        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    streamMsgSerdes.forEach((streamId, serde) -> {
-        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
-        String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
-        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    // set key and msg serdes for stores to the serde names generated above
-    storeKeySerdes.forEach((storeName, serde) -> {
-        String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
-        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    storeMsgSerdes.forEach((storeName, serde) -> {
-        String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
-        configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
-      });
+  boolean isLegacyTaskApplication() {
+    return LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass());
   }
 
-  /**
-   * Computes the triggering interval to use during the execution of this {@link JobNode}
-   */
-  private long computeTriggerInterval() {
-    // Obtain the operator specs from the specGraph
-    Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
-
-    // Filter out window operators, and obtain a list of their triggering interval values
-    List<Long> windowTimerIntervals = operatorSpecs.stream()
-        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW)
-        .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs())
-        .collect(Collectors.toList());
-
-    // Filter out the join operators, and obtain a list of their ttl values
-    List<Long> joinTtlIntervals = operatorSpecs.stream()
-        .filter(spec -> spec instanceof JoinOperatorSpec)
-        .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
-        .collect(Collectors.toList());
-
-    // Combine both the above lists
-    List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
-    candidateTimerIntervals.addAll(windowTimerIntervals);
-
-    if (candidateTimerIntervals.isEmpty()) {
-      return -1;
-    }
-
-    // Compute the gcd of the resultant list
-    return MathUtil.gcd(candidateTimerIntervals);
+  KV<Serde, Serde> getTableSerdes(String tableId) {
+    //TODO: SAMZA-1893: should test whether the table is used in the current JobNode
+    return appDesc.getTableSerdes(tableId);
   }
 
-  /**
-   * This function extract the subset of configs from the full config, and use it to override the generated configs
-   * from the job.
-   * @param fullConfig full config
-   * @param generatedConfig config generated for the job
-   * @param configPrefix prefix to extract the subset of the config overrides
-   * @return config that merges the generated configs and overrides
-   */
-  private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
-    Config scopedConfig = fullConfig.subset(configPrefix);
-
-    Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
-    // Strip empty configs so they don't override the configs before them.
-    Map<String, String> mergedConfig = new HashMap<>();
-    for (Map<String, String> config : configPrecedence) {
-      for (Map.Entry<String, String> property : config.entrySet()) {
-        String value = property.getValue();
-        if (!(value == null || value.isEmpty())) {
-          mergedConfig.put(property.getKey(), property.getValue());
+  private void findReachableOperators(Collection<OperatorSpec> inputOperatorsInJobNode,
+      Set<OperatorSpec> reachableOperators) {
+    inputOperatorsInJobNode.forEach(op -> {
+        if (reachableOperators.contains(op)) {
+          return;
         }
-      }
-    }
-    scopedConfig = new MapConfig(mergedConfig);
-    log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
-
-    return scopedConfig;
-  }
-
-  static String createId(String jobName, String jobId) {
-    return String.format("%s-%s", jobName, jobId);
+        reachableOperators.add(op);
+        findReachableOperators(op.getRegisteredOperatorSpecs(), reachableOperators);
+      });
   }
 }

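[Editor's sketch] The refactored JobNode above no longer walks the whole OperatorSpecGraph; it collects only the operators reachable from the input operators of its own in-edges (getReachableOperators / findReachableOperators). A minimal standalone sketch of that visited-set recursion, using a hypothetical Op type rather than Samza's OperatorSpec, shows why an operator shared by several inputs (e.g. a join) is expanded only once:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    public class ReachabilitySketch {
      static class Op {
        final String id;
        final Set<Op> downstream = new HashSet<>();
        Op(String id) { this.id = id; }
      }

      // Depth-first walk with a visited set: a node already in 'reachable' is skipped,
      // so shared downstream operators are visited exactly once.
      static void findReachable(Collection<Op> inputs, Set<Op> reachable) {
        for (Op op : inputs) {
          if (!reachable.add(op)) {
            continue;
          }
          findReachable(op.downstream, reachable);
        }
      }

      public static void main(String[] args) {
        Op pageViews = new Op("pageViews");
        Op adClicks = new Op("adClicks");
        Op join = new Op("join");
        Op sendTo = new Op("sendTo");
        pageViews.downstream.add(join);
        adClicks.downstream.add(join);
        join.downstream.add(sendTo);

        Set<Op> reachable = new HashSet<>();
        findReachable(Arrays.asList(pageViews, adClicks), reachable);
        System.out.println(reachable.size()); // 4: join and sendTo counted once
      }
    }
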
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
new file mode 100644
index 0000000..676d28e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StatefulOperatorSpec;
+import org.apache.samza.operators.spec.StoreDescriptor;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.MathUtil;
+import org.apache.samza.util.StreamUtil;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides methods to generate configuration for a {@link JobNode}
+ */
+/* package private */ class JobNodeConfigurationGenerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobNodeConfigurationGenerator.class);
+
+  static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
+
+  static JobConfig mergeJobConfig(Config originalConfig, Config generatedConfig) {
+    JobConfig jobConfig = new JobConfig(originalConfig);
+    String jobNameAndId = JobNode.createJobNameAndId(jobConfig.getName().get(), jobConfig.getJobId());
+    return new JobConfig(Util.rewriteConfig(extractScopedConfig(originalConfig, generatedConfig,
+        String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), jobNameAndId))));
+  }
+
+  JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
+    Map<String, String> configs = new HashMap<>();
+    // set up job name and job ID
+    configs.put(JobConfig.JOB_NAME(), jobNode.getJobName());
+    configs.put(JobConfig.JOB_ID(), jobNode.getJobId());
+
+    Map<String, StreamEdge> inEdges = jobNode.getInEdges();
+    Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
+    Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
+    List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
+    Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode);
+    Config config = jobNode.getConfig();
+
+    // check all inputs to the node for broadcast and input streams
+    final Set<String> inputs = new HashSet<>();
+    final Set<String> broadcasts = new HashSet<>();
+    for (StreamEdge inEdge : inEdges.values()) {
+      String formattedSystemStream = inEdge.getName();
+      if (inEdge.isBroadcast()) {
+        broadcasts.add(formattedSystemStream + "#0");
+      } else {
+        inputs.add(formattedSystemStream);
+      }
+    }
+
+    configureBroadcastInputs(configs, config, broadcasts);
+
+    // compute window and join operator intervals in this node
+    configureWindowInterval(configs, config, reachableOperators);
+
+    // set store configuration for stateful operators.
+    stores.forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+
+    // set the execution plan in json
+    configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
+
+    // write intermediate input/output streams to configs
+    inEdges.values().stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
+
+    // write serialized serde instances and stream, store, and table serdes to configs
+    // serde configuration generation has to happen before table configuration, since the serde configuration
+    // is required when generating configurations for some TableProvider (i.e. local store backed tables)
+    configureSerdes(configs, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
+
+    // generate table configuration and potential side input configuration
+    configureTables(configs, config, reachableTables, inputs);
+
+    // finalize the task.inputs configuration
+    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
+    LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), configs);
+
+    // apply configure rewriters and user configure overrides
+    return applyConfigureRewritersAndOverrides(configs, config, jobNode);
+  }
+
+  private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
+    // TODO: Fix this in SAMZA-1893. For now, returning all tables for single-job execution plan
+    return jobNode.getTables();
+  }
+
+  private void configureBroadcastInputs(Map<String, String> configs, Config config, Set<String> broadcastStreams) {
+    // TODO: SAMZA-1841: remove this once we support defining broadcast input stream in high-level
+    // task.broadcast.input should be generated by the planner in the future.
+    if (broadcastStreams.isEmpty()) {
+      return;
+    }
+    final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
+    if (StringUtils.isNoneEmpty(taskBroadcasts)) {
+      broadcastStreams.add(taskBroadcasts);
+    }
+    configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams));
+  }
+
+  private void configureWindowInterval(Map<String, String> configs, Config config,
+      Collection<OperatorSpec> reachableOperators) {
+    if (!reachableOperators.stream().anyMatch(op -> op.getOpCode() == OperatorSpec.OpCode.WINDOW
+        || op.getOpCode() == OperatorSpec.OpCode.JOIN)) {
+      return;
+    }
+
+    // set triggering interval if a window or join is defined. Only applies to high-level applications
+    if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
+      long triggerInterval = computeTriggerInterval(reachableOperators);
+      LOG.info("Using triggering interval: {}", triggerInterval);
+
+      configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
+    }
+  }
+
+  /**
+   * Computes the triggering interval to use during the execution of this {@link JobNode}
+   */
+  private long computeTriggerInterval(Collection<OperatorSpec> reachableOperators) {
+    List<Long> windowTimerIntervals =  reachableOperators.stream()
+        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW)
+        .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs())
+        .collect(Collectors.toList());
+
+    // Filter out the join operators, and obtain a list of their ttl values
+    List<Long> joinTtlIntervals = reachableOperators.stream()
+        .filter(spec -> spec instanceof JoinOperatorSpec)
+        .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
+        .collect(Collectors.toList());
+
+    // Combine both the above lists
+    List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
+    candidateTimerIntervals.addAll(windowTimerIntervals);
+
+    if (candidateTimerIntervals.isEmpty()) {
+      return -1;
+    }
+
+    // Compute the gcd of the resultant list
+    return MathUtil.gcd(candidateTimerIntervals);
+  }
+
+  private JobConfig applyConfigureRewritersAndOverrides(Map<String, String> configs, Config config, JobNode jobNode) {
+    // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
+    Map<String, String> allowedConfigs = new HashMap<>(config);
+    if (!jobNode.isLegacyTaskApplication()) {
+      if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
+        LOG.warn("Specifying task inputs in configuration is not allowed for SamzaApplication. "
+            + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
+        allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
+      }
+    }
+
+    LOG.debug("Job {} has allowed configs {}", jobNode.getJobNameAndId(), allowedConfigs);
+    return mergeJobConfig(new MapConfig(allowedConfigs), new MapConfig(configs));
+  }
+
+  /**
+   * This function extract the subset of configs from the full config, and use it to override the generated configs
+   * from the job.
+   * @param fullConfig full config
+   * @param generatedConfig config generated for the job
+   * @param configPrefix prefix to extract the subset of the config overrides
+   * @return config that merges the generated configs and overrides
+   */
+  private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config scopedConfig = fullConfig.subset(configPrefix);
+
+    Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
+    // Strip empty configs so they don't override the configs before them.
+    Map<String, String> mergedConfig = new HashMap<>();
+    for (Map<String, String> config : configPrecedence) {
+      for (Map.Entry<String, String> property : config.entrySet()) {
+        String value = property.getValue();
+        if (!(value == null || value.isEmpty())) {
+          mergedConfig.put(property.getKey(), property.getValue());
+        }
+      }
+    }
+    scopedConfig = new MapConfig(mergedConfig);
+    LOG.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
+
+    return scopedConfig;
+  }
+
+  private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) {
+    return reachableOperators.stream().filter(operatorSpec -> operatorSpec instanceof StatefulOperatorSpec)
+        .map(operatorSpec -> ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors()).flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  private void configureTables(Map<String, String> configs, Config config, Map<String, TableSpec> tables, Set<String> inputs) {
+    configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs),
+        tables.values().stream().collect(Collectors.toList())));
+
+    // Add side inputs to the inputs and mark the stream as bootstrap
+    tables.values().forEach(tableSpec -> {
+        List<String> sideInputs = tableSpec.getSideInputs();
+        if (sideInputs != null && !sideInputs.isEmpty()) {
+          sideInputs.stream()
+              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
+              .forEach(systemStream -> {
+                  inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
+                  configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+                      systemStream.getSystem(), systemStream.getStream()), "true");
+                });
+        }
+      });
+  }
+
+  /**
+   * Serializes the {@link Serde} instances for operators, adds them to the provided config, and
+   * sets the serde configuration for the input/output/intermediate streams appropriately.
+   *
+   * We try to preserve the number of Serde instances before and after serialization. However we don't
+   * guarantee that references shared between these serdes instances (e.g. an Jackson ObjectMapper shared
+   * between two json serdes) are shared after deserialization too.
+   *
+   * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass
+   * from the same output/input stream so that we can maintain reference sharing relationships.
+   *
+   * @param configs the configs to add serialized serde instances and stream serde configs to
+   */
+  private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges,
+      List<StoreDescriptor> stores, Collection<String> tables, JobNode jobNode) {
+    // collect all key and msg serde instances for streams
+    Map<String, Serde> streamKeySerdes = new HashMap<>();
+    Map<String, Serde> streamMsgSerdes = new HashMap<>();
+    inEdges.keySet().forEach(streamId ->
+        addSerdes(jobNode.getInputSerdes(streamId), streamId, streamKeySerdes, streamMsgSerdes));
+    outEdges.keySet().forEach(streamId ->
+        addSerdes(jobNode.getOutputSerde(streamId), streamId, streamKeySerdes, streamMsgSerdes));
+
+    Map<String, Serde> storeKeySerdes = new HashMap<>();
+    Map<String, Serde> storeMsgSerdes = new HashMap<>();
+    stores.forEach(storeDescriptor -> {
+        storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
+        storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
+      });
+
+    Map<String, Serde> tableKeySerdes = new HashMap<>();
+    Map<String, Serde> tableMsgSerdes = new HashMap<>();
+    tables.forEach(tableId -> {
+        addSerdes(jobNode.getTableSerdes(tableId), tableId, tableKeySerdes, tableMsgSerdes);
+      });
+
+    // for each unique stream or store serde instance, generate a unique name and serialize to config
+    HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
+    serdes.addAll(streamMsgSerdes.values());
+    serdes.addAll(storeKeySerdes.values());
+    serdes.addAll(storeMsgSerdes.values());
+    serdes.addAll(tableKeySerdes.values());
+    serdes.addAll(tableMsgSerdes.values());
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Base64.Encoder base64Encoder = Base64.getEncoder();
+    Map<Serde, String> serdeUUIDs = new HashMap<>();
+    serdes.forEach(serde -> {
+        String serdeName = serdeUUIDs.computeIfAbsent(serde,
+            s -> serde.getClass().getSimpleName() + "-" + 
UUID.randomUUID().toString());
+        
configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), 
serdeName),
+            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
+      });
+
+    // set key and msg serdes for streams to the serde names generated above
+    streamKeySerdes.forEach((streamId, serde) -> {
+        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
+        String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    streamMsgSerdes.forEach((streamId, serde) -> {
+        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
+        String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
+        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    // set key and msg serdes for stores to the serde names generated above
+    storeKeySerdes.forEach((storeName, serde) -> {
+        String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    storeMsgSerdes.forEach((storeName, serde) -> {
+        String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
+        configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    // set key and msg serdes for tables to the serde names generated above
+    tableKeySerdes.forEach((tableId, serde) -> {
+        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    tableMsgSerdes.forEach((tableId, serde) -> {
+        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
+  }
+
+  private void addSerdes(KV<Serde, Serde> serdes, String streamId, Map<String, Serde> keySerdeMap,
+      Map<String, Serde> msgSerdeMap) {
+    if (serdes != null) {
+      if (serdes.getKey() != null && !(serdes.getKey() instanceof NoOpSerde)) {
+        keySerdeMap.put(streamId, serdes.getKey());
+      }
+      if (serdes.getValue() != null && !(serdes.getValue() instanceof NoOpSerde)) {
+        msgSerdeMap.put(streamId, serdes.getValue());
+      }
+    }
+  }
+}

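[Editor's sketch] The precedence implemented by extractScopedConfig/mergeJobConfig above can be illustrated with plain maps: planner-generated configs override the original full config, and user overrides scoped under JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX (the jobs.<job-name>-<job-id>.* prefix) override both, while empty values never override anything. A small sketch under those assumptions (plain HashMap instead of Samza's Config/MapConfig, and the scoped map shown already stripped of its prefix):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ScopedConfigSketch {
      // Later layers win; empty or null values are skipped so they cannot wipe out earlier ones.
      static Map<String, String> merge(Map<String, String> full, Map<String, String> generated,
          Map<String, String> scoped) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> layer : Arrays.asList(full, generated, scoped)) {
          layer.forEach((k, v) -> {
              if (v != null && !v.isEmpty()) {
                merged.put(k, v);
              }
            });
        }
        return merged;
      }

      public static void main(String[] args) {
        Map<String, String> full = new HashMap<>();
        full.put("task.window.ms", "5000");
        full.put("job.container.count", "2");
        Map<String, String> generated = new HashMap<>();
        generated.put("task.window.ms", "1000");   // value generated by the planner
        Map<String, String> scoped = new HashMap<>();
        scoped.put("job.container.count", "4");    // from jobs.<name>-<id>.* in the user config
        // task.window.ms -> 1000 (generated wins over full),
        // job.container.count -> 4 (scoped override wins over both)
        System.out.println(merge(full, generated, scoped));
      }
    }
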
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index a2050e5..abbec18 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -20,29 +20,20 @@ package org.apache.samza.execution;
 
 import java.io.File;
 import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationDescriptor;
 import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.application.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,22 +55,7 @@ public abstract class JobPlanner {
     this.config = descriptor.getConfig();
   }
 
-  public List<JobConfig> prepareJobs() {
-    String appId = new ApplicationConfig(appDesc.getConfig()).getGlobalAppId();
-    if (appDesc instanceof TaskApplicationDescriptorImpl) {
-      return Collections.singletonList(prepareTaskJob((TaskApplicationDescriptorImpl) appDesc));
-    } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
-      try {
-        return prepareStreamJobs((StreamApplicationDescriptorImpl) appDesc);
-      } catch (Exception e) {
-        throw new SamzaException("Failed to generate JobConfig for StreamApplication " + appId, e);
-      }
-    }
-    throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
-        + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName()));
-  }
-
-  abstract List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception;
+  public abstract List<JobConfig> prepareJobs();
 
   StreamManager buildAndStartStreamManager(Config config) {
     StreamManager streamManager = new StreamManager(config);
@@ -87,12 +63,12 @@ public abstract class JobPlanner {
     return streamManager;
   }
 
-  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph) {
-    return getExecutionPlan(specGraph, null);
+  ExecutionPlan getExecutionPlan() {
+    return getExecutionPlan(null);
   }
 
   /* package private */
-  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph, String runId) {
+  ExecutionPlan getExecutionPlan(String runId) {
 
     // update application configs
     Map<String, String> cfg = new HashMap<>();
@@ -101,8 +77,8 @@ public abstract class JobPlanner {
     }
 
     StreamConfig streamConfig = new StreamConfig(config);
-    Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
-    inputStreams.removeAll(specGraph.getOutputStreams().keySet());
+    Set<String> inputStreams = new HashSet<>(appDesc.getInputStreamIds());
+    inputStreams.removeAll(appDesc.getOutputStreamIds());
     ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
         ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
@@ -117,12 +93,12 @@ public abstract class JobPlanner {
 
     // create the physical execution plan and merge with overrides. This works for a single-stage job now
     // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
-    Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg));
+    Config mergedConfig = JobNodeConfigurationGenerator.mergeJobConfig(config, new MapConfig(cfg));
     // creating the StreamManager to get all input/output streams' metadata for planning
     StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
     try {
       ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
-      return planner.plan(specGraph);
+      return planner.plan(appDesc);
     } finally {
       streamManager.stop();
     }
@@ -149,25 +125,6 @@ public abstract class JobPlanner {
     }
   }
 
-  // TODO: SAMZA-1814: the following configuration generation still misses serde configuration generation,
-  // side input configuration, broadcast input and task inputs configuration generation for low-level task
-  // applications
-  // helper method to generate a single node job configuration for low level task applications
-  private JobConfig prepareTaskJob(TaskApplicationDescriptorImpl taskAppDesc) {
-    // copy original configure
-    Map<String, String> cfg = new HashMap<>();
-    // expand system and streams configure
-    Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(taskAppDesc);
-    cfg.putAll(systemStreamConfigs);
-    // expand table configure
-    cfg.putAll(expandTableConfigs(cfg, taskAppDesc));
-    // adding app.class in the configuration
-    cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
-    // create the physical execution plan and merge with overrides. This works for a single-stage job now
-    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
-    return new JobConfig(JobNode.mergeJobConfig(config, new MapConfig(cfg)));
-  }
-
   private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
     Map<String, String> systemStreamConfigs = new HashMap<>();
     appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
@@ -177,12 +134,4 @@ public abstract class JobPlanner {
         systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName()));
     return systemStreamConfigs;
   }
-
-  private Map<String, String> expandTableConfigs(Map<String, String> originConfig,
-      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
-    List<TableSpec> tableSpecs = new ArrayList<>();
-    appDesc.getTableDescriptors().stream().map(td -> ((BaseTableDescriptor) td).getTableSpec())
-        .forEach(spec -> tableSpecs.add(spec));
-    return TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(originConfig), tableSpecs);
-  }
 }

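[Editor's sketch] The batch-versus-stream decision in JobPlanner#getExecutionPlan above (input stream ids minus output stream ids, then allMatch on boundedness) can be sketched independently of Samza's StreamConfig; the stream ids and the isBounded predicate below are invented for illustration:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Predicate;

    public class AppModeSketch {
      enum Mode { BATCH, STREAM }

      // BATCH only when every "real" input (one the app does not also produce itself as an
      // intermediate stream) is bounded; otherwise STREAM.
      static Mode pickMode(Set<String> inputStreamIds, Set<String> outputStreamIds,
          Predicate<String> isBounded) {
        Set<String> inputs = new HashSet<>(inputStreamIds);
        inputs.removeAll(outputStreamIds);
        return inputs.stream().allMatch(isBounded) ? Mode.BATCH : Mode.STREAM;
      }

      public static void main(String[] args) {
        Set<String> bounded = new HashSet<>(Arrays.asList("hdfs-input"));
        Set<String> inputs = new HashSet<>(Arrays.asList("hdfs-input", "partitionBy-1"));
        Set<String> outputs = new HashSet<>(Arrays.asList("partitionBy-1"));
        System.out.println(pickMode(inputs, outputs, bounded::contains)); // BATCH
      }
    }
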
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 7996d6b..86aca0f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationDescriptor;
 import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Temporarily helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * Temporarily helper class with specific implementation of {@link JobPlanner#prepareJobs()}
  * for standalone Samza processors.
  *
  * TODO: we need to consolidate this with {@link ExecutionPlanner} after SAMZA-1811.
@@ -53,17 +52,23 @@ public class LocalJobPlanner extends JobPlanner {
   }
 
   @Override
-  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+  public List<JobConfig> prepareJobs() {
     // for high-level DAG, generating the plan and job configs
     // 1. initialize and plan
-    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph());
+    ExecutionPlan plan = getExecutionPlan();
 
-    String executionPlanJson = plan.getPlanAsJson();
+    String executionPlanJson = "";
+    try {
+      executionPlanJson = plan.getPlanAsJson();
+    } catch (Exception e) {
+      throw new SamzaException("Failed to create plan JSON.", e);
+    }
     writePlanJsonFile(executionPlanJson);
     LOG.info("Execution Plan: \n" + executionPlanJson);
     String planId = String.valueOf(executionPlanJson.hashCode());
 
-    if (plan.getJobConfigs().isEmpty()) {
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    if (jobConfigs.isEmpty()) {
       throw new SamzaException("No jobs in the plan.");
     }
 
@@ -71,7 +76,7 @@ public class LocalJobPlanner extends JobPlanner {
     // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
     // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
     // to be used for the whole application
-    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    JobConfig jobConfig = jobConfigs.get(0);
     StreamManager streamManager = null;
     try {
       // create the StreamManager to create intermediate streams in the plan
@@ -82,7 +87,7 @@ public class LocalJobPlanner extends JobPlanner {
         streamManager.stop();
       }
     }
-    return plan.getJobConfigs();
+    return jobConfigs;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
index aa1dff9..ca91214 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -27,15 +27,14 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 
 
 /**
- * A utility class that encapsulates the logic for traversing an {@link OperatorSpecGraph} and building
- * associations between related {@link OperatorSpec}s.
+ * A utility class that encapsulates the logic for traversing operators in the graph from the set of {@link InputOperatorSpec}
+ * and building associations between related {@link OperatorSpec}s.
  */
 /* package private */ class OperatorSpecGraphAnalyzer {
 
@@ -43,14 +42,13 @@ import org.apache.samza.operators.spec.OperatorSpec;
    * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. {@link JoinOperatorSpec}s, they participate in.
    */
   public static Multimap<JoinOperatorSpec, InputOperatorSpec> getJoinToInputOperatorSpecs(
-      OperatorSpecGraph operatorSpecGraph) {
+      Collection<InputOperatorSpec> inputOperatorSpecs) {
 
     Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = HashMultimap.create();
 
     // Traverse graph starting from every input operator spec, observing connectivity between input operator specs
     // and Join operator specs.
-    Iterable<InputOperatorSpec> inputOpSpecs = operatorSpecGraph.getInputOperators().values();
-    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+    for (InputOperatorSpec inputOpSpec : inputOperatorSpecs) {
       // Observe all join operator specs reachable from this input operator spec.
       JoinOperatorSpecVisitor joinOperatorSpecVisitor = new JoinOperatorSpecVisitor();
       traverse(inputOpSpec, joinOperatorSpecVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs());
@@ -77,7 +75,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
   }
 
   /**
-   * An {@link OperatorSpecGraph} visitor that records all {@link JoinOperatorSpec}s encountered in the graph.
+   * An visitor that records all {@link JoinOperatorSpec}s encountered in the graph of {@link OperatorSpec}s
    */
   private static class JoinOperatorSpecVisitor implements Consumer<OperatorSpec> {
     private Set<JoinOperatorSpec> joinOpSpecs = new HashSet<>();

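[Editor's sketch] The grouping returned by OperatorSpecGraphAnalyzer#getJoinToInputOperatorSpecs above is a Guava Multimap from each join to every input operator that can reach it. A tiny illustration with hypothetical stream/operator names (plain strings stand in for the actual InputOperatorSpec/JoinOperatorSpec objects):

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;

    public class JoinGroupingSketch {
      public static void main(String[] args) {
        Multimap<String, String> joinToInputs = HashMultimap.create();
        joinToInputs.put("pageViewAdClickJoin", "pageViews"); // pageViews reaches the join
        joinToInputs.put("pageViewAdClickJoin", "adClicks");  // adClicks reaches the same join
        // Both inputs are grouped under the one join they participate in (order not guaranteed).
        System.out.println(joinToInputs.get("pageViewAdClickJoin"));
      }
    }
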
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index 254ff97..54f86d5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationDescriptor;
 import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -34,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * Temporary helper class with specific implementation of {@link JobPlanner#prepareJobs()}
  * for remote-launched Samza processors (e.g. in YARN).
  *
  * TODO: we need to consolidate this class with {@link ExecutionPlanner} after SAMZA-1811.
@@ -47,7 +46,7 @@ public class RemoteJobPlanner extends JobPlanner {
   }
 
   @Override
-  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+  public List<JobConfig> prepareJobs() {
     // for high-level DAG, generate the plan and job configs
     // TODO: run.id needs to be set for standalone: SAMZA-1531
     // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
@@ -55,17 +54,22 @@ public class RemoteJobPlanner extends JobPlanner {
     LOG.info("The run id for this run is {}", runId);
 
     // 1. initialize and plan
-    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph(), runId);
-    writePlanJsonFile(plan.getPlanAsJson());
+    ExecutionPlan plan = getExecutionPlan(runId);
+    try {
+      writePlanJsonFile(plan.getPlanAsJson());
+    } catch (Exception e) {
+      throw new SamzaException("Failed to create plan JSON.", e);
+    }
 
-    if (plan.getJobConfigs().isEmpty()) {
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    if (jobConfigs.isEmpty()) {
       throw new SamzaException("No jobs in the plan.");
     }
 
     // 2. create the necessary streams
     // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
     // to be used for the whole application
-    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    JobConfig jobConfig = jobConfigs.get(0);
     StreamManager streamManager = null;
     try {
       // create the StreamManager to create intermediate streams in the plan
@@ -79,7 +83,7 @@ public class RemoteJobPlanner extends JobPlanner {
         streamManager.stop();
       }
     }
-    return plan.getJobConfigs();
+    return jobConfigs;
   }
 
   private Config getConfigFromPrevRun() {

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
index 1e4194a..1830d1c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
@@ -73,6 +73,15 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
   }
 
   /**
+   * Get the serde assigned to this {@link TableDescriptor}
+   *
+   * @return {@link KVSerde} used by this table
+   */
+  public KVSerde<K, V> getSerde() {
+    return serde;
+  }
+
+  /**
    * Generate config for {@link TableSpec}; this method is used internally.
    * @param tableSpecConfig configuration for the {@link TableSpec}
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index b75b1e8..5329fd7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -30,7 +30,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.table.TableSpec;
 
 
 /**
@@ -45,7 +44,6 @@ public class OperatorSpecGraph implements Serializable {
   private final Map<String, InputOperatorSpec> inputOperators;
   private final Map<String, OutputStreamImpl> outputStreams;
   private final Set<String> broadcastStreams;
-  private final Map<TableSpec, TableImpl> tables;
   private final Set<OperatorSpec> allOpSpecs;
   private final boolean hasWindowOrJoins;
 
@@ -57,7 +55,6 @@ public class OperatorSpecGraph implements Serializable {
     this.inputOperators = streamAppDesc.getInputOperators();
     this.outputStreams = streamAppDesc.getOutputStreams();
     this.broadcastStreams = streamAppDesc.getBroadcastStreams();
-    this.tables = streamAppDesc.getTables();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
@@ -75,10 +72,6 @@ public class OperatorSpecGraph implements Serializable {
     return broadcastStreams;
   }
 
-  public Map<TableSpec, TableImpl> getTables() {
-    return tables;
-  }
-
   /**
    * Get all {@link OperatorSpec}s available in this {@link StreamApplicationDescriptorImpl}
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 98864d2..b9bb1f6 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -75,7 +75,7 @@ public class LocalContainerRunner {
       throw new SamzaException("can not find the job name");
     }
     String jobName = jobConfig.getName().get();
-    String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
+    String jobId = jobConfig.getJobId();
     MDC.put("containerName", "samza-container-" + containerId);
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 085131c..03be758 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -20,22 +20,16 @@
 package org.apache.samza.table;
 
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.TableImpl;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,8 +60,6 @@ public class TableConfigGenerator {
   static public Map<String, String> generateConfigsForTableSpecs(Config config, List<TableSpec> tableSpecs) {
     Map<String, String> tableConfigs = new HashMap<>();
 
-    tableConfigs.putAll(generateTableKVSerdeConfigs(tableSpecs));
-
     tableSpecs.forEach(tableSpec -> {
         // Add table provider factory config
         tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
@@ -103,44 +95,4 @@ public class TableConfigGenerator {
       });
     return new ArrayList<>(tableSpecs.keySet());
   }
-
-  static private Map<String, String> generateTableKVSerdeConfigs(List<TableSpec> tableSpecs) {
-    Map<String, String> serdeConfigs = new HashMap<>();
-
-    // Collect key and msg serde instances for all the tables
-    Map<String, Serde> tableKeySerdes = new HashMap<>();
-    Map<String, Serde> tableValueSerdes = new HashMap<>();
-    HashSet<Serde> serdes = new HashSet<>();
-
-    tableSpecs.forEach(tableSpec -> {
-        tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
-        tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
-      });
-    serdes.addAll(tableKeySerdes.values());
-    serdes.addAll(tableValueSerdes.values());
-
-    // Generate serde names
-    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
-    Base64.Encoder base64Encoder = Base64.getEncoder();
-    Map<Serde, String> serdeUUIDs = new HashMap<>();
-    serdes.forEach(serde -> {
-        String serdeName = serdeUUIDs.computeIfAbsent(serde,
-            s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
-        serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
-            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
-      });
-
-    // Set key and msg serdes for tables to the serde names generated above
-    tableKeySerdes.forEach((tableId, serde) -> {
-        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
-        serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    tableValueSerdes.forEach((tableId, serde) -> {
-        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
-        serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    return serdeConfigs;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 3dad6c1..41294a3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -35,7 +35,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
   private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData";
-  private static final String DEFAULT_JOB_ID = "1";
   private static final String DEFAULT_JOB_NAME = "defaultJob";
 
   /**
@@ -68,9 +67,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
     String jobName = jobConfig.getName().isDefined()
         ? jobConfig.getName().get()
         : DEFAULT_JOB_NAME;
-    String jobId = jobConfig.getJobId().isDefined()
-        ? jobConfig.getJobId().get()
-        : DEFAULT_JOB_ID;
+    String jobId = jobConfig.getJobId();
 
     return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
   }

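For reference, the coordination path produced by the format above is unchanged for jobs that never configure job.id, since JobConfig.getJobId (changed below) now supplies the "1" default itself. A quick worked example of the format string, with made-up appId and jobName values:

    // "%s/%s-%s-coordinationData" filled with (appId, jobName, jobId)
    String zkPath = String.format("%s/%s-%s-coordinationData", "app-1", "myJob", "1");
    // zkPath == "app-1/myJob-1-coordinationData"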
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index fc8780f..d7b71b5 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -39,7 +39,7 @@ object JobConfig {
    */
   val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
-  val CONFIG_JOB_PREFIX = "jobs.%s."
+  val CONFIG_OVERRIDE_JOBS_PREFIX = "jobs.%s."
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
   val SAMZA_FWK_PATH = "samza.fwk.path"
@@ -164,7 +164,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
-  def getJobId = getOption(JobConfig.JOB_ID)
+  def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
 
   def failOnCheckpointValidation = { getBoolean(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, true) }
 
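With the default folded into getJobId itself, the call sites touched elsewhere in this patch (LocalContainerRunner, ZkJobCoordinatorFactory, SamzaContainer, MetricsSnapshotReporterFactory, CoordinatorStreamUtil) can drop their per-call getOrElse fallbacks. A small caller-side sketch, assuming only the config keys shown (the example class name is illustrative):

    import java.util.Collections;
    import org.apache.samza.config.JobConfig;
    import org.apache.samza.config.MapConfig;

    public class JobIdDefaultExample {
      public static void main(String[] args) {
        // job.id is deliberately omitted from the config map
        JobConfig jobConfig = new JobConfig(new MapConfig(Collections.singletonMap("job.name", "myJob")));
        System.out.println(jobConfig.getJobId()); // prints "1" via the new default
      }
    }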

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index fba7329..417fc18 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -101,7 +101,7 @@ object SamzaContainer extends Logging {
     if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
       val jobNameAndId = (
         config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
-        config.getJobId.getOrElse("1")
+        config.getJobId
       )
 
       loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index d1e6554..8a9c021 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -44,7 +44,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
 
     val jobId = config
       .getJobId
-      .getOrElse(1.toString)
 
     val taskClass = config
       .getTaskClass

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index cd74716..bfb2271 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -89,6 +89,6 @@ object CoordinatorStreamUtil {
     */
   private def getJobNameAndId(config: Config) = {
     (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
-      config.getJobId.getOrElse("1"))
+      config.getJobId)
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
index db85e33..1fe6023 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
@@ -522,10 +522,11 @@ public class TestStreamApplicationDescriptorImpl {
     TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
     when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
     when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
+    when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getTable(mockTableDescriptor);
       }, mockConfig);
-    assertNotNull(streamAppDesc.getTables().get(testTableSpec));
+    assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId()));
   }
 
   @Test

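The assertion change above reflects that StreamApplicationDescriptorImpl.getTables() is now keyed by the table id string rather than by the TableSpec object; with the test data above the lookup is effectively:

    assertNotNull(streamAppDesc.getTables().get("t1")); // "t1" == testTableSpec.getId()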
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
index 9418c1f..abe5ce1 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
@@ -23,12 +23,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
 import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
 import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.task.TaskFactory;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,10 +66,12 @@ public class TestTaskApplicationDescriptorImpl {
       this.add(mock2);
     } };
   private Set<TableDescriptor> mockTables = new HashSet<TableDescriptor>() { {
-      TableDescriptor mock1 = mock(TableDescriptor.class);
-      TableDescriptor mock2 = mock(TableDescriptor.class);
+      BaseTableDescriptor mock1 = mock(BaseTableDescriptor.class);
+      BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class);
       when(mock1.getTableId()).thenReturn("test-table1");
       when(mock2.getTableId()).thenReturn("test-table2");
+      when(mock1.getSerde()).thenReturn(mock(KVSerde.class));
+      when(mock2.getSerde()).thenReturn(mock(KVSerde.class));
       this.add(mock1);
       this.add(mock2);
     } };

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
new file mode 100644
index 0000000..f507c70
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.LegacyTaskApplication;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.task.IdentityStreamTask;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+/**
+ * Unit test base class to set up commonly used test application and configuration.
+ */
+class ExecutionPlannerTestBase {
+  protected StreamApplicationDescriptorImpl mockStreamAppDesc;
+  protected Config mockConfig;
+  protected JobNode mockJobNode;
+  protected KVSerde<String, Object> defaultSerde;
+  protected GenericSystemDescriptor inputSystemDescriptor;
+  protected GenericSystemDescriptor outputSystemDescriptor;
+  protected GenericSystemDescriptor intermediateSystemDescriptor;
+  protected GenericInputDescriptor<KV<String, Object>> input1Descriptor;
+  protected GenericInputDescriptor<KV<String, Object>> input2Descriptor;
+  protected GenericInputDescriptor<KV<String, Object>> intermediateInputDescriptor;
+  protected GenericInputDescriptor<KV<String, Object>> broadcastInputDesriptor;
+  protected GenericOutputDescriptor<KV<String, Object>> outputDescriptor;
+  protected GenericOutputDescriptor<KV<String, Object>> intermediateOutputDescriptor;
+
+  @Before
+  public void setUp() {
+    defaultSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
+    inputSystemDescriptor = new GenericSystemDescriptor("input-system", "mockSystemFactoryClassName");
+    outputSystemDescriptor = new GenericSystemDescriptor("output-system", "mockSystemFactoryClassName");
+    intermediateSystemDescriptor = new GenericSystemDescriptor("intermediate-system", "mockSystemFactoryClassName");
+    input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde);
+    input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde);
+    outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", defaultSerde);
+    intermediateInputDescriptor = intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-partition_by-p1", defaultSerde)
+        .withPhysicalName("jobName-jobId-partition_by-p1");
+    intermediateOutputDescriptor = intermediateSystemDescriptor.getOutputDescriptor("jobName-jobId-partition_by-p1", defaultSerde)
+        .withPhysicalName("jobName-jobId-partition_by-p1");
+    broadcastInputDesriptor = intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-broadcast-b1", defaultSerde)
+        .withPhysicalName("jobName-jobId-broadcast-b1");
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    configs.putAll(input1Descriptor.toConfig());
+    configs.putAll(input2Descriptor.toConfig());
+    configs.putAll(outputDescriptor.toConfig());
+    configs.putAll(inputSystemDescriptor.toConfig());
+    configs.putAll(outputSystemDescriptor.toConfig());
+    configs.putAll(intermediateSystemDescriptor.toConfig());
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystemDescriptor.getSystemName());
+    mockConfig = spy(new MapConfig(configs));
+
+    mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
+  }
+
+  String getJobNameAndId() {
+    return "jobName-jobId";
+  }
+
+  void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc) {
+    JobGraph jobGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class))
+        .createJobGraph(mockConfig, mockStreamAppDesc);
+    mockJobNode = spy(jobGraph.getJobNodes().get(0));
+  }
+
+  StreamApplication getRepartitionOnlyStreamApplication() {
+    return appDesc -> {
+      MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor);
+      input1.partitionBy(KV::getKey, KV::getValue, "p1");
+    };
+  }
+
+  StreamApplication getRepartitionJoinStreamApplication() {
+    return appDesc -> {
+      MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor);
+      MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(input2Descriptor);
+      OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor);
+      JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
+      input1
+          .partitionBy(KV::getKey, KV::getValue, defaultSerde, "p1")
+          .map(kv -> kv.value)
+          .join(input2.map(kv -> kv.value), mockJoinFn,
+              new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
+              Duration.ofHours(1), "j1")
+          .sendTo(output);
+    };
+  }
+
+  TaskApplication getTaskApplication() {
+    return appDesc -> {
+      appDesc.addInputStream(input1Descriptor);
+      appDesc.addInputStream(input2Descriptor);
+      appDesc.addInputStream(intermediateInputDescriptor);
+      appDesc.addOutputStream(intermediateOutputDescriptor);
+      appDesc.addOutputStream(outputDescriptor);
+      appDesc.setTaskFactory(() -> new IdentityStreamTask());
+    };
+  }
+
+  TaskApplication getLegacyTaskApplication() {
+    return new LegacyTaskApplication(IdentityStreamTask.class.getName());
+  }
+
+  StreamApplication getBroadcastOnlyStreamApplication(Serde serde) {
+    return appDesc -> {
+      MessageStream<KV<String, Object>> input = appDesc.getInputStream(input1Descriptor);
+      if (serde != null) {
+        input.broadcast(serde, "b1");
+      } else {
+        input.broadcast("b1");
+      }
+    };
+  }
+}

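A hedged sketch of how a concrete test could build on this base class (the test class name and assertion are invented; configureJobNode, mockConfig, mockJobNode and getRepartitionOnlyStreamApplication() are the members defined above, so the subclass has to live in the same org.apache.samza.execution package to reach the package-private pieces):

    package org.apache.samza.execution;

    import org.apache.samza.application.StreamApplicationDescriptorImpl;
    import org.junit.Assert;
    import org.junit.Test;

    public class TestJobNodePlanning extends ExecutionPlannerTestBase {
      @Test
      public void testRepartitionOnlyAppYieldsAJobNode() {
        // Plan the repartition-only app and make sure a JobNode was produced.
        StreamApplicationDescriptorImpl appDesc =
            new StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), mockConfig);
        configureJobNode(appDesc);
        Assert.assertNotNull(mockJobNode);
      }
    }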