http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java new file mode 100644 index 0000000..acf4bf7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import org.apache.samza.config.Config; + + +/** + * Util class to help creating {@link ApplicationDescriptorImpl} instance from {@link SamzaApplication} and {@link Config} + */ +public class ApplicationDescriptorUtil { + + private ApplicationDescriptorUtil() { + + } + + /** + * Create a new instance of {@link ApplicationDescriptorImpl} based on {@link SamzaApplication} and {@link Config} + * + * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name. 
+ * @param config the {@link Config} for the application + * @return the {@link ApplicationDescriptorImpl} instance containing the processing logic and the config + */ + public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) { + if (app instanceof StreamApplication) { + return new StreamApplicationDescriptorImpl((StreamApplication) app, config); + } + if (app instanceof TaskApplication) { + return new TaskApplicationDescriptorImpl((TaskApplication) app, config); + } + throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication " + + "and TaskApplication are supported.", app.getClass().getName())); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java new file mode 100644 index 0000000..b39ad3c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.TaskConfig; +import scala.Option; + + +/** + * Util class to create {@link SamzaApplication} from the configuration. 
+ */ +public class ApplicationUtil { + + /** + * Creates the {@link SamzaApplication} object from the task or application class name specified in {@code config} + * + * @param config the configuration of the application + * @return the {@link SamzaApplication} object + */ + public static SamzaApplication fromConfig(Config config) { + String appClassName = new ApplicationConfig(config).getAppClass(); + if (StringUtils.isNotBlank(appClassName)) { + // app.class is configured + try { + Class<SamzaApplication> appClass = (Class<SamzaApplication>) Class.forName(appClassName); + if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) { + return appClass.newInstance(); + } + } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { + throw new ConfigException(String.format("Loading app.class %s failed. The user application has to implement " + + "StreamApplication or TaskApplication.", appClassName), e); + } + } + // no app.class defined. It has to be a legacy application with task.class configuration + Option<String> taskClassOption = new TaskConfig(config).getTaskClass(); + if (!taskClassOption.isDefined() || !StringUtils.isNotBlank(taskClassOption.getOrElse(null))) { + // no task.class defined either. This is wrong. 
+ throw new ConfigException("Legacy task applications must set a non-empty task.class in configuration."); + } + return new LegacyTaskApplication(taskClassOption.get()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java new file mode 100644 index 0000000..e9e2635 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.application; + +import org.apache.samza.task.TaskFactoryUtil; + +/** + * Default {@link TaskApplication} for legacy applications w/ only task.class implemented + */ +public final class LegacyTaskApplication implements TaskApplication { + private final String taskClassName; + + public LegacyTaskApplication(String taskClassName) { + this.taskClassName = taskClassName; + } + + @Override + public void describe(TaskApplicationDescriptor appDesc) { + appDesc.setTaskFactory(TaskFactoryUtil.getTaskFactory(taskClassName)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java new file mode 100644 index 0000000..ae7a45d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.application; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.TableImpl; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.operators.functions.StreamExpander; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec.OpCode; +import org.apache.samza.operators.spec.OperatorSpecs; +import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class defines: + * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link 
MessageStream}, {@link OutputStream}, + * and {@link Table} to create the DAG of transforms. + * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG + */ +public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor> + implements StreamApplicationDescriptor { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class); + private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); + + private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>(); + private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>(); + private final Set<String> broadcastStreams = new HashSet<>(); + private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>(); + private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>(); + // We use a LHM for deterministic order in initializing and closing operators. + private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); + private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); + private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); + private final Set<String> operatorIds = new HashSet<>(); + + private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty(); + + /** + * The 0-based position of the next operator in the graph. + * Part of the unique ID for each OperatorSpec in the graph. + * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}. 
+ */ + private int nextOpNum = 0; + + public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) { + super(userApp, config); + userApp.describe(this); + } + + @Override + public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) { + Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null."); + Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(), + "Default system must be set before creating any input or output streams."); + addSystemDescriptor(defaultSystemDescriptor); + + defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor); + return this; + } + + @Override + public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) { + SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor(); + Optional<StreamExpander> expander = systemDescriptor.getExpander(); + if (expander.isPresent()) { + return expander.get().apply(this, inputDescriptor); + } + + // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream + Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()), + String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId())); + inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor); + addSystemDescriptor(inputDescriptor.getSystemDescriptor()); + + String streamId = inputDescriptor.getStreamId(); + Preconditions.checkState(!inputOperators.containsKey(streamId), + "getInputStream must not be called multiple times with the same streamId: " + streamId); + + Serde serde = inputDescriptor.getSerde(); + KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + if (outputStreams.containsKey(streamId)) { + OutputStreamImpl outputStream = outputStreams.get(streamId); + Serde keySerde = outputStream.getKeySerde(); + Serde valueSerde = 
outputStream.getValueSerde(); + Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), + String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at " + + "stream level, so the same key and message Serde must be used for both.", streamId)); + } + + boolean isKeyed = serde instanceof KVSerde; + InputTransformer transformer = inputDescriptor.getTransformer().orElse(null); + InputOperatorSpec inputOperatorSpec = + OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), + transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); + inputOperators.put(streamId, inputOperatorSpec); + return new MessageStreamImpl(this, inputOperators.get(streamId)); + } + + @Override + public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) { + Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()), + String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId())); + outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor); + addSystemDescriptor(outputDescriptor.getSystemDescriptor()); + + String streamId = outputDescriptor.getStreamId(); + Preconditions.checkState(!outputStreams.containsKey(streamId), + "getOutputStream must not be called multiple times with the same streamId: " + streamId); + + Serde serde = outputDescriptor.getSerde(); + KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + if (inputOperators.containsKey(streamId)) { + InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); + Serde keySerde = inputOperatorSpec.getKeySerde(); + Serde valueSerde = inputOperatorSpec.getValueSerde(); + Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), + String.format("Stream %s is being used both as an input and an output stream. 
Serde in Samza happens at " + + "stream level, so the same key and message Serde must be used for both.", streamId)); + } + + boolean isKeyed = serde instanceof KVSerde; + outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); + return outputStreams.get(streamId); + } + + @Override + public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) { + String tableId = tableDescriptor.getTableId(); + Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(), + String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString())); + Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()), + String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId())); + tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor); + + TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec(); + if (tables.containsKey(tableSpec)) { + throw new IllegalStateException( + String.format("getTable() invoked multiple times with the same tableId: %s", tableId)); + } + tables.put(tableSpec, new TableImpl(tableSpec)); + return tables.get(tableSpec); + } + + /** + * Get all the {@link InputDescriptor}s to this application + * + * @return an immutable map of streamId to {@link InputDescriptor} + */ + @Override + public Map<String, InputDescriptor> getInputDescriptors() { + return Collections.unmodifiableMap(inputDescriptors); + } + + /** + * Get all the {@link OutputDescriptor}s from this application + * + * @return an immutable map of streamId to {@link OutputDescriptor} + */ + @Override + public Map<String, OutputDescriptor> getOutputDescriptors() { + return Collections.unmodifiableMap(outputDescriptors); + } + + /** + * Get all the broadcast streamIds from this application + * + * @return an immutable set of streamIds + */ + @Override + public Set<String> 
getBroadcastStreams() { + return Collections.unmodifiableSet(broadcastStreams); + } + + /** + * Get all the {@link TableDescriptor}s in this application + * + * @return an immutable set of {@link TableDescriptor}s + */ + @Override + public Set<TableDescriptor> getTableDescriptors() { + return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values())); + } + + /** + * Get all the unique {@link SystemDescriptor}s in this application + * + * @return an immutable set of {@link SystemDescriptor}s + */ + @Override + public Set<SystemDescriptor> getSystemDescriptors() { + // We enforce that users must not use different system descriptor instances for the same system name + // when getting an input/output stream or setting the default system descriptor + return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); + } + + /** + * Get the default {@link SystemDescriptor} in this application + * + * @return the default {@link SystemDescriptor} + */ + @Override + public Optional<SystemDescriptor> getDefaultSystemDescriptor() { + return defaultSystemDescriptorOptional; + } + + public OperatorSpecGraph getOperatorSpecGraph() { + return new OperatorSpecGraph(this); + } + + /** + * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: + * jobName-jobId-opCode-(userDefinedId|nextOpNum); + * + * @param opCode the {@link OpCode} of the next operator + * @param userDefinedId the optional user-provided name of the next operator or null + * @return the unique ID for the next operator in the graph + */ + public String getNextOpId(OpCode opCode, String userDefinedId) { + if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) { + throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId); + } + + String nextOpId = String.format("%s-%s-%s-%s", + config.get(JobConfig.JOB_NAME()), + config.get(JobConfig.JOB_ID(), "1"), + opCode.name().toLowerCase(), + StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum)); + if (!operatorIds.add(nextOpId)) { + throw new SamzaException( + String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId)); + } + nextOpNum++; + return nextOpId; + } + + /** + * Gets the unique ID for the next operator in the graph. The ID is of the following format: + * jobName-jobId-opCode-nextOpNum; + * + * @param opCode the {@link OpCode} of the next operator + * @return the unique ID for the next operator in the graph + */ + public String getNextOpId(OpCode opCode) { + return getNextOpId(opCode, null); + } + + public Map<String, InputOperatorSpec> getInputOperators() { + return Collections.unmodifiableMap(inputOperators); + } + + public Map<String, OutputStreamImpl> getOutputStreams() { + return Collections.unmodifiableMap(outputStreams); + } + + public Map<TableSpec, TableImpl> getTables() { + return Collections.unmodifiableMap(tables); + } + + /** + * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. + * An intermediate {@link MessageStream} is both an output and an input stream. + * + * @param streamId the id of the stream to be created. 
+ * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde + * is used. + * @param isBroadcast whether the stream is a broadcast stream. + * @param <M> the type of messages in the intermediate {@link MessageStream} + * @return the intermediate {@link MessageStreamImpl} + */ + @VisibleForTesting + public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { + Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId), + "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); + + if (serde == null) { + LOGGER.info("No serde provided for intermediate stream: " + streamId + + ". Key and message serdes configured for the job.default.system will be used."); + } + + if (isBroadcast) { + broadcastStreams.add(streamId); + } + + boolean isKeyed; + KV<Serde, Serde> kvSerdes; + if (serde == null) { // if no explicit serde available + isKeyed = true; // assume keyed stream + kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs + } else { + isKeyed = serde instanceof KVSerde; + kvSerdes = getKVSerdes(streamId, serde); + } + + InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor() + .flatMap(SystemDescriptor::getTransformer).orElse(null); + + InputOperatorSpec inputOperatorSpec = + OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), + transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); + inputOperators.put(streamId, inputOperatorSpec); + outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); + return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId)); + } + + private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { + Serde keySerde, valueSerde; 
+ + if (serde instanceof KVSerde) { + keySerde = ((KVSerde) serde).getKeySerde(); + valueSerde = ((KVSerde) serde).getValueSerde(); + } else { + keySerde = new NoOpSerde(); + valueSerde = serde; + } + + if (keySerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + + ". Keys will not be (de)serialized"); + } + if (valueSerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + + ". Values will not be (de)serialized"); + } + + return KV.of(keySerde, valueSerde); + } + + // check uniqueness of the {@code systemDescriptor} and add if it is unique + private void addSystemDescriptor(SystemDescriptor systemDescriptor) { + Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName()) + || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor, + "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName()); + systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java new file mode 100644 index 0000000..3597d7c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.samza.config.Config; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.task.TaskFactory; + + +/** + * This class implements interface {@link TaskApplicationDescriptor}. + * <p> + * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes + * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams, + * and the list of {@link TableDescriptor}s used in the application. 
+ */ +public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor> + implements TaskApplicationDescriptor { + + private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>(); + private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>(); + private final Set<String> broadcastStreams = new HashSet<>(); + private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>(); + private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>(); + + private TaskFactory taskFactory = null; + + public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) { + super(userApp, config); + userApp.describe(this); + } + + @Override + public void setTaskFactory(TaskFactory factory) { + this.taskFactory = factory; + } + + @Override + public void addInputStream(InputDescriptor inputDescriptor) { + // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream + Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()), + String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId())); + inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor); + addSystemDescriptor(inputDescriptor.getSystemDescriptor()); + } + + @Override + public void addOutputStream(OutputDescriptor outputDescriptor) { + Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()), + String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId())); + outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor); + addSystemDescriptor(outputDescriptor.getSystemDescriptor()); + } + + @Override + public void addTable(TableDescriptor tableDescriptor) { + Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()), + String.format("add table 
descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId())); + tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor); + } + + @Override + public Map<String, InputDescriptor> getInputDescriptors() { + return Collections.unmodifiableMap(inputDescriptors); + } + + @Override + public Map<String, OutputDescriptor> getOutputDescriptors() { + return Collections.unmodifiableMap(outputDescriptors); + } + + @Override + public Set<String> getBroadcastStreams() { + return Collections.unmodifiableSet(broadcastStreams); + } + + @Override + public Set<TableDescriptor> getTableDescriptors() { + return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values())); + } + + @Override + public Set<SystemDescriptor> getSystemDescriptors() { + // We enforce that users must not use different system descriptor instances for the same system name + // when getting an input/output stream or setting the default system descriptor + return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); + } + + /** + * Get the user-defined {@link TaskFactory} + * @return the {@link TaskFactory} object + */ + public TaskFactory getTaskFactory() { + return taskFactory; + } + + private void addSystemDescriptor(SystemDescriptor systemDescriptor) { + Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName()) + || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor, + "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName()); + systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java index fe8bc66..5b3636b 100644 --- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java @@ -19,34 +19,40 @@ package org.apache.samza.container; /** - * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle events. + * A Listener for {@link SamzaContainer} lifecycle events. */ public interface SamzaContainerListener { /** - * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to + * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED} + * and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence. + */ + void beforeStart(); + + /** + * Method invoked after the {@link SamzaContainer} has successfully transitioned to * the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the * {@link org.apache.samza.container.RunLoop} */ - void onContainerStart(); + void afterStart(); /** - * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to + * Method invoked after the {@link SamzaContainer} has successfully transitioned to * {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in * {@link org.apache.samza.SamzaContainerStatus} * <br> * <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any * exceptions/errors. */ - void onContainerStop(); + void afterStop(); /** - * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to + * Method invoked after the {@link SamzaContainer} has transitioned to * {@link org.apache.samza.SamzaContainerStatus#FAILED} state. 
Details on state transitions can be found in * {@link org.apache.samza.SamzaContainerStatus} * <br> - * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}. + * <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}. * @param t Throwable that caused the container failure. */ - void onContainerFailed(Throwable t); + void afterFailure(Throwable t); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index e95ed26..ea892fe 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -47,9 +47,10 @@ import static org.apache.samza.util.StreamUtil.*; /** - * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and + * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and * the intermediate topics needed for the execution. 
*/ +// TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811) public class ExecutionPlanner { private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class); @@ -63,7 +64,7 @@ public class ExecutionPlanner { this.streamManager = streamManager; } - public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception { + public ExecutionPlan plan(OperatorSpecGraph specGraph) { validateConfig(); // create physical job graph based on stream graph @@ -91,7 +92,7 @@ public class ExecutionPlanner { } /** - * Create the physical graph from StreamGraph + * Create the physical graph from {@link OperatorSpecGraph} */ /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) { JobGraph jobGraph = new JobGraph(config, specGraph); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index 2f210f2..f49e6db 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -195,12 +195,6 @@ import org.slf4j.LoggerFactory; } /** - * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist. 
- * @param streamSpec spec of the StreamEdge - * @return stream edge - */ - - /** * Returns the job nodes to be executed in the topological order * @return unmodifiable list of {@link JobNode} */ http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java new file mode 100644 index 0000000..a2050e5 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.execution; + +import java.io.File; +import java.io.PrintWriter; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplicationDescriptorImpl; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.table.TableConfigGenerator; +import org.apache.samza.table.TableSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This is a temporary helper class to include all common logic to generate {@link JobConfig}s for high- and low-level + * applications in {@link org.apache.samza.runtime.LocalApplicationRunner} and {@link org.apache.samza.runtime.RemoteApplicationRunner}. + * + * TODO: Fix SAMZA-1811 to consolidate this class with {@link ExecutionPlanner} + */ +public abstract class JobPlanner { + private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class); + + protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc; + protected final Config config; + + JobPlanner(ApplicationDescriptorImpl<?
extends ApplicationDescriptor> descriptor) { + this.appDesc = descriptor; + this.config = descriptor.getConfig(); + } + + /** + * Generates the {@link JobConfig}s for the application described by {@code appDesc}. + * @return a single {@link JobConfig} for a low-level task application, or the planned {@link JobConfig}s for a + * high-level stream application + * @throws SamzaException if generating the job configs for a stream application fails + * @throws IllegalArgumentException if the descriptor is neither a task nor a stream application descriptor + */ + public List<JobConfig> prepareJobs() { + String appId = new ApplicationConfig(appDesc.getConfig()).getGlobalAppId(); + if (appDesc instanceof TaskApplicationDescriptorImpl) { + return Collections.singletonList(prepareTaskJob((TaskApplicationDescriptorImpl) appDesc)); + } else if (appDesc instanceof StreamApplicationDescriptorImpl) { + try { + return prepareStreamJobs((StreamApplicationDescriptorImpl) appDesc); + } catch (Exception e) { + throw new SamzaException("Failed to generate JobConfig for StreamApplication " + appId, e); + } + } + throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or " + + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName())); + } + + abstract List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception; + + StreamManager buildAndStartStreamManager(Config config) { + StreamManager streamManager = new StreamManager(config); + streamManager.start(); + return streamManager; + } + + ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph) { + return getExecutionPlan(specGraph, null); + } + + /* package private */ + ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph, String runId) { + + // update application configs + Map<String, String> cfg = new HashMap<>(); + if (StringUtils.isNotEmpty(runId)) { + cfg.put(ApplicationConfig.APP_RUN_ID, runId); + } + + StreamConfig streamConfig = new StreamConfig(config); + Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet()); + inputStreams.removeAll(specGraph.getOutputStreams().keySet()); + ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded) + ?
ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM; + cfg.put(ApplicationConfig.APP_MODE, mode.name()); + + // merge user-provided configuration with input/output descriptor generated configuration + // descriptor generated configuration has higher priority + Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(appDesc); + cfg.putAll(systemStreamConfigs); + + // adding app.class in the configuration + cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName()); + + // create the physical execution plan and merge with overrides. This works for a single-stage job now + // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 + Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg)); + // creating the StreamManager to get all input/output streams' metadata for planning + StreamManager streamManager = buildAndStartStreamManager(mergedConfig); + try { + ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager); + return planner.plan(specGraph); + } finally { + streamManager.stop(); + } + } + + /** + * Write the execution plan JSON to {@code plan.json} in the directory named by the + * {@code ShellCommandConfig.EXECUTION_PLAN_DIR()} environment variable, if it is set and non-empty. + * This is best-effort: any failure is logged as a warning and not propagated. + * @param planJson JSON representation of the plan + */ + final void writePlanJsonFile(String planJson) { + try { + String content = "plan='" + planJson + "'"; + String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR()); + if (planPath != null && !planPath.isEmpty()) { + // Write the plan json to plan path + File file = new File(planPath, "plan.json"); + // try-with-resources closes the writer even if writing fails + try (PrintWriter writer = new PrintWriter(file, "UTF-8")) { + writer.println(content); + } + // setReadable fails on a file that does not exist yet, so it must run after the file is written + if (!file.setReadable(true, false)) { + LOG.warn("Failed to make execution plan file {} readable by all users", file); + } + } + } catch (Exception e) { + LOG.warn("Failed to write execution plan json to file", e); + } + } + + // TODO: SAMZA-1814: the following configuration generation still misses serde configuration generation, + // side input configuration, broadcast input and task inputs configuration generation for low-level task
+ // applications + // helper method to generate a single node job configuration for low level task applications + private JobConfig prepareTaskJob(TaskApplicationDescriptorImpl taskAppDesc) { + // collect the descriptor-generated configs; the user-provided config is merged in at the end + Map<String, String> cfg = new HashMap<>(); + // expand system and stream configs + Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(taskAppDesc); + cfg.putAll(systemStreamConfigs); + // expand table configs + cfg.putAll(expandTableConfigs(cfg, taskAppDesc)); + // adding app.class in the configuration + cfg.put(ApplicationConfig.APP_CLASS, taskAppDesc.getAppClass().getName()); + // merge the generated configs with the user-provided config + // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 + return new JobConfig(JobNode.mergeJobConfig(config, new MapConfig(cfg))); + } + + // collects the configs generated by the input, output and system descriptors, plus the default system if one is set + private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { + Map<String, String> systemStreamConfigs = new HashMap<>(); + appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + appDesc.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); + appDesc.getDefaultSystemDescriptor().ifPresent(dsd -> + systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName())); + return systemStreamConfigs; + } + + // generates the table configs for all table descriptors of the application, passing originConfig to the generator + private Map<String, String> expandTableConfigs(Map<String, String> originConfig, + ApplicationDescriptorImpl<?
extends ApplicationDescriptor> appDesc) { + List<TableSpec> tableSpecs = appDesc.getTableDescriptors().stream() + .map(td -> ((BaseTableDescriptor) td).getTableSpec()) + .collect(Collectors.toList()); + return TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(originConfig), tableSpecs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java new file mode 100644 index 0000000..7996d6b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.execution; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.DistributedLockWithState; +import org.apache.samza.system.StreamSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)} + * for standalone Samza processors. + * + * TODO: we need to consolidate this with {@link ExecutionPlanner} after SAMZA-1811. + */ +public class LocalJobPlanner extends JobPlanner { + private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class); + private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; + + // identifies this processor when coordinating intermediate stream creation with other processors + private final String uid = UUID.randomUUID().toString(); + + public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) { + super(descriptor); + } + + @Override + List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception { + // for high-level DAG, generating the plan and job configs + // 1.
initialize and plan + ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph()); + + String executionPlanJson = plan.getPlanAsJson(); + writePlanJsonFile(executionPlanJson); + LOG.info("Execution Plan: \n{}", executionPlanJson); + String planId = String.valueOf(executionPlanJson.hashCode()); + + if (plan.getJobConfigs().isEmpty()) { + throw new SamzaException("No jobs in the plan."); + } + + // 2. create the necessary streams + // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391 + // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig + // to be used for the whole application + JobConfig jobConfig = plan.getJobConfigs().get(0); + // create the StreamManager to create intermediate streams in the plan + StreamManager streamManager = buildAndStartStreamManager(jobConfig); + try { + createStreams(planId, plan.getIntermediateStreams(), streamManager); + } finally { + streamManager.stop(); + } + return plan.getJobConfigs(); + } + + /** + * Create intermediate streams using {@link StreamManager}. + * If {@link CoordinationUtils} can be created from the config, a distributed lock keyed by {@code planId} is used so + * that a single processor creates the streams; a processor that does not obtain the lock assumes the streams were + * created by another processor. If no {@link CoordinationUtils} is available, every processor attempts to create the + * streams, which requires stream creation to be idempotent. + * @param planId a unique identifier representing the plan used for coordination purpose + * @param intStreams list of intermediate {@link StreamSpec}s + * @param streamManager the {@link StreamManager} used to create streams + * @throws SamzaException if the stream-creation lock cannot be obtained before the timeout + */ + private void createStreams(String planId, List<StreamSpec> intStreams, StreamManager streamManager) { + if (intStreams.isEmpty()) { + LOG.info("Set of intermediate streams is empty. Nothing to create."); + return; + } + LOG.info("A single processor must create the intermediate streams.
Processor {} will attempt to acquire the lock.", uid); + // Move the scope of coordination utils within stream creation to address long idle connection problem. + // Refer SAMZA-1385 for more details + JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); + String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX; + CoordinationUtils coordinationUtils = + jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config); + if (coordinationUtils == null) { + LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid); + // each application process will try creating the streams, which + // requires stream creation to be idempotent + streamManager.createStreams(intStreams); + return; + } + + DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId); + try { + // check if the processor needs to go through leader election and stream creation + if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) { + LOG.info("lock acquired for streams creation by {}", uid); + streamManager.createStreams(intStreams); + lockWithState.unlockAndSet(); + } else { + LOG.info("Processor {} did not obtain the lock for streams creation.
They must've been created by another processor.", uid); + } + } catch (TimeoutException e) { + String msg = String.format("Processor %s failed to get the lock for stream initialization", uid); + throw new SamzaException(msg, e); + } finally { + coordinationUtils.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java new file mode 100644 index 0000000..254ff97 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.execution; + +import java.util.List; +import java.util.UUID; +import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)} + * for remote-launched Samza processors (e.g. in YARN). + * + * TODO: we need to consolidate this class with {@link ExecutionPlanner} after SAMZA-1811. + */ +public class RemoteJobPlanner extends JobPlanner { + private static final Logger LOG = LoggerFactory.getLogger(RemoteJobPlanner.class); + + public RemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) { + super(descriptor); + } + + @Override + List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception { + // for high-level DAG, generate the plan and job configs + // TODO: run.id needs to be set for standalone: SAMZA-1531 + // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision + String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); + LOG.info("The run id for this run is {}", runId); + + // 1. initialize and plan + ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph(), runId); + writePlanJsonFile(plan.getPlanAsJson()); + + if (plan.getJobConfigs().isEmpty()) { + throw new SamzaException("No jobs in the plan."); + } + + // 2.
create the necessary streams + // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig + // to be used for the whole application + JobConfig jobConfig = plan.getJobConfigs().get(0); + // create the StreamManager to create intermediate streams in the plan + StreamManager streamManager = buildAndStartStreamManager(jobConfig); + try { + if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) { + streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun()); + } + streamManager.createStreams(plan.getIntermediateStreams()); + } finally { + streamManager.stop(); + } + return plan.getJobConfigs(); + } + + /** Bootstraps the coordinator stream and returns the config it holds, i.e. the config of the previous run. */ + private Config getConfigFromPrevRun() { + CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); + consumer.register(); + consumer.start(); + try { + consumer.bootstrap(); + } finally { + consumer.stop(); + } + Config cfg = consumer.getConfig(); + LOG.info("Previous config is: {}", cfg); + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 3c1a1dc..5411af3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -19,10 +19,12 @@ package org.apache.samza.operators; +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.Collection; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import
org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; @@ -53,7 +55,7 @@ import org.apache.samza.table.TableSpec; /** * The {@link MessageStream} implementation that lets users describe their logical DAG. - * Users can obtain an instance by calling {@link StreamGraph#getInputStream}. + * Users can obtain an instance by calling {@link StreamApplicationDescriptorImpl#getInputStream}. * <p> * Each {@link MessageStreamImpl} is associated with a single {@link OperatorSpec} in the DAG and allows * users to chain further operators on its {@link OperatorSpec}. In other words, a {@link MessageStreamImpl} @@ -63,54 +65,54 @@ import org.apache.samza.table.TableSpec; */ public class MessageStreamImpl<M> implements MessageStream<M> { /** - * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl} + * The {@link StreamApplicationDescriptorImpl} that contains this {@link MessageStreamImpl} */ - private final StreamGraphSpec graph; + private final StreamApplicationDescriptorImpl streamAppDesc; /** * The {@link OperatorSpec} associated with this {@link MessageStreamImpl} */ private final OperatorSpec operatorSpec; - public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) { - this.graph = graph; + public MessageStreamImpl(StreamApplicationDescriptorImpl streamAppDesc, OperatorSpec<?, M> operatorSpec) { + this.streamAppDesc = streamAppDesc; this.operatorSpec = operatorSpec; } @Override public <TM> MessageStream<TM> map(MapFunction<? super M, ? 
extends TM> mapFn) { - String opId = this.graph.getNextOpId(OpCode.MAP); + String opId = this.streamAppDesc.getNextOpId(OpCode.MAP); StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override public MessageStream<M> filter(FilterFunction<? super M> filterFn) { - String opId = this.graph.getNextOpId(OpCode.FILTER); + String opId = this.streamAppDesc.getNextOpId(OpCode.FILTER); StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId); this.operatorSpec.registerNextOperatorSpec(op); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) { - String opId = this.graph.getNextOpId(OpCode.FLAT_MAP); + String opId = this.streamAppDesc.getNextOpId(OpCode.FLAT_MAP); StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override public void sink(SinkFunction<? 
super M> sinkFn) { - String opId = this.graph.getNextOpId(OpCode.SINK); + String opId = this.streamAppDesc.getNextOpId(OpCode.SINK); SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId); this.operatorSpec.registerNextOperatorSpec(op); } @Override public void sendTo(OutputStream<M> outputStream) { - String opId = this.graph.getNextOpId(OpCode.SEND_TO); + String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO); OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec( (OutputStreamImpl<M>) outputStream, opId); this.operatorSpec.registerNextOperatorSpec(op); @@ -118,10 +120,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) { - String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId); + String opId = this.streamAppDesc.getNextOpId(OpCode.WINDOW, userDefinedId); OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId); this.operatorSpec.registerNextOperatorSpec(op); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override @@ -130,7 +132,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl, String userDefinedId) { if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself."); - String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId); + String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN, userDefinedId); OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec(); JoinOperatorSpec<K, M, OM, JM> op = OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde, @@ -138,35 +140,35 @@ public class MessageStreamImpl<M> implements MessageStream<M> 
{ this.operatorSpec.registerNextOperatorSpec(op); otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table, StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) { - String opId = this.graph.getNextOpId(OpCode.JOIN); + String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN); TableSpec tableSpec = ((TableImpl) table).getTableSpec(); StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec( tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId); this.operatorSpec.registerNextOperatorSpec(joinOpSpec); - return new MessageStreamImpl<>(this.graph, joinOpSpec); + return new MessageStreamImpl<>(this.streamAppDesc, joinOpSpec); } @Override public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) { if (otherStreams.isEmpty()) return this; - String opId = this.graph.getNextOpId(OpCode.MERGE); + String opId = this.streamAppDesc.getNextOpId(OpCode.MERGE); StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId); this.operatorSpec.registerNextOperatorSpec(op); otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op)); - return new MessageStreamImpl<>(this.graph, op); + return new MessageStreamImpl<>(this.streamAppDesc, op); } @Override public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor, MapFunction<? super M, ? 
extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) { - String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId); - IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde, false); + String opId = this.streamAppDesc.getNextOpId(OpCode.PARTITION_BY, userDefinedId); + IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, false); if (!intermediateStream.isKeyed()) { // this can only happen when the default serde partitionBy variant is being used throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde."); @@ -185,7 +187,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, V> void sendTo(Table<KV<K, V>> table) { - String opId = this.graph.getNextOpId(OpCode.SEND_TO); + String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO); SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId); this.operatorSpec.registerNextOperatorSpec(op); @@ -193,8 +195,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public MessageStream<M> broadcast(Serde<M> serde, String userDefinedId) { - String opId = this.graph.getNextOpId(OpCode.BROADCAST, userDefinedId); - IntermediateMessageStreamImpl<M> intermediateStream = this.graph.getIntermediateStream(opId, serde, true); + String opId = this.streamAppDesc.getNextOpId(OpCode.BROADCAST, userDefinedId); + IntermediateMessageStreamImpl<M> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, true); BroadcastOperatorSpec<M> broadcastOperatorSpec = OperatorSpecs.createBroadCastOperatorSpec(intermediateStream.getOutputStream(), opId); this.operatorSpec.registerNextOperatorSpec(broadcastOperatorSpec); @@ -210,7 +212,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> { * Get the {@link OperatorSpec} 
associated with this {@link MessageStreamImpl}. * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}. */ - protected OperatorSpec<?, M> getOperatorSpec() { + @VisibleForTesting + public OperatorSpec<?, M> getOperatorSpec() { return this.operatorSpec; } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java index b6c3dae..b75b1e8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; @@ -33,10 +34,11 @@ import org.apache.samza.table.TableSpec; /** - * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec} - * initialized in the {@link StreamGraphSpec} and constructsthe corresponding serialized instances of {@link OperatorSpec}. - * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered as immutable and read-only. - * The instance of {@link OperatorSpecGraph} should only be used in runtime to construct {@link org.apache.samza.task.StreamOperatorTask}. + * Defines the serialized format of the operator graph in {@link StreamApplicationDescriptorImpl}. 
This class encapsulates all + * getter methods to get the {@link OperatorSpec} initialized in the {@link StreamApplicationDescriptorImpl} and constructs the + * corresponding serialized instances of {@link OperatorSpec}. The {@link StreamApplicationDescriptorImpl} and {@link OperatorSpec} + * instances included in this class are considered as immutable and read-only. The instance of {@link OperatorSpecGraph} + * should only be used in runtime to construct {@link org.apache.samza.task.StreamOperatorTask}. */ public class OperatorSpecGraph implements Serializable { // We use a LHM for deterministic order in initializing and closing operators. @@ -51,11 +53,11 @@ public class OperatorSpecGraph implements Serializable { private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>(); private transient final byte[] serializedOpSpecGraph; - OperatorSpecGraph(StreamGraphSpec graphSpec) { - this.inputOperators = graphSpec.getInputOperators(); - this.outputStreams = graphSpec.getOutputStreams(); - this.broadcastStreams = graphSpec.getBroadcastStreams(); - this.tables = graphSpec.getTables(); + public OperatorSpecGraph(StreamApplicationDescriptorImpl streamAppDesc) { + this.inputOperators = streamAppDesc.getInputOperators(); + this.outputStreams = streamAppDesc.getOutputStreams(); + this.broadcastStreams = streamAppDesc.getBroadcastStreams(); + this.tables = streamAppDesc.getTables(); this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs()); this.hasWindowOrJoins = checkWindowOrJoins(); this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this); @@ -78,7 +80,7 @@ public class OperatorSpecGraph implements Serializable { } /** - * Get all {@link OperatorSpec}s available in this {@link StreamGraphSpec} + * Get all {@link OperatorSpec}s available in this {@link StreamApplicationDescriptorImpl} * * @return all available {@link OperatorSpec}s */ @@ -87,9 +89,9 @@ public class OperatorSpecGraph implements Serializable { } 
/** - * Returns <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator + * Returns <tt>true</tt> iff this {@link StreamApplicationDescriptorImpl} contains a join or a window operator * - * @return <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator + * @return <tt>true</tt> iff this {@link StreamApplicationDescriptorImpl} contains a join or a window operator */ public boolean hasWindowOrJoins() { return hasWindowOrJoins; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java deleted file mode 100644 index 8eb2425..0000000 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; -import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; -import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; -import org.apache.samza.operators.functions.InputTransformer; -import org.apache.samza.operators.functions.StreamExpander; -import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.operators.spec.OperatorSpecs; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.table.Table; -import org.apache.samza.table.TableSpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class defines: - * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to - * create the DAG of transforms. 
- * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG - */ -public class StreamGraphSpec implements StreamGraph { - private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class); - private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); - - // We use a LHM for deterministic order in initializing and closing operators. - private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); - private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); - private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>(); - private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>(); - private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>(); - private final Set<String> broadcastStreams = new HashSet<>(); - private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); - private final Config config; - - /** - * The 0-based position of the next operator in the graph. - * Part of the unique ID for each OperatorSpec in the graph. - * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}. 
- */ - private int nextOpNum = 0; - private final Set<String> operatorIds = new HashSet<>(); - private ContextManager contextManager = null; - private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty(); - - public StreamGraphSpec(Config config) { - this.config = config; - } - - @Override - public void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) { - Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null."); - String defaultSystemName = defaultSystemDescriptor.getSystemName(); - Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(), - "Default system must be set before creating any input or output streams."); - checkSystemDescriptorUniqueness(defaultSystemDescriptor, defaultSystemName); - systemDescriptors.put(defaultSystemName, defaultSystemDescriptor); - this.defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor); - } - - @Override - public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) { - SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor(); - Optional<StreamExpander> expander = systemDescriptor.getExpander(); - if (expander.isPresent()) { - return expander.get().apply(this, inputDescriptor); - } - - String streamId = inputDescriptor.getStreamId(); - Preconditions.checkState(!inputOperators.containsKey(streamId), - "getInputStream must not be called multiple times with the same streamId: " + streamId); - Preconditions.checkState(!inputDescriptors.containsKey(streamId), - "getInputStream must not be called multiple times with the same input descriptor: " + streamId); - String systemName = systemDescriptor.getSystemName(); - checkSystemDescriptorUniqueness(systemDescriptor, systemName); - - Serde serde = inputDescriptor.getSerde(); - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - if (outputStreams.containsKey(streamId)) { - OutputStreamImpl outputStream = 
outputStreams.get(streamId); - Serde keySerde = outputStream.getKeySerde(); - Serde valueSerde = outputStream.getValueSerde(); - Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), - String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at " - + "stream level, so the same key and message Serde must be used for both.", streamId)); - } - - boolean isKeyed = serde instanceof KVSerde; - InputTransformer transformer = inputDescriptor.getTransformer().orElse(null); - InputOperatorSpec inputOperatorSpec = - OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), - transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); - inputOperators.put(streamId, inputOperatorSpec); - inputDescriptors.put(streamId, inputDescriptor); - systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); - return new MessageStreamImpl(this, inputOperators.get(streamId)); - } - - @Override - public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) { - String streamId = outputDescriptor.getStreamId(); - Preconditions.checkState(!outputStreams.containsKey(streamId), - "getOutputStream must not be called multiple times with the same streamId: " + streamId); - Preconditions.checkState(!outputDescriptors.containsKey(streamId), - "getOutputStream must not be called multiple times with the same output descriptor: " + streamId); - SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor(); - String systemName = systemDescriptor.getSystemName(); - checkSystemDescriptorUniqueness(systemDescriptor, systemName); - - Serde serde = outputDescriptor.getSerde(); - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - if (inputOperators.containsKey(streamId)) { - InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); - Serde keySerde = inputOperatorSpec.getKeySerde(); - Serde valueSerde = 
inputOperatorSpec.getValueSerde(); - Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), - String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at " - + "stream level, so the same key and message Serde must be used for both.", streamId)); - } - - boolean isKeyed = serde instanceof KVSerde; - outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); - outputDescriptors.put(streamId, outputDescriptor); - systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); - return outputStreams.get(streamId); - } - - @Override - public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) { - String tableId = tableDescriptor.getTableId(); - Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(), - String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString())); - TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec(); - if (tables.containsKey(tableSpec)) { - throw new IllegalStateException( - String.format("getTable() invoked multiple times with the same tableId: %s", tableId)); - } - tables.put(tableSpec, new TableImpl(tableSpec)); - return tables.get(tableSpec); - } - - @Override - public StreamGraph withContextManager(ContextManager contextManager) { - this.contextManager = contextManager; - return this; - } - - public ContextManager getContextManager() { - return this.contextManager; - } - - public OperatorSpecGraph getOperatorSpecGraph() { - return new OperatorSpecGraph(this); - } - - /** - * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: - * jobName-jobId-opCode-(userDefinedId|nextOpNum); - * - * @param opCode the {@link OpCode} of the next operator - * @param userDefinedId the optional user-provided name of the next operator or null - * @return the unique ID for the next operator in the graph - */ - public String getNextOpId(OpCode opCode, String userDefinedId) { - if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) { - throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId); - } - - String nextOpId = String.format("%s-%s-%s-%s", - config.get(JobConfig.JOB_NAME()), - config.get(JobConfig.JOB_ID(), "1"), - opCode.name().toLowerCase(), - StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum)); - if (!operatorIds.add(nextOpId)) { - throw new SamzaException( - String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId)); - } - nextOpNum++; - return nextOpId; - } - - /** - * Gets the unique ID for the next operator in the graph. The ID is of the following format: - * jobName-jobId-opCode-nextOpNum; - * - * @param opCode the {@link OpCode} of the next operator - * @return the unique ID for the next operator in the graph - */ - public String getNextOpId(OpCode opCode) { - return getNextOpId(opCode, null); - } - - /** - * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. - * An intermediate {@link MessageStream} is both an output and an input stream. - * - * @param streamId the id of the stream to be created. - * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde - * is used. - * @param isBroadcast whether the stream is a broadcast stream. 
- * @param <M> the type of messages in the intermediate {@link MessageStream} - * @return the intermediate {@link MessageStreamImpl} - */ - @VisibleForTesting - public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { - Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId), - "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); - - if (serde == null) { - LOGGER.info("No serde provided for intermediate stream: " + streamId + - ". Key and message serdes configured for the job.default.system will be used."); - } - - if (isBroadcast) broadcastStreams.add(streamId); - - boolean isKeyed; - KV<Serde, Serde> kvSerdes; - if (serde == null) { // if no explicit serde available - isKeyed = true; // assume keyed stream - kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs - } else { - isKeyed = serde instanceof KVSerde; - kvSerdes = getKVSerdes(streamId, serde); - } - - InputTransformer transformer = (InputTransformer) defaultSystemDescriptorOptional - .flatMap(SystemDescriptor::getTransformer).orElse(null); - - InputOperatorSpec inputOperatorSpec = - OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), - transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); - inputOperators.put(streamId, inputOperatorSpec); - outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); - return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId)); - } - - Map<String, InputOperatorSpec> getInputOperators() { - return Collections.unmodifiableMap(inputOperators); - } - - Map<String, OutputStreamImpl> getOutputStreams() { - return Collections.unmodifiableMap(outputStreams); - } - - Set<String> getBroadcastStreams() { - return 
Collections.unmodifiableSet(broadcastStreams); - } - - Map<TableSpec, TableImpl> getTables() { - return Collections.unmodifiableMap(tables); - } - - public Map<String, InputDescriptor> getInputDescriptors() { - return Collections.unmodifiableMap(inputDescriptors); - } - - public Map<String, OutputDescriptor> getOutputDescriptors() { - return Collections.unmodifiableMap(outputDescriptors); - } - - public Set<SystemDescriptor> getSystemDescriptors() { - // We enforce that users must not use different system descriptor instances for the same system name - // when getting an input/output stream or setting the default system descriptor - return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); - } - - public Optional<SystemDescriptor> getDefaultSystemDescriptor() { - return this.defaultSystemDescriptorOptional; - } - - private void checkSystemDescriptorUniqueness(SystemDescriptor systemDescriptor, String systemName) { - Preconditions.checkState(!systemDescriptors.containsKey(systemName) - || systemDescriptors.get(systemName) == systemDescriptor, - "Must not use different system descriptor instances for the same system name: " + systemName); - } - - private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { - Serde keySerde, valueSerde; - - if (serde instanceof KVSerde) { - keySerde = ((KVSerde) serde).getKeySerde(); - valueSerde = ((KVSerde) serde).getValueSerde(); - } else { - keySerde = new NoOpSerde(); - valueSerde = serde; - } - - if (keySerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + - ". Keys will not be (de)serialized"); - } - if (valueSerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + - ". Values will not be (de)serialized"); - } - - return KV.of(keySerde, valueSerde); - } -}