Repository: samza Updated Branches: refs/heads/master bee30f5df -> 4dbc2c6de
SAMZA-570: Enabling auto-discovery of regex input topics This PR makes the following changes * Enriches StreamPartitionCountMonitor to periodically monitor input-regexes to match to actual inputs and stop the job when a new input stream is discovered. * Add a new API to SysAdmin to allow listing of all streams, e.g., Kafka-topics. KafkaSysAdmin implementation of this uses KafkaConsumer's listTopics API. (Even if listTopics had 1 million topics with 100 bytes per topic total, temporary memory overhead will be 100 MB). * Added config job.coordinator.monitor-input-regex.frequency.ms for the monitoring frequency, and job.coordinator.monitor-input-regex.%s for each input system. Users can then choose desired regex for each input system, e.g., job.coordinator.monitor-input-regex.kafka=test-.*. * We can later enrich RegexTopicGen rewriter to add a monitor-input-regex config to allow periodic monitoring * Tested: Unit test for SPCM and tested with test jobs on local grid. Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #796 from rmatharu/newtopic-test Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4dbc2c6d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4dbc2c6d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4dbc2c6d Branch: refs/heads/master Commit: 4dbc2c6de5e9048654eed0195364fdc09cefa3bf Parents: bee30f5 Author: Ray Matharu <rmath...@linkedin.com> Authored: Fri Nov 30 17:06:30 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Nov 30 17:06:30 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/system/SystemAdmin.java | 8 + .../apache/samza/PartitionChangeException.java | 31 --- .../ClusterBasedJobCoordinator.java | 93 ++++++-- .../InputStreamsDiscoveredException.java | 32 +++ .../coordinator/PartitionChangeException.java | 33 +++ 
.../samza/coordinator/StreamRegexMonitor.java | 223 +++++++++++++++++++ .../org/apache/samza/config/JobConfig.scala | 49 +++- .../samza/system/StreamMetadataCache.scala | 14 +- .../coordinator/TestInputRegexMonitor.java | 136 +++++++++++ .../TestStreamPartitionCountMonitor.scala | 6 +- .../samza/system/kafka/KafkaSystemAdmin.java | 8 + .../org/apache/samza/config/KafkaConfig.scala | 9 +- .../samza/config/RegExTopicGenerator.scala | 3 +- .../samza/config/TestRegExTopicGenerator.scala | 2 +- 14 files changed, 585 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 6ee7df2..8201b3d 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -156,4 +156,12 @@ public interface SystemAdmin { return getSystemStreamMetadata(streamNames); } + /** + * Fetch the set of all available streams + * @return The set of all available SystemStreams. 
+ */ + default Set<SystemStream> getAllSystemStreams() { + throw new UnsupportedOperationException(); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java deleted file mode 100644 index 4619dfa..0000000 --- a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza; - - -/** - * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed - */ -public class PartitionChangeException extends SamzaException { - - public PartitionChangeException(String s) { - super(s); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index ff70df0..4c5a34b 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -19,11 +19,14 @@ package org.apache.samza.clustermanager; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import org.apache.samza.SamzaException; -import org.apache.samza.PartitionChangeException; import org.apache.samza.checkpoint.CheckpointManager; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; @@ -33,8 +36,11 @@ import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.container.TaskName; +import org.apache.samza.coordinator.InputStreamsDiscoveredException; import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.PartitionChangeException; import org.apache.samza.coordinator.StreamPartitionCountMonitor; +import org.apache.samza.coordinator.StreamRegexMonitor; 
import org.apache.samza.coordinator.stream.CoordinatorStreamManager; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; @@ -49,9 +55,9 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.util.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Implements a JobCoordinator that is completely independent of the underlying cluster @@ -134,7 +140,12 @@ public class ClusterBasedJobCoordinator { /** * The input topic partition count monitor */ - private final StreamPartitionCountMonitor partitionMonitor; + private final Optional<StreamPartitionCountMonitor> partitionMonitor; + + /** + * The input stream regex monitor + */ + private final Optional<StreamRegexMonitor> inputStreamRegexMonitor; /** * Metrics to track stats around container failures, needed containers etc. @@ -174,7 +185,8 @@ public class ClusterBasedJobCoordinator { // build a JobModelManager and ChangelogStreamManager and perform partition assignments. changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); + jobModelManager = + JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); config = jobModelManager.jobModel().getConfig(); hasDurableStores = new StorageConfig(config).hasDurableStores(); @@ -182,6 +194,7 @@ public class ClusterBasedJobCoordinator { // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped. 
systemAdmins = new SystemAdmins(config); partitionMonitor = getPartitionCountMonitor(config, systemAdmins); + inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins); clusterManagerConfig = new ClusterManagerConfig(config); isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator(); @@ -226,7 +239,7 @@ public class ClusterBasedJobCoordinator { Map<TaskName, Integer> taskPartitionMappings = new HashMap<>(); Map<String, ContainerModel> containers = jobModel.getContainers(); - for (ContainerModel containerModel: containers.values()) { + for (ContainerModel containerModel : containers.values()) { for (TaskModel taskModel : containerModel.getTasks().values()) { taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId()); } @@ -236,7 +249,8 @@ public class ClusterBasedJobCoordinator { containerProcessManager.start(); systemAdmins.start(); - partitionMonitor.start(); + partitionMonitor.ifPresent(monitor -> monitor.start()); + inputStreamRegexMonitor.ifPresent(monitor -> monitor.start()); boolean isInterrupted = false; @@ -270,7 +284,8 @@ public class ClusterBasedJobCoordinator { private void onShutDown() { try { - partitionMonitor.stop(); + partitionMonitor.ifPresent(monitor -> monitor.stop()); + inputStreamRegexMonitor.ifPresent(monitor -> monitor.stop()); systemAdmins.stop(); containerProcessManager.stop(); coordinatorStreamManager.stop(); @@ -289,26 +304,63 @@ public class ClusterBasedJobCoordinator { } } - private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) { + private Optional<StreamPartitionCountMonitor> getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) { StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()); Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams(); if (inputStreamsToMonitor.isEmpty()) { throw new SamzaException("Input streams to a job can 
not be empty."); } - return new StreamPartitionCountMonitor( - inputStreamsToMonitor, - streamMetadata, - metrics, - new JobConfig(config).getMonitorPartitionChangeFrequency(), - streamsChanged -> { - // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted + return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor, streamMetadata, metrics, + new JobConfig(config).getMonitorPartitionChangeFrequency(), streamsChanged -> { + // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted if (hasDurableStores) { log.error("Input topic partition count changed in a job with durable state. Failing the job."); state.status = SamzaApplicationState.SamzaAppStatus.FAILED; } coordinatorException = new PartitionChangeException("Input topic partition count changes detected."); - }); + })); + } + + private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, SystemAdmins systemAdmins) { + + // if input regex monitor is not enabled return empty + if (new JobConfig(config).getMonitorRegexEnabled()) { + log.info("StreamRegexMonitor is disabled."); + return Optional.empty(); + } + + StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()); + Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams(); + if (inputStreamsToMonitor.isEmpty()) { + throw new SamzaException("Input streams to a job can not be empty."); + } + + // First list all rewriters + Option<String> rewritersList = new JobConfig(config).getConfigRewriters(); + + // if no rewriter is defined, there is nothing to monitor + if (!rewritersList.isDefined()) { + log.warn("No config rewriters are defined. 
No StreamRegexMonitor created."); + return Optional.empty(); + } + + // Compile a map of each input-system to its corresponding input-monitor-regex patterns + Map<String, Pattern> inputRegexesToMonitor = + JavaConverters.mapAsJavaMapConverter(new JobConfig(config).getMonitorRegexPatternMap(rewritersList.get())) + .asJava(); + + return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor, streamMetadata, metrics, + new JobConfig(config).getMonitorRegexFrequency(), new StreamRegexMonitor.Callback() { + @Override + public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams, + Map<String, Pattern> regexesMonitored) { + log.error("New input system-streams discovered. Failing the job. New input streams: {}", newInputStreams, + " Existing input streams:", inputStreamsToMonitor); + state.status = SamzaApplicationState.SamzaAppStatus.FAILED; + coordinatorException = new InputStreamsDiscoveredException("New input streams added: " + newInputStreams); + } + })); } // The following two methods are package-private and for testing only @@ -321,10 +373,9 @@ public class ClusterBasedJobCoordinator { @VisibleForTesting StreamPartitionCountMonitor getPartitionMonitor() { - return partitionMonitor; + return partitionMonitor.get(); } - /** * The entry point for the {@link ClusterBasedJobCoordinator} * @param args args @@ -335,12 +386,14 @@ public class ClusterBasedJobCoordinator { try { //Read and parse the coordinator system config. 
log.info("Parsing coordinator system config {}", coordinatorSystemEnv); - coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); + coordinatorSystemConfig = + new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); } catch (IOException e) { log.error("Exception while reading coordinator stream config {}", e); throw new SamzaException(e); } ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig); jc.run(); + log.info("Finished ClusterBasedJobCoordinator run"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java new file mode 100644 index 0000000..6e85f43 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.SamzaException; + + +/** + * Exception to indicate that the new input streams have been added. + */ +public class InputStreamsDiscoveredException extends SamzaException { + + public InputStreamsDiscoveredException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java new file mode 100644 index 0000000..4594307 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.coordinator; + +import org.apache.samza.SamzaException; + + +/** + * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed + */ +public class PartitionChangeException extends SamzaException { + + public PartitionChangeException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java new file mode 100644 index 0000000..3c86bfb --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.coordinator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * A single-thread based monitor that periodically monitors the given set of stream regexes, and matches them to + * the given set of streams. If a stream matching a given regex that is not in the corresponding stream set is detected, + * it invokes a {@link StreamRegexMonitor.Callback} with the initial input set, the new input stream set, and the regexes + * being monitored. 
+ */ +public class StreamRegexMonitor { + private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class); + + // Factory of daemon-threads to create the single threaded executor pool + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName()) + .build(); + + // Enum to describe the state of the regexMonitor + private enum State { + INIT, RUNNING, STOPPED + } + + private final Set<SystemStream> streamsToMonitor; + private final Map<String, Pattern> systemRegexesToMonitor; + private final StreamMetadataCache metadataCache; + private final int inputRegexMonitorPeriodMs; + + // Map of gauges (one per system), emitted when new input stream for that system is detected + private final Map<String, Gauge<Integer>> gauges; + + private final Callback callbackMethod; + + // Used to guard write access to state. + private final Object lock = new Object(); + + private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + + private volatile State state = State.INIT; + + /** + * A callback that is invoked when the {@link StreamRegexMonitor} detects a new input stream matching given regex. + */ + public interface Callback { + /** + * Method to be called when new input streams are detected. + * @param initialInputSet The initial set of input streams + * @param newInputStreams The set of new input streams discovered + * @param regexesMonitored The set of regexes being monitored + */ + void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams, + Map<String, Pattern> regexesMonitored); + } + + /** + * Default constructor. + * + * @param streamsToMonitor a set of SystemStreams to monitor + * @param systemRegexesToMonitor map of regexes for each input system + * @param metadataCache the metadata cache which will be used to fetch metadata for partition counts. 
+ * @param metrics the metrics registry to which the metrics should be added. + * @param inputRegexMonitorPeriodMs the period at which the monitor will check each input-regex + * @param monitorCallback the callback method to be invoked when new input stream matching regex is detected + */ + public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor, + StreamMetadataCache metadataCache, MetricsRegistry metrics, int inputRegexMonitorPeriodMs, + Callback monitorCallback) { + this.streamsToMonitor = streamsToMonitor; + this.systemRegexesToMonitor = systemRegexesToMonitor; + this.metadataCache = metadataCache; + this.callbackMethod = monitorCallback; + this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs; + + // Pre-populate the gauges + Map<String, Gauge<Integer>> mutableGauges = new HashMap<>(); + for (String systemToMonitor : systemRegexesToMonitor.keySet()) { + Gauge gauge = metrics.newGauge("job-coordinator", String.format("%s-new-input-streams", systemToMonitor), 0); + mutableGauges.put(systemToMonitor, gauge); + } + gauges = Collections.unmodifiableMap(mutableGauges); + + log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", this.getClass().getName(), + this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor); + } + + /** + * Starts the monitor. + */ + public void start() { + synchronized (lock) { + switch (state) { + case INIT: + if (inputRegexMonitorPeriodMs > 0) { + schedulerService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + monitorInputRegexes(); + } + }, 0, inputRegexMonitorPeriodMs, TimeUnit.MILLISECONDS); + } + state = State.RUNNING; + break; + + case RUNNING: + // start is idempotent + return; + + case STOPPED: + throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted."); + } + } + } + + /** + * Stops the monitor. Once it stops, it cannot be restarted. 
+ */ + public void stop() { + synchronized (lock) { + // We could also wait for full termination of the scheduler service, but it is overkill for + // our use case. + schedulerService.shutdownNow(); + + state = State.STOPPED; + } + } + + private void monitorInputRegexes() { + log.debug("Running monitorInputRegexes"); + + try { + // obtain the list of SysStreams that match given patterns for all systems + Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>(); + + // For each input system, for which we have a regex to monitor + for (String systemName : this.systemRegexesToMonitor.keySet()) { + + try { + // obtain the list of SysStreams that match the regex for this system + // using the systemAdmin in the metadataCache + inputStreamsMatchingPattern.addAll( + JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName)) + .asJava() + .stream() + .filter(x -> x.getStream().matches(this.systemRegexesToMonitor.get(systemName).pattern())) + .collect(Collectors.toSet())); + } catch (UnsupportedOperationException e) { + log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e); + } + } + + // if there is a stream that is in the input-Set but not in the streamsToMonitor + // since streamsToMonitor = task.inputs + if (!streamsToMonitor.containsAll(inputStreamsMatchingPattern)) { + log.info("New input system-streams discovered. 
InputStreamsMatchingPattern: {} but streamsToMonitor: {} ", + inputStreamsMatchingPattern, streamsToMonitor); + + // invoke notify callback with new input streams + this.callbackMethod.onInputStreamsChanged(streamsToMonitor, + Sets.difference(inputStreamsMatchingPattern, streamsToMonitor), systemRegexesToMonitor); + } else { + log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}", + streamsToMonitor, inputStreamsMatchingPattern); + } + } catch (Exception e) { + log.error("Exception while monitoring input regexes.", e); + } + } + + @VisibleForTesting + boolean isRunning() { + return state == State.RUNNING; + } + + /** + * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout + * and false otherwise. + * <p> + * This is currently exposed at the package private level for tests only. + */ + @VisibleForTesting + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return schedulerService.awaitTermination(timeout, unit); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 2bc6420..5363e72 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -21,12 +21,16 @@ package org.apache.samza.config import java.io.File +import java.util.regex.Pattern import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory import org.apache.samza.runtime.DefaultLocationIdProviderFactory import org.apache.samza.util.Logging +import scala.collection.mutable + + object JobConfig { // job 
config constants val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class @@ -77,6 +81,15 @@ object JobConfig { val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled" val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change" val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms" + + val MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms" + val DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000 + + val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex" + val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system" + val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config" + + val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000 val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory" @@ -127,7 +140,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getCoordinatorSystemName = { val system = getCoordinatorSystemNameOrNull if (system == null) { - throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.") + throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution." 
+ config) } system } @@ -158,10 +171,44 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { } } + // StreamRegexMonitor is disabled if the MonitorRegexFRequency is <= 0 + def getMonitorRegexEnabled = (getMonitorRegexFrequency <= 0) + def getMonitorPartitionChangeFrequency = getInt( JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS, JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS) + def getMonitorRegexFrequency = getInt( + JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS, + JobConfig.DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS) + + def getMonitorRegexPatternMap(rewritersList : String) : mutable.HashMap[String, Pattern] = { + // Compile a map of each input-system to its corresponding input-monitor-regex patterns + val inputRegexesToMonitor: mutable.HashMap[String, Pattern] = mutable.HashMap[String, Pattern]() + val rewriters: Array[String] = rewritersList.split(",") + // iterate over each rewriter and obtain the system and regex for it + for (rewriterName <- rewriters) { + val rewriterSystem: Option[String] = new JobConfig(config).getRegexResolvedSystem(rewriterName) + val rewriterRegex: Option[String] = new JobConfig(config).getRegexResolvedStreams(rewriterName) + if (rewriterSystem.isDefined && rewriterRegex.isDefined) { + var patternForSystem: Option[Pattern] = inputRegexesToMonitor.get(rewriterSystem.get) + patternForSystem = + if (patternForSystem == None) Some(Pattern.compile(rewriterRegex.get)) + else + Some(Pattern.compile(String.join("|", patternForSystem.get.pattern(), rewriterRegex.get))) + inputRegexesToMonitor.put(rewriterSystem.get, patternForSystem.get) + } + } + inputRegexesToMonitor + } + + // regex-related config methods duplicated from KafkaConfig to avoid module dependency + def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName) + + def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName) + + + def 
getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS) def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1") http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala index edffac7..abd6942 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala @@ -19,9 +19,11 @@ package org.apache.samza.system -import org.apache.samza.util.{Logging, Clock, SystemClock} +import org.apache.samza.util.{Clock, Logging, SystemClock} import org.apache.samza.SamzaException + import scala.collection.JavaConverters._ +import scala.collection.mutable /** * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default @@ -84,6 +86,16 @@ class StreamMetadataCache ( } /** + * Returns the list of System Streams for this system. + * @param systemName + * @param pattern + */ + def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = { + val systemAdmin = systemAdmins.getSystemAdmin(systemName) + systemAdmin.getAllSystemStreams().asScala + } + + /** * Returns metadata about the given streams. If the metadata isn't in the cache, it is retrieved from the systems * using the given SystemAdmins. 
* http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java new file mode 100644 index 0000000..84b026b --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.coordinator; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.Clock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.collection.JavaConversions$; + +/** Exercises StreamRegexMonitor: start/stop state transitions, and the callback fired when a stream matching a monitored regex is discovered. */ +public class TestInputRegexMonitor { + + private StreamRegexMonitor streamRegexMonitor; + private CountDownLatch callbackCount; + + private int inputRegexMs = 10; + private String systemName = "kafka"; + private int expectedNumberOfCallbacks = 1; + private Set<SystemStream> inputStreamsDiscovered; + private final SystemStream sampleStream = new SystemStream(systemName, "test-1"); + + @Before + public void setUp() { + + inputStreamsDiscovered = new HashSet<>(); + Map<String, Pattern> patternMap = new HashMap<>(); + patternMap.put(systemName, Pattern.compile("test-.*")); + + StreamMetadataCache mockStreamMetadataCache = new MockStreamMetadataCache(null, 1, null); + + MetricsRegistry metrics = Mockito.mock(MetricsRegistry.class); + this.callbackCount = new CountDownLatch(expectedNumberOfCallbacks); + + // Creating a streamRegexMonitor with an empty input set and the test-.* regex input + this.streamRegexMonitor = + new StreamRegexMonitor(new HashSet<>(), patternMap, mockStreamMetadataCache, metrics, inputRegexMs, + new StreamRegexMonitor.Callback() { + @Override + public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams, + Map<String, Pattern> regexesMonitored) { + // Record the result before releasing the latch; the test thread asserts on it once await() returns + inputStreamsDiscovered.addAll(newInputStreams); + callbackCount.countDown(); + } + }); + }
+ + @Test + public void testStartStop() throws InterruptedException { + Assert.assertFalse(streamRegexMonitor.isRunning()); + + // Normal start + streamRegexMonitor.start(); + Assert.assertTrue(streamRegexMonitor.isRunning()); + + // Start ought to be idempotent + streamRegexMonitor.start(); + Assert.assertTrue(streamRegexMonitor.isRunning()); + + // Normal stop + streamRegexMonitor.stop(); + Assert.assertTrue(streamRegexMonitor.awaitTermination(1, TimeUnit.SECONDS)); + Assert.assertFalse(streamRegexMonitor.isRunning()); + + try { + streamRegexMonitor.start(); + } catch (Exception e) { + Assert.assertTrue(e.getClass().equals(IllegalStateException.class)); + } + + // Stop ought to be idempotent + Assert.assertFalse(streamRegexMonitor.isRunning()); + streamRegexMonitor.stop(); + Assert.assertFalse(streamRegexMonitor.isRunning()); + } + + @Test + public void testSchedulingAndInputAddition() throws Exception { + this.streamRegexMonitor.start(); + try { + if (!callbackCount.await(1, TimeUnit.SECONDS)) { + throw new Exception( + "Did not see " + expectedNumberOfCallbacks + " callbacks after waiting. " + callbackCount.toString()); + } + // Check on the test thread (so a failure fails the test) that the newInputStream discovered is "kafka" "test-1" + Assert.assertEquals(1, inputStreamsDiscovered.size()); + Assert.assertTrue(inputStreamsDiscovered.contains(sampleStream)); + } finally {
+ System.out.println("CallbackCount is " + callbackCount.getCount()); + this.streamRegexMonitor.stop(); + } + } + + private class MockStreamMetadataCache extends StreamMetadataCache { + + public MockStreamMetadataCache(SystemAdmins systemAdmins, int cacheTTLms, Clock clock) { + super(systemAdmins, cacheTTLms, clock); + } + + /** Always reports exactly one stream, sampleStream, regardless of systemName. */ @Override + public scala.collection.mutable.Set getAllSystemStreams(String systemName) { + Set<SystemStream> streams = new HashSet<>(Arrays.asList(sampleStream)); + return JavaConversions$.MODULE$.asScalaSet(streams); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala index 2aafab1..0cc7a90 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala @@ -20,7 +20,8 @@ package org.apache.samza.coordinator import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.HashMap +import java.util.regex.Pattern + import org.apache.samza.Partition import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata @@ -34,6 +35,7 @@ import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar import scala.collection.JavaConverters._ +import scala.collection.immutable.HashMap class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar { @@ -43,6 +45,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug val
mockMetadataCache = mock[StreamMetadataCache] val inputSystemStream = new SystemStream("test-system", "test-stream") val inputSystemStreamSet = Set[SystemStream](inputSystemStream) + val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { { @@ -209,6 +212,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug def testScheduler(): Unit = { val mockMetadataCache = new MockStreamMetadataCache val inputSystemStream = new SystemStream("test-system", "test-stream") + val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava val inputSystemStreamSet = Set[SystemStream](inputSystemStream) val sampleCount = new CountDownLatch(2); // Verify 2 invocations http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index 596b07a..c3c66c7 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -54,6 +54,7 @@ import org.apache.samza.config.SystemConfig; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.ExponentialSleepStrategy; @@ -679,6 +680,13 @@ public class KafkaSystemAdmin implements SystemAdmin { return () -> ZkUtils.apply(zkConnect, 6000, 
6000, false); } + /** Returns one SystemStream of this system (systemName) per topic name returned by the metadata consumer's listTopics(). */ @Override + public Set<SystemStream> getAllSystemStreams() { + return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream() + .map(x -> new SystemStream(systemName, x)) + .collect(Collectors.toSet()); + } + /** * Container for metadata about offsets. */ http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 1954ac7..607feb0 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -40,9 +40,6 @@ object KafkaConfig { val TOPIC_REPLICATION_FACTOR = "replication.factor" val TOPIC_DEFAULT_REPLICATION_FACTOR = "2" - val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex" - val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system" - val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config" val SEGMENT_BYTES = "segment.bytes" @@ -206,11 +203,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { } // regex resolver - def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName) + def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName) - def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName) + def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName) - def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) + def getRegexResolvedInheritedConfig(rewriterName: String) =
config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) /** * Gets the replication factor for the changelog topics. Uses the following precedence. http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala index e6068b0..654354b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala @@ -21,7 +21,8 @@ package org.apache.samza.config import org.I0Itec.zkclient.ZkClient import kafka.utils.ZkUtils -import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS} +import org.apache.samza.config.KafkaConfig.{Config2Kafka} +import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS} import org.apache.samza.SamzaException import org.apache.samza.util.{Logging, StreamUtil} http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala index 69d7da6..8b19292 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala @@ -24,7 +24,7 @@ import collection.JavaConverters._ import org.junit.Assert._ import org.junit.Test -import KafkaConfig._ +import JobConfig._ class TestRegExTopicGenerator {