Repository: samza
Updated Branches:
  refs/heads/master 7a8927de5 -> de5d92e51
Side inputs: Bug fixes - Use the correct regex in Util#getSystemStreamFromNameOrId - Handle end of stream during dispatch of message to side input processor - Fix validation to check for presence of side input processor for a given store instead of looking up the side input processor factory. Author: Bharath Kumarasubramanian <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #580 from bharathkk/side-input-bugs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de5d92e5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de5d92e5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de5d92e5 Branch: refs/heads/master Commit: de5d92e51994d77cba6590b6f360f1ddda2fdc4f Parents: 7a8927d Author: Bharath Kumarasubramanian <[email protected]> Authored: Tue Jul 24 22:17:25 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Jul 24 22:17:25 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/storage/TaskSideInputStorageManager.java | 8 ++------ .../main/scala/org/apache/samza/container/TaskInstance.scala | 2 +- samza-core/src/main/scala/org/apache/samza/util/Util.scala | 2 +- 3 files changed, 4 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java index 7a0a822..66c9106 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java @@ -34,11 +34,9 @@ 
import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.container.TaskName; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; @@ -76,7 +74,6 @@ public class TaskSideInputStorageManager { private final StreamMetadataCache streamMetadataCache; private final SystemAdmins systemAdmins; private final TaskName taskName; - private final JavaStorageConfig storageConfig; private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); private Map<SystemStreamPartition, String> startingOffsets; @@ -92,7 +89,6 @@ public class TaskSideInputStorageManager { Config config, Clock clock) { this.clock = clock; - this.storageConfig = new JavaStorageConfig(config); this.stores = sideInputStores; this.storeBaseDir = storeBaseDir; this.storeToSSps = storesToSSPs; @@ -365,9 +361,9 @@ public class TaskSideInputStorageManager { private void validateStoreConfiguration() { stores.forEach((storeName, storageEngine) -> { - if (StringUtils.isBlank(storageConfig.getSideInputsProcessorFactory(storeName))) { + if (!storeToProcessor.containsKey(storeName)) { throw new SamzaException( - String.format("Side inputs processor factory configuration missing for store: %s.", storeName)); + String.format("Side inputs processor missing for store: %s.", storeName)); } if (storageEngine.getStoreProperties().isLoggedStore()) { http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 0caca4f..d1e9b3c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -179,7 +179,7 @@ class TaskInstance( trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, incomingMessageSsp)) - if (sideInputSSPs.contains(incomingMessageSsp)) { + if (sideInputSSPs.contains(incomingMessageSsp) && !envelope.isEndOfStream) { sideInputStorageManager.process(envelope) } else { if (isAsyncTask) { http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index c9534bc..fd06c20 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -94,7 +94,7 @@ object Util extends Logging { * @return the [[SystemStream]] for the stream */ def getSystemStreamFromNameOrId(config: Config, stream: String): SystemStream = { - val parts = stream.split(".") + val parts = stream.split("\\.") if (parts.length == 0 || parts.length > 2) { throw new SamzaException( String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream))
