Repository: samza
Updated Branches:
  refs/heads/master 7a8927de5 -> de5d92e51


Side inputs: Bug fixes

 - Use the correct regex (an escaped dot, "\\.") when splitting the stream name in Util#getSystemStreamFromNameOrId
 - Handle end of stream during dispatch of a message to the side input processor
 - Fix validation to check for the presence of a side input processor for a given store instead of looking up the side input processor factory

Author: Bharath Kumarasubramanian <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #580 from bharathkk/side-input-bugs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de5d92e5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de5d92e5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de5d92e5

Branch: refs/heads/master
Commit: de5d92e51994d77cba6590b6f360f1ddda2fdc4f
Parents: 7a8927d
Author: Bharath Kumarasubramanian <[email protected]>
Authored: Tue Jul 24 22:17:25 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Tue Jul 24 22:17:25 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/storage/TaskSideInputStorageManager.java    | 8 ++------
 .../main/scala/org/apache/samza/container/TaskInstance.scala | 2 +-
 samza-core/src/main/scala/org/apache/samza/util/Util.scala   | 2 +-
 3 files changed, 4 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 7a0a822..66c9106 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -34,11 +34,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -76,7 +74,6 @@ public class TaskSideInputStorageManager {
   private final StreamMetadataCache streamMetadataCache;
   private final SystemAdmins systemAdmins;
   private final TaskName taskName;
-  private final JavaStorageConfig storageConfig;
   private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
 
   private Map<SystemStreamPartition, String> startingOffsets;
@@ -92,7 +89,6 @@ public class TaskSideInputStorageManager {
       Config config,
       Clock clock) {
     this.clock = clock;
-    this.storageConfig = new JavaStorageConfig(config);
     this.stores = sideInputStores;
     this.storeBaseDir = storeBaseDir;
     this.storeToSSps = storesToSSPs;
@@ -365,9 +361,9 @@ public class TaskSideInputStorageManager {
 
   private void validateStoreConfiguration() {
     stores.forEach((storeName, storageEngine) -> {
-        if 
(StringUtils.isBlank(storageConfig.getSideInputsProcessorFactory(storeName))) {
+        if (!storeToProcessor.containsKey(storeName)) {
           throw new SamzaException(
-              String.format("Side inputs processor factory configuration 
missing for store: %s.", storeName));
+              String.format("Side inputs processor missing for store: %s.", 
storeName));
         }
 
         if (storageEngine.getStoreProperties().isLoggedStore()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0caca4f..d1e9b3c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -179,7 +179,7 @@ class TaskInstance(
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
         format (taskName, incomingMessageSsp))
 
-      if (sideInputSSPs.contains(incomingMessageSsp)) {
+      if (sideInputSSPs.contains(incomingMessageSsp) && 
!envelope.isEndOfStream) {
         sideInputStorageManager.process(envelope)
       } else {
         if (isAsyncTask) {

http://git-wip-us.apache.org/repos/asf/samza/blob/de5d92e5/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index c9534bc..fd06c20 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -94,7 +94,7 @@ object Util extends Logging {
     * @return the [[SystemStream]] for the stream
     */
   def getSystemStreamFromNameOrId(config: Config, stream: String): 
SystemStream = {
-    val parts = stream.split(".")
+    val parts = stream.split("\\.")
     if (parts.length == 0 || parts.length > 2) {
       throw new SamzaException(
         String.format("Invalid stream %s. Expected to be of the format 
streamId or systemName.streamName", stream))

Reply via email to