Repository: samza Updated Branches: refs/heads/master a08040dcb -> 7bb206151
SAMZA-1788: Add LocationIdProvider abstraction. Currently in standalone mode, the host name of the standalone processor is used as the LocationId by default. However, this default does not work for containerized environments such as Azure cloud or Kubernetes: standalone processors can be launched from different Kubernetes containers on the same physical machine (where each Kubernetes container has a different locality ID from the other Kubernetes containers on that machine). To solve this problem, we introduce a LocationId abstraction that allows users to plug in a unique ID identifying the execution environment of the processor. In containerized environments, the LocationId is a composite key of multiple fields: (sliceId, containerId, hostname). By default, the host name is used as the LocationId (if none is configured by the user). All the processors of an application registered from a LocationId should be able to share (read/write) their local state stores. Any custom LocationIdProvider is expected to honor this contract when generating the LocationId. This patch is part of SEP-11. Please refer to it for more details. 
Author: Shanthoosh Venkataraman <spven...@usc.edu> Reviewers: Jagadish<jagad...@apache.org> Closes #585 from shanthoosh/add_location_id_interface Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7bb20615 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7bb20615 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7bb20615 Branch: refs/heads/master Commit: 7bb206151e250321eea9a49abd0f2b2bddb1c943 Parents: a08040d Author: Shanthoosh Venkataraman <spven...@usc.edu> Authored: Fri Aug 10 14:26:42 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Aug 10 14:26:42 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/runtime/LocationId.java | 60 ++++++++++++++++++++ .../samza/runtime/LocationIdProvider.java | 28 +++++++++ .../runtime/LocationIdProviderFactory.java | 28 +++++++++ .../DefaultLocationIdProviderFactory.java | 32 +++++++++++ .../org/apache/samza/config/JobConfig.scala | 5 ++ 5 files changed, 153 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java new file mode 100644 index 0000000..48ea7ff --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import java.util.Objects; + +/** + * Represents the physical execution environment of the StreamProcessor. + * All the stream processors which run from a LocationId should be able to share (read/write) + * their local state stores. + */ +public class LocationId { + /** Raw identifier string; never null (enforced by the constructor). */ private final String locationId; + + /** Wraps the given identifier string. @throws IllegalArgumentException if {@code locationId} is null */ public LocationId(String locationId) { + if (locationId == null) { + throw new IllegalArgumentException("LocationId cannot be null"); + } + this.locationId = locationId; + } + + /** Returns the wrapped identifier string (the same value {@link #toString()} returns). */ public String getId() { + return this.locationId; + } + + /** Equal iff {@code o} has exactly the same runtime class (so a subclass instance never equals a plain LocationId) and an equal identifier string. */ @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocationId that = (LocationId) o; + + return Objects.equals(locationId, that.locationId); + } + + /** Hashes only the identifier string, consistent with {@link #equals(Object)}; no null check needed because the constructor rejects null. */ @Override + public int hashCode() { + return locationId.hashCode(); + } + + /** Returns the raw identifier string, identical to {@link #getId()}. */ @Override + public String toString() { + return locationId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java new file mode 100644 index 0000000..d0e0af0 --- /dev/null +++ 
b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +/** + * Generates {@link LocationId} that uniquely identifies the + * execution environment of a stream processor. + */ +public interface LocationIdProvider { + + /** Returns the LocationId identifying the current processor's execution environment. Per the LocationId Javadoc, stream processors running from the same LocationId should be able to share (read/write) their local state stores, so implementations must honor that contract. This is the interface's only abstract method, so it can be implemented with a lambda. NOTE(review): consider annotating the interface with @FunctionalInterface. */ LocationId getLocationId(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java new file mode 100644 index 0000000..b8017d8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import org.apache.samza.config.Config; + +/** + * Builds the {@link LocationIdProvider}. + */ +public interface LocationIdProviderFactory { + /** Returns a LocationIdProvider built from the given job config. The name of the factory class to use is exposed by JobConfig under the config key "locationid.provider.factory". */ LocationIdProvider getLocationIdProvider(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java b/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java new file mode 100644 index 0000000..47c5330 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import org.apache.samza.config.Config; +import org.apache.samza.util.Util; + +/** + * Uses the address of the local host for generating {@link LocationId}. + */ +public class DefaultLocationIdProviderFactory implements LocationIdProviderFactory { + /** Ignores {@code config} and returns a lambda provider. Each getLocationId() call builds a new LocationId from Util.getLocalHost().getHostName(); the lookup is not cached. NOTE(review): the class Javadoc says "address", but the value actually used is the host name. */ @Override + public LocationIdProvider getLocationIdProvider(Config config) { + return () -> new LocationId(Util.getLocalHost().getHostName()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7bb20615/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 7cebcc6..15d9d20 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -23,6 +23,7 @@ package org.apache.samza.config import java.io.File import org.apache.samza.container.grouper.stream.GroupByPartitionFactory +import org.apache.samza.runtime.DefaultLocationIdProviderFactory import org.apache.samza.util.Logging object JobConfig { @@ -77,6 +78,8 @@ object JobConfig { val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000 val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory" + /* Config key whose value names the LocationIdProviderFactory class; JobConfig.getLocationIdProviderFactory falls back to DefaultLocationIdProviderFactory when it is unset. */ val LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory" + // Processor Config Constants val PROCESSOR_ID = "processor.id" val PROCESSOR_LIST = "processor.list" @@ -168,6 +171,8 
@@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName) + /** Returns the factory class name configured under "locationid.provider.factory", or the canonical class name of DefaultLocationIdProviderFactory when that key is absent. */ def getLocationIdProviderFactory = getOption(JobConfig.LOCATION_ID_PROVIDER_FACTORY).getOrElse(classOf[DefaultLocationIdProviderFactory].getCanonicalName) + def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY) def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)