Repository: samza
Updated Branches:
  refs/heads/master a08040dcb -> 7bb206151


SAMZA-1788: Add LocationIdProvider abstraction.

Currently in standalone mode, the hostname of the standalone processor is used as
its LocationId by default. However, this default does not work for containerized
environments such as Azure Cloud or Kubernetes: standalone processors can be
launched from different Kubernetes containers on the same physical machine, where
each container has a different locality ID from the other containers on that
machine.

To solve this problem, we introduce a LocationId abstraction that allows users to
plug in a unique ID identifying the execution environment of the processor.

In containerized environments, the LocationId is a composite key of multiple
fields: (sliceId, containerId, hostname). By default, the hostname is used as the
LocationId (if not configured by the user).

All the processors of an application registered from a LocationId should be
able to share (read/write) their local state stores. Any custom
LocationIdProvider is expected to honor this contract when generating the
LocationId.

This patch is part of SEP-11. Please refer to it for more details.

Author: Shanthoosh Venkataraman <spven...@usc.edu>

Reviewers: Jagadish<jagad...@apache.org>

Closes #585 from shanthoosh/add_location_id_interface


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7bb20615
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7bb20615
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7bb20615

Branch: refs/heads/master
Commit: 7bb206151e250321eea9a49abd0f2b2bddb1c943
Parents: a08040d
Author: Shanthoosh Venkataraman <spven...@usc.edu>
Authored: Fri Aug 10 14:26:42 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Aug 10 14:26:42 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/runtime/LocationId.java    | 60 ++++++++++++++++++++
 .../samza/runtime/LocationIdProvider.java       | 28 +++++++++
 .../runtime/LocationIdProviderFactory.java      | 28 +++++++++
 .../DefaultLocationIdProviderFactory.java       | 32 +++++++++++
 .../org/apache/samza/config/JobConfig.scala     |  5 ++
 5 files changed, 153 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java
new file mode 100644
index 0000000..48ea7ff
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.Objects;
+
+/**
+ * Represents the physical execution environment of the StreamProcessor.
+ * All the stream processors which run from a LocationId should be able to share (read/write)
+ * their local state stores.
+ *
+ * <p>A LocationId is a value object wrapping a non-null id string: instances of the same class
+ * are equal exactly when their id strings are equal.
+ */
+public class LocationId {
+  private final String locationId;
+
+  /**
+   * Creates a LocationId wrapping the given identifier.
+   *
+   * @param locationId the identifier of the execution environment; must not be null
+   * @throws IllegalArgumentException if {@code locationId} is null
+   */
+  public LocationId(String locationId) {
+    if (locationId == null) {
+      throw new IllegalArgumentException("LocationId cannot be null");
+    }
+    this.locationId = locationId;
+  }
+
+  /**
+   * Returns the identifier string this LocationId was created with.
+   *
+   * @return the wrapped identifier; never null
+   */
+  public String getId() {
+    return this.locationId;
+  }
+
+  /**
+   * Two LocationIds are equal when they are of the same class and wrap equal id strings.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LocationId that = (LocationId) o;
+
+    return Objects.equals(locationId, that.locationId);
+  }
+
+  /**
+   * Consistent with {@link #equals(Object)}: derived solely from the wrapped id string.
+   */
+  @Override
+  public int hashCode() {
+    return locationId.hashCode();
+  }
+
+  /**
+   * Returns the raw id string, identical to {@link #getId()}.
+   */
+  @Override
+  public String toString() {
+    return locationId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java
new file mode 100644
index 0000000..d0e0af0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+/**
+ * Generates {@link LocationId} that uniquely identifies the
+ * execution environment of a stream processor.
+ *
+ * <p>This interface has a single abstract method, so implementations may be supplied as lambdas.
+ */
+@FunctionalInterface
+public interface LocationIdProvider {
+
+  /**
+   * Returns the {@link LocationId} of the execution environment this provider describes.
+   *
+   * @return the location id of the current execution environment
+   */
+  LocationId getLocationId();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java
new file mode 100644
index 0000000..b8017d8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Factory that creates a {@link LocationIdProvider} from a {@link Config}.
+ */
+public interface LocationIdProviderFactory {
+
+  /**
+   * Creates the {@link LocationIdProvider} to use for the given configuration.
+   *
+   * @param config the configuration the provider may be built from
+   * @return a {@link LocationIdProvider}
+   */
+  LocationIdProvider getLocationIdProvider(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java b/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java
new file mode 100644
index 0000000..47c5330
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.util.Util;
+
+/**
+ * Uses the host name of the local host as the {@link LocationId}.
+ */
+public class DefaultLocationIdProviderFactory implements LocationIdProviderFactory {
+
+  /**
+   * Returns a provider whose {@link LocationIdProvider#getLocationId()} wraps the host name
+   * reported by {@code Util.getLocalHost().getHostName()}.
+   *
+   * @param config ignored by this implementation
+   * @return a provider that looks up the local host name each time it is queried
+   */
+  @Override
+  public LocationIdProvider getLocationIdProvider(Config config) {
+    // The host name is resolved lazily, on every getLocationId() call, not once at creation time.
+    return () -> new LocationId(Util.getLocalHost().getHostName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 7cebcc6..15d9d20 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -23,6 +23,7 @@ package org.apache.samza.config
 import java.io.File
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
 
 object JobConfig {
@@ -77,6 +78,8 @@ object JobConfig {
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
+  // Config key holding the class name of the LocationIdProviderFactory implementation to use.
+  val LOCATION_ID_PROVIDER_FACTORY: String = "locationid.provider.factory"
+
   // Processor Config Constants
   val PROCESSOR_ID = "processor.id"
   val PROCESSOR_LIST = "processor.list"
@@ -168,6 +171,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSystemStreamPartitionGrouperFactory = 
getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
 
+  /**
+   * Returns the class name of the LocationIdProviderFactory configured under
+   * [[JobConfig.LOCATION_ID_PROVIDER_FACTORY]], falling back to the canonical name of
+   * [[DefaultLocationIdProviderFactory]] when the key is absent.
+   */
+  def getLocationIdProviderFactory = getOption(JobConfig.LOCATION_ID_PROVIDER_FACTORY).getOrElse(classOf[DefaultLocationIdProviderFactory].getCanonicalName)
+
   def getSecurityManagerFactory = 
getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY)
 
   def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)

Reply via email to