http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 47705ee..af556f5 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -19,45 +19,26 @@ package org.apache.samza.execution; -import java.util.ArrayList; -import java.util.Base64; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.UUID; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.SerializerConfig; -import org.apache.samza.config.StorageConfig; -import org.apache.samza.config.StreamConfig; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.KV; import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.operators.spec.StatefulOperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.table.TableConfigGenerator; -import org.apache.samza.util.MathUtil; import 
org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.table.TableSpec; -import org.apache.samza.util.StreamUtil; -import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; - - /** * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution. @@ -65,64 +46,71 @@ import com.google.common.base.Joiner; */ public class JobNode { private static final Logger log = LoggerFactory.getLogger(JobNode.class); - private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan"; private final String jobName; private final String jobId; - private final String id; - private final OperatorSpecGraph specGraph; - private final List<StreamEdge> inEdges = new ArrayList<>(); - private final List<StreamEdge> outEdges = new ArrayList<>(); - private final List<TableSpec> tables = new ArrayList<>(); + private final String jobNameAndId; private final Config config; - - JobNode(String jobName, String jobId, OperatorSpecGraph specGraph, Config config) { + private final JobNodeConfigurationGenerator configGenerator; + // The following maps (i.e. inEdges and outEdges) uses the streamId as the key + private final Map<String, StreamEdge> inEdges = new HashMap<>(); + private final Map<String, StreamEdge> outEdges = new HashMap<>(); + // Similarly, tables uses tableId as the key + private final Map<String, TableSpec> tables = new HashMap<>(); + private final ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc; + + JobNode(String jobName, String jobId, Config config, ApplicationDescriptorImpl appDesc, + JobNodeConfigurationGenerator configureGenerator) { this.jobName = jobName; this.jobId = jobId; - this.id = createId(jobName, jobId); - this.specGraph = specGraph; + this.jobNameAndId = createJobNameAndId(jobName, jobId); this.config = config; + this.appDesc = appDesc; + this.configGenerator = configureGenerator; } - public static Config mergeJobConfig(Config fullConfig, Config generatedConfig) { - return new JobConfig(Util.rewriteConfig(extractScopedConfig( - fullConfig, generatedConfig, String.format(JobConfig.CONFIG_JOB_PREFIX(), new JobConfig(fullConfig).getName().get())))); - } - - public OperatorSpecGraph getSpecGraph() { - return this.specGraph; + static String createJobNameAndId(String jobName, String jobId) { + return String.format("%s-%s", jobName, jobId); } - public String getId() { - return id; + String getJobNameAndId() { + return jobNameAndId; } - public String getJobName() { + String getJobName() { return jobName; } - public String getJobId() { + String getJobId() { return jobId; } + Config getConfig() { + return config; + } + void addInEdge(StreamEdge in) { - inEdges.add(in); + inEdges.put(in.getStreamSpec().getId(), in); } void addOutEdge(StreamEdge out) { - outEdges.add(out); + outEdges.put(out.getStreamSpec().getId(), out); } - List<StreamEdge> getInEdges() { + void addTable(TableSpec tableSpec) { + tables.put(tableSpec.getId(), tableSpec); + } + + Map<String, StreamEdge> getInEdges() { return inEdges; } - List<StreamEdge> getOutEdges() { + Map<String, StreamEdge> getOutEdges() { return outEdges; } - void addTable(TableSpec tableSpec) { - tables.add(tableSpec); + Map<String, TableSpec> getTables() { + return tables; } /** @@ -130,250 +118,65 @@ public class JobNode { * @param executionPlanJson JSON representation of the execution plan * @return config of the job */ - public JobConfig generateConfig(String 
executionPlanJson) { - Map<String, String> configs = new HashMap<>(); - configs.put(JobConfig.JOB_NAME(), jobName); - configs.put(JobConfig.JOB_ID(), jobId); + JobConfig generateConfig(String executionPlanJson) { + return configGenerator.generateJobConfig(this, executionPlanJson); + } - final List<String> inputs = new ArrayList<>(); - final List<String> broadcasts = new ArrayList<>(); - for (StreamEdge inEdge : inEdges) { - String formattedSystemStream = inEdge.getName(); - if (inEdge.isBroadcast()) { - broadcasts.add(formattedSystemStream + "#0"); - } else { - inputs.add(formattedSystemStream); - } + KV<Serde, Serde> getInputSerdes(String streamId) { + if (!inEdges.containsKey(streamId)) { + return null; } + return appDesc.getStreamSerdes(streamId); + } - if (!broadcasts.isEmpty()) { - // TODO: remove this once we support defining broadcast input stream in high-level - // task.broadcast.input should be generated by the planner in the future. - final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS); - if (StringUtils.isNoneEmpty(taskBroadcasts)) { - broadcasts.add(taskBroadcasts); - } - configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcasts)); + KV<Serde, Serde> getOutputSerde(String streamId) { + if (!outEdges.containsKey(streamId)) { + return null; } + return appDesc.getStreamSerdes(streamId); + } - // set triggering interval if a window or join is defined - if (specGraph.hasWindowOrJoins()) { - if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) { - long triggerInterval = computeTriggerInterval(); - log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName); + Collection<OperatorSpec> getReachableOperators() { + Set<OperatorSpec> inputOperatorsInJobNode = inEdges.values().stream().map(inEdge -> + appDesc.getInputOperators().get(inEdge.getStreamSpec().getId())).filter(Objects::nonNull).collect(Collectors.toSet()); + Set<OperatorSpec> reachableOperators = new HashSet<>(); + 
findReachableOperators(inputOperatorsInJobNode, reachableOperators); + return reachableOperators; + } - configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval)); - } + // get all next operators consuming from the input {@code streamId} + Set<String> getNextOperatorIds(String streamId) { + if (!appDesc.getInputOperators().containsKey(streamId) || !inEdges.containsKey(streamId)) { + return new HashSet<>(); } + return appDesc.getInputOperators().get(streamId).getRegisteredOperatorSpecs().stream() + .map(op -> op.getOpId()).collect(Collectors.toSet()); + } - specGraph.getAllOperatorSpecs().forEach(opSpec -> { - if (opSpec instanceof StatefulOperatorSpec) { - ((StatefulOperatorSpec) opSpec).getStoreDescriptors() - .forEach(sd -> configs.putAll(sd.getStorageConfigs())); - // store key and message serdes are configured separately in #addSerdeConfigs - } - }); - - configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); - - // write input/output streams to configs - inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig())); - - // write serialized serde instances and stream serde configs to configs - addSerdeConfigs(configs); - - configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs), tables)); - - // Add side inputs to the inputs and mark the stream as bootstrap - tables.forEach(tableSpec -> { - List<String> sideInputs = tableSpec.getSideInputs(); - if (sideInputs != null && !sideInputs.isEmpty()) { - sideInputs.stream() - .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput)) - .forEach(systemStream -> { - inputs.add(StreamUtil.getNameFromSystemStream(systemStream)); - configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(), - systemStream.getSystem(), systemStream.getStream()), "true"); - }); - } - }); - - configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); - - log.info("Job {} has generated configs {}", 
jobName, configs); - - String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), jobName); - - // Disallow user specified job inputs/outputs. This info comes strictly from the user application. - Map<String, String> allowedConfigs = new HashMap<>(config); - if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) { - log.warn("Specifying task inputs in configuration is not allowed with Fluent API. " - + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS()); - allowedConfigs.remove(TaskConfig.INPUT_STREAMS()); + InputOperatorSpec getInputOperator(String inputStreamId) { + if (!inEdges.containsKey(inputStreamId)) { + return null; } - - log.debug("Job {} has allowed configs {}", jobName, allowedConfigs); - return new JobConfig( - Util.rewriteConfig( - extractScopedConfig(new MapConfig(allowedConfigs), new MapConfig(configs), configPrefix))); + return appDesc.getInputOperators().get(inputStreamId); } - /** - * Serializes the {@link Serde} instances for operators, adds them to the provided config, and - * sets the serde configuration for the input/output/intermediate streams appropriately. - * - * We try to preserve the number of Serde instances before and after serialization. However we don't - * guarantee that references shared between these serdes instances (e.g. an Jackson ObjectMapper shared - * between two json serdes) are shared after deserialization too. - * - * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass - * from the same output/input stream so that we can maintain reference sharing relationships. 
- * - * @param configs the configs to add serialized serde instances and stream serde configs to - */ - void addSerdeConfigs(Map<String, String> configs) { - // collect all key and msg serde instances for streams - Map<String, Serde> streamKeySerdes = new HashMap<>(); - Map<String, Serde> streamMsgSerdes = new HashMap<>(); - Map<String, InputOperatorSpec> inputOperators = specGraph.getInputOperators(); - inEdges.forEach(edge -> { - String streamId = edge.getStreamSpec().getId(); - InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); - Serde keySerde = inputOperatorSpec.getKeySerde(); - if (keySerde != null) { - streamKeySerdes.put(streamId, keySerde); - } - Serde valueSerde = inputOperatorSpec.getValueSerde(); - if (valueSerde != null) { - streamMsgSerdes.put(streamId, valueSerde); - } - }); - Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams(); - outEdges.forEach(edge -> { - String streamId = edge.getStreamSpec().getId(); - OutputStreamImpl outputStream = outputStreams.get(streamId); - Serde keySerde = outputStream.getKeySerde(); - if (keySerde != null) { - streamKeySerdes.put(streamId, keySerde); - } - Serde valueSerde = outputStream.getValueSerde(); - if (valueSerde != null) { - streamMsgSerdes.put(streamId, valueSerde); - } - }); - - // collect all key and msg serde instances for stores - Map<String, Serde> storeKeySerdes = new HashMap<>(); - Map<String, Serde> storeMsgSerdes = new HashMap<>(); - specGraph.getAllOperatorSpecs().forEach(opSpec -> { - if (opSpec instanceof StatefulOperatorSpec) { - ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> { - storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde()); - storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde()); - }); - } - }); - - // for each unique stream or store serde instance, generate a unique name and serialize to config - HashSet<Serde> serdes = new 
HashSet<>(streamKeySerdes.values()); - serdes.addAll(streamMsgSerdes.values()); - serdes.addAll(storeKeySerdes.values()); - serdes.addAll(storeMsgSerdes.values()); - SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); - Base64.Encoder base64Encoder = Base64.getEncoder(); - Map<Serde, String> serdeUUIDs = new HashMap<>(); - serdes.forEach(serde -> { - String serdeName = serdeUUIDs.computeIfAbsent(serde, - s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString()); - configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName), - base64Encoder.encodeToString(serializableSerde.toBytes(serde))); - }); - - // set key and msg serdes for streams to the serde names generated above - streamKeySerdes.forEach((streamId, serde) -> { - String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); - String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE(); - configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); - }); - - streamMsgSerdes.forEach((streamId, serde) -> { - String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); - String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE(); - configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); - }); - - // set key and msg serdes for stores to the serde names generated above - storeKeySerdes.forEach((storeName, serde) -> { - String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName); - configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); - }); - - storeMsgSerdes.forEach((storeName, serde) -> { - String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); - configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); - }); + boolean isLegacyTaskApplication() { + return LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass()); } - /** - * Computes the triggering interval to use during the execution of this {@link JobNode} - */ - private long 
computeTriggerInterval() { - // Obtain the operator specs from the specGraph - Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs(); - - // Filter out window operators, and obtain a list of their triggering interval values - List<Long> windowTimerIntervals = operatorSpecs.stream() - .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW) - .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs()) - .collect(Collectors.toList()); - - // Filter out the join operators, and obtain a list of their ttl values - List<Long> joinTtlIntervals = operatorSpecs.stream() - .filter(spec -> spec instanceof JoinOperatorSpec) - .map(spec -> ((JoinOperatorSpec) spec).getTtlMs()) - .collect(Collectors.toList()); - - // Combine both the above lists - List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals); - candidateTimerIntervals.addAll(windowTimerIntervals); - - if (candidateTimerIntervals.isEmpty()) { - return -1; - } - - // Compute the gcd of the resultant list - return MathUtil.gcd(candidateTimerIntervals); + KV<Serde, Serde> getTableSerdes(String tableId) { + //TODO: SAMZA-1893: should test whether the table is used in the current JobNode + return appDesc.getTableSerdes(tableId); } - /** - * This function extract the subset of configs from the full config, and use it to override the generated configs - * from the job. - * @param fullConfig full config - * @param generatedConfig config generated for the job - * @param configPrefix prefix to extract the subset of the config overrides - * @return config that merges the generated configs and overrides - */ - private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) { - Config scopedConfig = fullConfig.subset(configPrefix); - - Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig}; - // Strip empty configs so they don't override the configs before them. 
- Map<String, String> mergedConfig = new HashMap<>(); - for (Map<String, String> config : configPrecedence) { - for (Map.Entry<String, String> property : config.entrySet()) { - String value = property.getValue(); - if (!(value == null || value.isEmpty())) { - mergedConfig.put(property.getKey(), property.getValue()); + private void findReachableOperators(Collection<OperatorSpec> inputOperatorsInJobNode, + Set<OperatorSpec> reachableOperators) { + inputOperatorsInJobNode.forEach(op -> { + if (reachableOperators.contains(op)) { + return; } - } - } - scopedConfig = new MapConfig(mergedConfig); - log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig); - - return scopedConfig; - } - - static String createId(String jobName, String jobId) { - return String.format("%s-%s", jobName, jobId); + reachableOperators.add(op); + findReachableOperators(op.getRegisteredOperatorSpecs(), reachableOperators); + }); } }
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java new file mode 100644 index 0000000..676d28e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.execution; + +import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaTableConfig; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.SerializerConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.spec.JoinOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.StatefulOperatorSpec; +import org.apache.samza.operators.spec.StoreDescriptor; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerializableSerde; +import org.apache.samza.table.TableConfigGenerator; +import org.apache.samza.table.TableSpec; +import org.apache.samza.util.MathUtil; +import org.apache.samza.util.StreamUtil; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class provides methods to generate configuration for a {@link JobNode} + */ +/* package private */ class JobNodeConfigurationGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(JobNodeConfigurationGenerator.class); + + static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan"; + + static JobConfig mergeJobConfig(Config originalConfig, Config generatedConfig) { + 
JobConfig jobConfig = new JobConfig(originalConfig); + String jobNameAndId = JobNode.createJobNameAndId(jobConfig.getName().get(), jobConfig.getJobId()); + return new JobConfig(Util.rewriteConfig(extractScopedConfig(originalConfig, generatedConfig, + String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), jobNameAndId)))); + } + + JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) { + Map<String, String> configs = new HashMap<>(); + // set up job name and job ID + configs.put(JobConfig.JOB_NAME(), jobNode.getJobName()); + configs.put(JobConfig.JOB_ID(), jobNode.getJobId()); + + Map<String, StreamEdge> inEdges = jobNode.getInEdges(); + Map<String, StreamEdge> outEdges = jobNode.getOutEdges(); + Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators(); + List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators); + Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode); + Config config = jobNode.getConfig(); + + // check all inputs to the node for broadcast and input streams + final Set<String> inputs = new HashSet<>(); + final Set<String> broadcasts = new HashSet<>(); + for (StreamEdge inEdge : inEdges.values()) { + String formattedSystemStream = inEdge.getName(); + if (inEdge.isBroadcast()) { + broadcasts.add(formattedSystemStream + "#0"); + } else { + inputs.add(formattedSystemStream); + } + } + + configureBroadcastInputs(configs, config, broadcasts); + + // compute window and join operator intervals in this node + configureWindowInterval(configs, config, reachableOperators); + + // set store configuration for stateful operators. 
+ stores.forEach(sd -> configs.putAll(sd.getStorageConfigs())); + + // set the execution plan in json + configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); + + // write intermediate input/output streams to configs + inEdges.values().stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig())); + + // write serialized serde instances and stream, store, and table serdes to configs + // serde configuration generation has to happen before table configuration, since the serde configuration + // is required when generating configurations for some TableProvider (i.e. local store backed tables) + configureSerdes(configs, inEdges, outEdges, stores, reachableTables.keySet(), jobNode); + + // generate table configuration and potential side input configuration + configureTables(configs, config, reachableTables, inputs); + + // finalize the task.inputs configuration + configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); + + LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), configs); + + // apply configure rewriters and user configure overrides + return applyConfigureRewritersAndOverrides(configs, config, jobNode); + } + + private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) { + // TODO: Fix this in SAMZA-1893. For now, returning all tables for single-job execution plan + return jobNode.getTables(); + } + + private void configureBroadcastInputs(Map<String, String> configs, Config config, Set<String> broadcastStreams) { + // TODO: SAMZA-1841: remove this once we support defining broadcast input stream in high-level + // task.broadcast.input should be generated by the planner in the future. 
+ if (broadcastStreams.isEmpty()) { + return; + } + final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS); + if (StringUtils.isNoneEmpty(taskBroadcasts)) { + broadcastStreams.add(taskBroadcasts); + } + configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams)); + } + + private void configureWindowInterval(Map<String, String> configs, Config config, + Collection<OperatorSpec> reachableOperators) { + if (!reachableOperators.stream().anyMatch(op -> op.getOpCode() == OperatorSpec.OpCode.WINDOW + || op.getOpCode() == OperatorSpec.OpCode.JOIN)) { + return; + } + + // set triggering interval if a window or join is defined. Only applies to high-level applications + if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) { + long triggerInterval = computeTriggerInterval(reachableOperators); + LOG.info("Using triggering interval: {}", triggerInterval); + + configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval)); + } + } + + /** + * Computes the triggering interval to use during the execution of this {@link JobNode} + */ + private long computeTriggerInterval(Collection<OperatorSpec> reachableOperators) { + List<Long> windowTimerIntervals = reachableOperators.stream() + .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW) + .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs()) + .collect(Collectors.toList()); + + // Filter out the join operators, and obtain a list of their ttl values + List<Long> joinTtlIntervals = reachableOperators.stream() + .filter(spec -> spec instanceof JoinOperatorSpec) + .map(spec -> ((JoinOperatorSpec) spec).getTtlMs()) + .collect(Collectors.toList()); + + // Combine both the above lists + List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals); + candidateTimerIntervals.addAll(windowTimerIntervals); + + if (candidateTimerIntervals.isEmpty()) { + return -1; + } + + // Compute the gcd of the resultant list + return 
MathUtil.gcd(candidateTimerIntervals); + } + + private JobConfig applyConfigureRewritersAndOverrides(Map<String, String> configs, Config config, JobNode jobNode) { + // Disallow user specified job inputs/outputs. This info comes strictly from the user application. + Map<String, String> allowedConfigs = new HashMap<>(config); + if (!jobNode.isLegacyTaskApplication()) { + if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) { + LOG.warn("Specifying task inputs in configuration is not allowed for SamzaApplication. " + + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS()); + allowedConfigs.remove(TaskConfig.INPUT_STREAMS()); + } + } + + LOG.debug("Job {} has allowed configs {}", jobNode.getJobNameAndId(), allowedConfigs); + return mergeJobConfig(new MapConfig(allowedConfigs), new MapConfig(configs)); + } + + /** + * This function extract the subset of configs from the full config, and use it to override the generated configs + * from the job. + * @param fullConfig full config + * @param generatedConfig config generated for the job + * @param configPrefix prefix to extract the subset of the config overrides + * @return config that merges the generated configs and overrides + */ + private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) { + Config scopedConfig = fullConfig.subset(configPrefix); + + Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig}; + // Strip empty configs so they don't override the configs before them. 
+ Map<String, String> mergedConfig = new HashMap<>(); + for (Map<String, String> config : configPrecedence) { + for (Map.Entry<String, String> property : config.entrySet()) { + String value = property.getValue(); + if (!(value == null || value.isEmpty())) { + mergedConfig.put(property.getKey(), property.getValue()); + } + } + } + scopedConfig = new MapConfig(mergedConfig); + LOG.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig); + + return scopedConfig; + } + + private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) { + return reachableOperators.stream().filter(operatorSpec -> operatorSpec instanceof StatefulOperatorSpec) + .map(operatorSpec -> ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors()).flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + private void configureTables(Map<String, String> configs, Config config, Map<String, TableSpec> tables, Set<String> inputs) { + configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs), + tables.values().stream().collect(Collectors.toList()))); + + // Add side inputs to the inputs and mark the stream as bootstrap + tables.values().forEach(tableSpec -> { + List<String> sideInputs = tableSpec.getSideInputs(); + if (sideInputs != null && !sideInputs.isEmpty()) { + sideInputs.stream() + .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput)) + .forEach(systemStream -> { + inputs.add(StreamUtil.getNameFromSystemStream(systemStream)); + configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(), + systemStream.getSystem(), systemStream.getStream()), "true"); + }); + } + }); + } + + /** + * Serializes the {@link Serde} instances for operators, adds them to the provided config, and + * sets the serde configuration for the input/output/intermediate streams appropriately. + * + * We try to preserve the number of Serde instances before and after serialization. 
However we don't + * guarantee that references shared between these serdes instances (e.g. an Jackson ObjectMapper shared + * between two json serdes) are shared after deserialization too. + * + * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass + * from the same output/input stream so that we can maintain reference sharing relationships. + * + * @param configs the configs to add serialized serde instances and stream serde configs to + */ + private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges, + List<StoreDescriptor> stores, Collection<String> tables, JobNode jobNode) { + // collect all key and msg serde instances for streams + Map<String, Serde> streamKeySerdes = new HashMap<>(); + Map<String, Serde> streamMsgSerdes = new HashMap<>(); + inEdges.keySet().forEach(streamId -> + addSerdes(jobNode.getInputSerdes(streamId), streamId, streamKeySerdes, streamMsgSerdes)); + outEdges.keySet().forEach(streamId -> + addSerdes(jobNode.getOutputSerde(streamId), streamId, streamKeySerdes, streamMsgSerdes)); + + Map<String, Serde> storeKeySerdes = new HashMap<>(); + Map<String, Serde> storeMsgSerdes = new HashMap<>(); + stores.forEach(storeDescriptor -> { + storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde()); + storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde()); + }); + + Map<String, Serde> tableKeySerdes = new HashMap<>(); + Map<String, Serde> tableMsgSerdes = new HashMap<>(); + tables.forEach(tableId -> { + addSerdes(jobNode.getTableSerdes(tableId), tableId, tableKeySerdes, tableMsgSerdes); + }); + + // for each unique stream or store serde instance, generate a unique name and serialize to config + HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values()); + serdes.addAll(streamMsgSerdes.values()); + serdes.addAll(storeKeySerdes.values()); + serdes.addAll(storeMsgSerdes.values()); + 
serdes.addAll(tableKeySerdes.values()); + serdes.addAll(tableMsgSerdes.values()); + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + Base64.Encoder base64Encoder = Base64.getEncoder(); + Map<Serde, String> serdeUUIDs = new HashMap<>(); + serdes.forEach(serde -> { + String serdeName = serdeUUIDs.computeIfAbsent(serde, + s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString()); + configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName), + base64Encoder.encodeToString(serializableSerde.toBytes(serde))); + }); + + // set key and msg serdes for streams to the serde names generated above + streamKeySerdes.forEach((streamId, serde) -> { + String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); + String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE(); + configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + streamMsgSerdes.forEach((streamId, serde) -> { + String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); + String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE(); + configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); + }); + + // set key and msg serdes for stores to the serde names generated above + storeKeySerdes.forEach((storeName, serde) -> { + String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName); + configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + storeMsgSerdes.forEach((storeName, serde) -> { + String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); + configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); + }); + + // set key and msg serdes for stores to the serde names generated above + tableKeySerdes.forEach((tableId, serde) -> { + String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId); + configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + tableMsgSerdes.forEach((tableId, serde) -> { + 
String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId); + configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); + }); + } + + private void addSerdes(KV<Serde, Serde> serdes, String streamId, Map<String, Serde> keySerdeMap, + Map<String, Serde> msgSerdeMap) { + if (serdes != null) { + if (serdes.getKey() != null && !(serdes.getKey() instanceof NoOpSerde)) { + keySerdeMap.put(streamId, serdes.getKey()); + } + if (serdes.getValue() != null && !(serdes.getValue() instanceof NoOpSerde)) { + msgSerdeMap.put(streamId, serdes.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java index a2050e5..abbec18 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java @@ -20,29 +20,20 @@ package org.apache.samza.execution; import java.io.File; import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; -import org.apache.samza.SamzaException; import org.apache.samza.application.ApplicationDescriptor; import org.apache.samza.application.ApplicationDescriptorImpl; -import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.application.TaskApplicationDescriptorImpl; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; import 
org.apache.samza.config.StreamConfig; -import org.apache.samza.operators.BaseTableDescriptor; -import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.table.TableConfigGenerator; -import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,22 +55,7 @@ public abstract class JobPlanner { this.config = descriptor.getConfig(); } - public List<JobConfig> prepareJobs() { - String appId = new ApplicationConfig(appDesc.getConfig()).getGlobalAppId(); - if (appDesc instanceof TaskApplicationDescriptorImpl) { - return Collections.singletonList(prepareTaskJob((TaskApplicationDescriptorImpl) appDesc)); - } else if (appDesc instanceof StreamApplicationDescriptorImpl) { - try { - return prepareStreamJobs((StreamApplicationDescriptorImpl) appDesc); - } catch (Exception e) { - throw new SamzaException("Failed to generate JobConfig for StreamApplication " + appId, e); - } - } - throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or " - + "StreamApplicationDescriptorImpl. 
class %s is not supported", appDesc.getClass().getName())); - } - - abstract List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception; + public abstract List<JobConfig> prepareJobs(); StreamManager buildAndStartStreamManager(Config config) { StreamManager streamManager = new StreamManager(config); @@ -87,12 +63,12 @@ public abstract class JobPlanner { return streamManager; } - ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph) { - return getExecutionPlan(specGraph, null); + ExecutionPlan getExecutionPlan() { + return getExecutionPlan(null); } /* package private */ - ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph, String runId) { + ExecutionPlan getExecutionPlan(String runId) { // update application configs Map<String, String> cfg = new HashMap<>(); @@ -101,8 +77,8 @@ public abstract class JobPlanner { } StreamConfig streamConfig = new StreamConfig(config); - Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet()); - inputStreams.removeAll(specGraph.getOutputStreams().keySet()); + Set<String> inputStreams = new HashSet<>(appDesc.getInputStreamIds()); + inputStreams.removeAll(appDesc.getOutputStreamIds()); ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded) ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM; cfg.put(ApplicationConfig.APP_MODE, mode.name()); @@ -117,12 +93,12 @@ public abstract class JobPlanner { // create the physical execution plan and merge with overrides. 
This works for a single-stage job now // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 - Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg)); + Config mergedConfig = JobNodeConfigurationGenerator.mergeJobConfig(config, new MapConfig(cfg)); // creating the StreamManager to get all input/output streams' metadata for planning StreamManager streamManager = buildAndStartStreamManager(mergedConfig); try { ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager); - return planner.plan(specGraph); + return planner.plan(appDesc); } finally { streamManager.stop(); } @@ -149,25 +125,6 @@ public abstract class JobPlanner { } } - // TODO: SAMZA-1814: the following configuration generation still misses serde configuration generation, - // side input configuration, broadcast input and task inputs configuration generation for low-level task - // applications - // helper method to generate a single node job configuration for low level task applications - private JobConfig prepareTaskJob(TaskApplicationDescriptorImpl taskAppDesc) { - // copy original configure - Map<String, String> cfg = new HashMap<>(); - // expand system and streams configure - Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(taskAppDesc); - cfg.putAll(systemStreamConfigs); - // expand table configure - cfg.putAll(expandTableConfigs(cfg, taskAppDesc)); - // adding app.class in the configuration - cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName()); - // create the physical execution plan and merge with overrides. This works for a single-stage job now - // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 - return new JobConfig(JobNode.mergeJobConfig(config, new MapConfig(cfg))); - } - private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc) { Map<String, String> systemStreamConfigs = new HashMap<>(); appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); @@ -177,12 +134,4 @@ public abstract class JobPlanner { systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName())); return systemStreamConfigs; } - - private Map<String, String> expandTableConfigs(Map<String, String> originConfig, - ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { - List<TableSpec> tableSpecs = new ArrayList<>(); - appDesc.getTableDescriptors().stream().map(td -> ((BaseTableDescriptor) td).getTableSpec()) - .forEach(spec -> tableSpecs.add(spec)); - return TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(originConfig), tableSpecs); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java index 7996d6b..86aca0f 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException; import org.apache.samza.SamzaException; import org.apache.samza.application.ApplicationDescriptor; import org.apache.samza.application.ApplicationDescriptorImpl; -import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory; /** - * Temporarily helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)} + * 
Temporarily helper class with specific implementation of {@link JobPlanner#prepareJobs()} * for standalone Samza processors. * * TODO: we need to consolidate this with {@link ExecutionPlanner} after SAMZA-1811. @@ -53,17 +52,23 @@ public class LocalJobPlanner extends JobPlanner { } @Override - List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception { + public List<JobConfig> prepareJobs() { // for high-level DAG, generating the plan and job configs // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph()); + ExecutionPlan plan = getExecutionPlan(); - String executionPlanJson = plan.getPlanAsJson(); + String executionPlanJson = ""; + try { + executionPlanJson = plan.getPlanAsJson(); + } catch (Exception e) { + throw new SamzaException("Failed to create plan JSON.", e); + } writePlanJsonFile(executionPlanJson); LOG.info("Execution Plan: \n" + executionPlanJson); String planId = String.valueOf(executionPlanJson.hashCode()); - if (plan.getJobConfigs().isEmpty()) { + List<JobConfig> jobConfigs = plan.getJobConfigs(); + if (jobConfigs.isEmpty()) { throw new SamzaException("No jobs in the plan."); } @@ -71,7 +76,7 @@ public class LocalJobPlanner extends JobPlanner { // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391 // TODO: this works for single-job applications. 
For multi-job applications, ExecutionPlan should return an AppConfig // to be used for the whole application - JobConfig jobConfig = plan.getJobConfigs().get(0); + JobConfig jobConfig = jobConfigs.get(0); StreamManager streamManager = null; try { // create the StreamManager to create intermediate streams in the plan @@ -82,7 +87,7 @@ public class LocalJobPlanner extends JobPlanner { streamManager.stop(); } } - return plan.getJobConfigs(); + return jobConfigs; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java index aa1dff9..ca91214 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java +++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java @@ -27,15 +27,14 @@ import java.util.HashSet; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; /** - * A utility class that encapsulates the logic for traversing an {@link OperatorSpecGraph} and building - * associations between related {@link OperatorSpec}s. + * A utility class that encapsulates the logic for traversing operators in the graph from the set of {@link InputOperatorSpec} + * and building associations between related {@link OperatorSpec}s. */ /* package private */ class OperatorSpecGraphAnalyzer { @@ -43,14 +42,13 @@ import org.apache.samza.operators.spec.OperatorSpec; * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. 
{@link JoinOperatorSpec}s, they participate in. */ public static Multimap<JoinOperatorSpec, InputOperatorSpec> getJoinToInputOperatorSpecs( - OperatorSpecGraph operatorSpecGraph) { + Collection<InputOperatorSpec> inputOperatorSpecs) { Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = HashMultimap.create(); // Traverse graph starting from every input operator spec, observing connectivity between input operator specs // and Join operator specs. - Iterable<InputOperatorSpec> inputOpSpecs = operatorSpecGraph.getInputOperators().values(); - for (InputOperatorSpec inputOpSpec : inputOpSpecs) { + for (InputOperatorSpec inputOpSpec : inputOperatorSpecs) { // Observe all join operator specs reachable from this input operator spec. JoinOperatorSpecVisitor joinOperatorSpecVisitor = new JoinOperatorSpecVisitor(); traverse(inputOpSpec, joinOperatorSpecVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs()); @@ -77,7 +75,7 @@ import org.apache.samza.operators.spec.OperatorSpec; } /** - * An {@link OperatorSpecGraph} visitor that records all {@link JoinOperatorSpec}s encountered in the graph. 
+ * An visitor that records all {@link JoinOperatorSpec}s encountered in the graph of {@link OperatorSpec}s */ private static class JoinOperatorSpecVisitor implements Consumer<OperatorSpec> { private Set<JoinOperatorSpec> joinOpSpecs = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java index 254ff97..54f86d5 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java @@ -23,7 +23,6 @@ import java.util.UUID; import org.apache.samza.SamzaException; import org.apache.samza.application.ApplicationDescriptor; import org.apache.samza.application.ApplicationDescriptorImpl; -import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -34,7 +33,7 @@ import org.slf4j.LoggerFactory; /** - * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)} + * Temporary helper class with specific implementation of {@link JobPlanner#prepareJobs()} * for remote-launched Samza processors (e.g. in YARN). * * TODO: we need to consolidate this class with {@link ExecutionPlanner} after SAMZA-1811. 
@@ -47,7 +46,7 @@ public class RemoteJobPlanner extends JobPlanner { } @Override - List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception { + public List<JobConfig> prepareJobs() { // for high-level DAG, generate the plan and job configs // TODO: run.id needs to be set for standalone: SAMZA-1531 // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision @@ -55,17 +54,22 @@ public class RemoteJobPlanner extends JobPlanner { LOG.info("The run id for this run is {}", runId); // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph(), runId); - writePlanJsonFile(plan.getPlanAsJson()); + ExecutionPlan plan = getExecutionPlan(runId); + try { + writePlanJsonFile(plan.getPlanAsJson()); + } catch (Exception e) { + throw new SamzaException("Failed to create plan JSON.", e); + } - if (plan.getJobConfigs().isEmpty()) { + List<JobConfig> jobConfigs = plan.getJobConfigs(); + if (jobConfigs.isEmpty()) { throw new SamzaException("No jobs in the plan."); } // 2. create the necessary streams // TODO: this works for single-job applications. 
For multi-job applications, ExecutionPlan should return an AppConfig // to be used for the whole application - JobConfig jobConfig = plan.getJobConfigs().get(0); + JobConfig jobConfig = jobConfigs.get(0); StreamManager streamManager = null; try { // create the StreamManager to create intermediate streams in the plan @@ -79,7 +83,7 @@ public class RemoteJobPlanner extends JobPlanner { streamManager.stop(); } } - return plan.getJobConfigs(); + return jobConfigs; } private Config getConfigFromPrevRun() { http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java index 1e4194a..1830d1c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java @@ -73,6 +73,15 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, } /** + * Get the serde assigned to this {@link TableDescriptor} + * + * @return {@link KVSerde} used by this table + */ + public KVSerde<K, V> getSerde() { + return serde; + } + + /** * Generate config for {@link TableSpec}; this method is used internally. 
* @param tableSpecConfig configuration for the {@link TableSpec} */ http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java index b75b1e8..5329fd7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java @@ -30,7 +30,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.serializers.SerializableSerde; -import org.apache.samza.table.TableSpec; /** @@ -45,7 +44,6 @@ public class OperatorSpecGraph implements Serializable { private final Map<String, InputOperatorSpec> inputOperators; private final Map<String, OutputStreamImpl> outputStreams; private final Set<String> broadcastStreams; - private final Map<TableSpec, TableImpl> tables; private final Set<OperatorSpec> allOpSpecs; private final boolean hasWindowOrJoins; @@ -57,7 +55,6 @@ public class OperatorSpecGraph implements Serializable { this.inputOperators = streamAppDesc.getInputOperators(); this.outputStreams = streamAppDesc.getOutputStreams(); this.broadcastStreams = streamAppDesc.getBroadcastStreams(); - this.tables = streamAppDesc.getTables(); this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs()); this.hasWindowOrJoins = checkWindowOrJoins(); this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this); @@ -75,10 +72,6 @@ public class OperatorSpecGraph implements Serializable { return broadcastStreams; } - public Map<TableSpec, TableImpl> getTables() { - return tables; - } - /** * Get all {@link OperatorSpec}s available in this {@link 
StreamApplicationDescriptorImpl} * http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 98864d2..b9bb1f6 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -75,7 +75,7 @@ public class LocalContainerRunner { throw new SamzaException("can not find the job name"); } String jobName = jobConfig.getName().get(); - String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1")); + String jobId = jobConfig.getJobId(); MDC.put("containerName", "samza-container-" + containerId); MDC.put("jobName", jobName); MDC.put("jobId", jobId); http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java index 085131c..03be758 100644 --- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java @@ -20,22 +20,16 @@ package org.apache.samza.table; import java.util.ArrayList; -import java.util.Base64; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.config.SerializerConfig; import 
org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.TableImpl; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerializableSerde; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +60,6 @@ public class TableConfigGenerator { static public Map<String, String> generateConfigsForTableSpecs(Config config, List<TableSpec> tableSpecs) { Map<String, String> tableConfigs = new HashMap<>(); - tableConfigs.putAll(generateTableKVSerdeConfigs(tableSpecs)); - tableSpecs.forEach(tableSpec -> { // Add table provider factory config tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()), @@ -103,44 +95,4 @@ public class TableConfigGenerator { }); return new ArrayList<>(tableSpecs.keySet()); } - - static private Map<String, String> generateTableKVSerdeConfigs(List<TableSpec> tableSpecs) { - Map<String, String> serdeConfigs = new HashMap<>(); - - // Collect key and msg serde instances for all the tables - Map<String, Serde> tableKeySerdes = new HashMap<>(); - Map<String, Serde> tableValueSerdes = new HashMap<>(); - HashSet<Serde> serdes = new HashSet<>(); - - tableSpecs.forEach(tableSpec -> { - tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde()); - tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde()); - }); - serdes.addAll(tableKeySerdes.values()); - serdes.addAll(tableValueSerdes.values()); - - // Generate serde names - SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); - Base64.Encoder base64Encoder = Base64.getEncoder(); - Map<Serde, String> serdeUUIDs = new HashMap<>(); - serdes.forEach(serde -> { - String serdeName = serdeUUIDs.computeIfAbsent(serde, - s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString()); - serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), 
serdeName), - base64Encoder.encodeToString(serializableSerde.toBytes(serde))); - }); - - // Set key and msg serdes for tables to the serde names generated above - tableKeySerdes.forEach((tableId, serde) -> { - String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId); - serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); - }); - - tableValueSerdes.forEach((tableId, serde) -> { - String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId); - serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); - }); - - return serdeConfigs; - } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 3dad6c1..41294a3 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -35,7 +35,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class); private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData"; - private static final String DEFAULT_JOB_ID = "1"; private static final String DEFAULT_JOB_NAME = "defaultJob"; /** @@ -68,9 +67,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { String jobName = jobConfig.getName().isDefined() ? jobConfig.getName().get() : DEFAULT_JOB_NAME; - String jobId = jobConfig.getJobId().isDefined() - ? 
jobConfig.getJobId().get() - : DEFAULT_JOB_ID; + String jobId = jobConfig.getJobId(); return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId); } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index fc8780f..d7b71b5 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -39,7 +39,7 @@ object JobConfig { */ val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config - val CONFIG_JOB_PREFIX = "jobs.%s." + val CONFIG_OVERRIDE_JOBS_PREFIX = "jobs.%s." 
val JOB_NAME = "job.name" // streaming.job_name val JOB_ID = "job.id" // streaming.job_id val SAMZA_FWK_PATH = "samza.fwk.path" @@ -164,7 +164,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS) - def getJobId = getOption(JobConfig.JOB_ID) + def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1") def failOnCheckpointValidation = { getBoolean(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, true) } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index fba7329..417fc18 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -101,7 +101,7 @@ object SamzaContainer extends Logging { if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { val jobNameAndId = ( config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), - config.getJobId.getOrElse("1") + config.getJobId ) loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index d1e6554..8a9c021 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -44,7 +44,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val jobId = config .getJobId - .getOrElse(1.toString) val taskClass = config .getTaskClass http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index cd74716..bfb2271 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -89,6 +89,6 @@ object CoordinatorStreamUtil { */ private def getJobNameAndId(config: Config) = { (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), - config.getJobId.getOrElse("1")) + config.getJobId) } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java index db85e33..1fe6023 100644 --- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java @@ -522,10 +522,11 @@ public class TestStreamApplicationDescriptorImpl { TableSpec testTableSpec = new TableSpec("t1", 
KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>()); when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec); when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId()); + when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde()); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { appDesc.getTable(mockTableDescriptor); }, mockConfig); - assertNotNull(streamAppDesc.getTables().get(testTableSpec)); + assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId())); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java index 9418c1f..abe5ce1 100644 --- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java @@ -23,12 +23,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.task.TaskFactory; import org.junit.Before; import org.junit.Test; @@ -64,10 +66,12 @@ public class 
TestTaskApplicationDescriptorImpl { this.add(mock2); } }; private Set<TableDescriptor> mockTables = new HashSet<TableDescriptor>() { { - TableDescriptor mock1 = mock(TableDescriptor.class); - TableDescriptor mock2 = mock(TableDescriptor.class); + BaseTableDescriptor mock1 = mock(BaseTableDescriptor.class); + BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class); when(mock1.getTableId()).thenReturn("test-table1"); when(mock2.getTableId()).thenReturn("test-table2"); + when(mock1.getSerde()).thenReturn(mock(KVSerde.class)); + when(mock2.getSerde()).thenReturn(mock(KVSerde.class)); this.add(mock1); this.add(mock2); } }; http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java new file mode 100644 index 0000000..f507c70 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.execution; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.LegacyTaskApplication; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.task.IdentityStreamTask; +import org.junit.Before; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + + +/** + * Unit test base class to set up commonly used test application and configuration. 
+ */ +class ExecutionPlannerTestBase { + /** Stream app descriptor for the repartition-join application; rebuilt by setUp() before each test. */ protected StreamApplicationDescriptorImpl mockStreamAppDesc; + /** Spy of a MapConfig holding JobConfig.JOB_NAME()/JOB_ID() = "jobName"/"jobId", the input1/input2/output stream configs, the three system configs and the default system (set to the intermediate system); rebuilt by setUp(). */ protected Config mockConfig; + /** Spy of the first JobNode of the planned JobGraph; only assigned by configureJobNode(...), setUp() does not set it. */ protected JobNode mockJobNode; + /** KVSerde of StringSerde keys and JsonSerdeV2 values, used by every stream descriptor created in setUp(). */ protected KVSerde<String, Object> defaultSerde; + /* Input, output and intermediate system descriptors; all use the placeholder factory class name "mockSystemFactoryClassName". */ protected GenericSystemDescriptor inputSystemDescriptor; + protected GenericSystemDescriptor outputSystemDescriptor; + protected GenericSystemDescriptor intermediateSystemDescriptor; + protected GenericInputDescriptor<KV<String, Object>> input1Descriptor; + protected GenericInputDescriptor<KV<String, Object>> input2Descriptor; + /** Intermediate stream whose stream id and physical name are both "jobName-jobId-partition_by-p1" (the name suggests the output of partitionBy "p1"). */ protected GenericInputDescriptor<KV<String, Object>> intermediateInputDescriptor; + /** Broadcast stream whose stream id and physical name are both "jobName-jobId-broadcast-b1". NOTE(review): "Desriptor" is misspelled; kept because subclasses may reference this field. */ protected GenericInputDescriptor<KV<String, Object>> broadcastInputDesriptor; + protected GenericOutputDescriptor<KV<String, Object>> outputDescriptor; + protected GenericOutputDescriptor<KV<String, Object>> intermediateOutputDescriptor; + + /** Rebuilds every descriptor and mockConfig before each test, then builds mockStreamAppDesc from getRepartitionJoinStreamApplication(). Only input1, input2, output and the three systems are added to the config map; the intermediate and broadcast stream descriptors are not. */ @Before + public void setUp() { + defaultSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>()); + inputSystemDescriptor = new GenericSystemDescriptor("input-system", "mockSystemFactoryClassName"); + outputSystemDescriptor = new GenericSystemDescriptor("output-system", "mockSystemFactoryClassName"); + intermediateSystemDescriptor = new GenericSystemDescriptor("intermediate-system", "mockSystemFactoryClassName"); + input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde); + input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde); + outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", defaultSerde); + intermediateInputDescriptor = intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-partition_by-p1", defaultSerde) + .withPhysicalName("jobName-jobId-partition_by-p1"); + intermediateOutputDescriptor = intermediateSystemDescriptor.getOutputDescriptor("jobName-jobId-partition_by-p1", defaultSerde) + .withPhysicalName("jobName-jobId-partition_by-p1"); + broadcastInputDesriptor = 
intermediateSystemDescriptor.getInputDescriptor("jobName-jobId-broadcast-b1", defaultSerde) + .withPhysicalName("jobName-jobId-broadcast-b1"); + + Map<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "jobName"); + configs.put(JobConfig.JOB_ID(), "jobId"); + configs.putAll(input1Descriptor.toConfig()); + configs.putAll(input2Descriptor.toConfig()); + configs.putAll(outputDescriptor.toConfig()); + configs.putAll(inputSystemDescriptor.toConfig()); + configs.putAll(outputSystemDescriptor.toConfig()); + configs.putAll(intermediateSystemDescriptor.toConfig()); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystemDescriptor.getSystemName()); + mockConfig = spy(new MapConfig(configs)); + + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + } + + /** Returns "jobName-jobId": the job name and job id put into mockConfig, joined by '-', which is also the prefix of the intermediate and broadcast stream names. */ String getJobNameAndId() { + return "jobName-jobId"; + } + + /** Plans the given descriptor with an ExecutionPlanner backed by a mocked StreamManager and stores a spy of the resulting JobGraph's first JobNode in mockJobNode. NOTE(review): the parameter shadows the mockStreamAppDesc field and uses the raw ApplicationDescriptorImpl type. */ void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc) { + JobGraph jobGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class)) + .createJobGraph(mockConfig, mockStreamAppDesc); + mockJobNode = spy(jobGraph.getJobNodes().get(0)); + } + + /** Returns an application that reads input1 and repartitions it with partitionBy id "p1" without passing an explicit serde. */ StreamApplication getRepartitionOnlyStreamApplication() { + return appDesc -> { + MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor); + input1.partitionBy(KV::getKey, KV::getValue, "p1"); + }; + } + + /** Returns an application that repartitions input1 (id "p1", defaultSerde), joins its values with input2's values via a mocked JoinFunction (id "j1"; Duration.ofHours(1) is presumably the join TTL) and sends the result to the output stream. */ StreamApplication getRepartitionJoinStreamApplication() { + return appDesc -> { + MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(input1Descriptor); + MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(input2Descriptor); + OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor); + JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); + input1 + .partitionBy(KV::getKey, KV::getValue, defaultSerde, "p1") + .map(kv -> kv.value) + .join(input2.map(kv -> kv.value), mockJoinFn, + new 
StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), + Duration.ofHours(1), "j1") + .sendTo(output); + }; + } + + /** Returns a task application with input1, input2 and the intermediate stream as inputs, the intermediate and output streams as outputs, and an IdentityStreamTask factory. */ TaskApplication getTaskApplication() { + return appDesc -> { + appDesc.addInputStream(input1Descriptor); + appDesc.addInputStream(input2Descriptor); + appDesc.addInputStream(intermediateInputDescriptor); + appDesc.addOutputStream(intermediateOutputDescriptor); + appDesc.addOutputStream(outputDescriptor); + appDesc.setTaskFactory(() -> new IdentityStreamTask()); + }; + } + + /** Returns a LegacyTaskApplication built from IdentityStreamTask's fully-qualified class name. */ TaskApplication getLegacyTaskApplication() { + return new LegacyTaskApplication(IdentityStreamTask.class.getName()); + } + + /** Returns an application that reads input1 and broadcasts it with id "b1", passing serde when it is non-null and using the serde-less broadcast overload otherwise. NOTE(review): serde is a raw Serde type. */ StreamApplication getBroadcastOnlyStreamApplication(Serde serde) { + return appDesc -> { + MessageStream<KV<String, Object>> input = appDesc.getInputStream(input1Descriptor); + if (serde != null) { + input.broadcast(serde, "b1"); + } else { + input.broadcast("b1"); + } + }; + } +}