sborya commented on a change in pull request #1171: SAMZA-2328: StreamConfig 
from scala to Java
URL: https://github.com/apache/samza/pull/1171#discussion_r329283792
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/config/StreamConfig.java
 ##########
 @@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import com.google.common.collect.Sets;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StreamConfig extends MapConfig {
  // Logger for this class; MethodHandles.lookup().lookupClass() resolves to the class declaring this field.
  public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());


  // Samza configs for streams.
  // Each name below is a Samza-specific property (prefixed with "samza.") that is appended to a stream-level
  // prefix, e.g. STREAM_ID_PREFIX + SYSTEM yields "streams.%s.samza.system".
  public static final String SAMZA_PROPERTY = "samza.";
  public static final String SYSTEM = SAMZA_PROPERTY + "system";
  public static final String PHYSICAL_NAME = SAMZA_PROPERTY + "physical.name";
  public static final String MSG_SERDE = SAMZA_PROPERTY + "msg.serde";
  public static final String KEY_SERDE = SAMZA_PROPERTY + "key.serde";
  public static final String CONSUMER_RESET_OFFSET = SAMZA_PROPERTY + "reset.offset";
  public static final String CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default";
  public static final String BOOTSTRAP = SAMZA_PROPERTY + "bootstrap";
  public static final String PRIORITY = SAMZA_PROPERTY + "priority";
  public static final String IS_INTERMEDIATE = SAMZA_PROPERTY + "intermediate";
  public static final String DELETE_COMMITTED_MESSAGES = SAMZA_PROPERTY + "delete.committed.messages";
  public static final String IS_BOUNDED = SAMZA_PROPERTY + "bounded";
  public static final String BROADCAST = SAMZA_PROPERTY + "broadcast";

  // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
  // NOTE(review): only STREAMS_PREFIX is private; STREAM_PREFIX, STREAM_ID_PREFIX and the *_FOR_STREAM_ID
  // patterns below are public. Confirm whether the note above is meant to cover them too.
  private static final String STREAMS_PREFIX = "streams.";

  // Legacy format "systems.{systemName}.streams.{streamName}."; format with (systemName, streamName).
  public static final String STREAM_PREFIX = "systems.%s.streams.%s.";
  // Current format "streams.{streamId}."; format with the streamId.
  public static final String STREAM_ID_PREFIX = STREAMS_PREFIX + "%s.";
  // Fully-qualified "streams.{streamId}.samza.*" keys; each is a format pattern taking the streamId.
  public static final String SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM;
  public static final String PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME;
  public static final String IS_INTERMEDIATE_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE;
  public static final String DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID = STREAM_ID_PREFIX + DELETE_COMMITTED_MESSAGES;
  public static final String IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED;
  public static final String PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY;
  public static final String CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT;
  public static final String BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP;
  public static final String BROADCAST_FOR_STREAM_ID = STREAM_ID_PREFIX + BROADCAST;
+
  /**
   * Helper for accessing configs related to stream properties.
   *
   * For most configs, this currently supports two different formats for specifying stream properties:
   * 1) "streams.{streamId}.{property}" (recommended to use this format)
   * 2) "systems.{systemName}.streams.{streamName}.{property}" (legacy)
   * Note that some config lookups are only supported through the "streams.{streamId}.{property}" format. See the
   * specific accessor method to determine which formats are supported.
   *
   * Summary of terms:
   * - streamId: logical identifier used for a stream; configs are specified using this streamId
   * - physical stream: concrete name for a stream (if the physical stream is not explicitly configured, then the
   *   streamId is used as the physical stream)
   * - streamName: within the javadoc for this class, streamName is the same as physical stream
   * - samza property: property which is Samza-specific, which will have "samza." as a prefix (e.g. "samza.key.serde");
   *   this is in contrast to stream-specific properties which are related to specific stream technologies
   */
+
+  private Optional<String> nonEmptyOption(String value) {
+    if (value == null || value.isEmpty()) {
+      return Optional.empty();
+    } else {
+      return Optional.of(value);
+    }
+  }
+
  /**
   * Builds a StreamConfig from the entries of the given config by delegating to the {@link MapConfig} constructor.
   *
   * @param config the config whose entries back this StreamConfig
   */
  public StreamConfig(Config config) {
    super(config);
  }
+
+
+  /**
+   * Finds the properties from the legacy config style (config key includes 
system).
+   * This will return a Config with the properties that match the following 
formats (if a property is specified through
+   * multiple formats, priority is top to bottom):
+   * 1) "systems.{systemName}.streams.{streamName}.{property}"
+   * 2) "systems.{systemName}.default.stream.{property}"
+   *
+   * @param systemName the system name under which the properties are 
configured
+   * @param streamName the stream name
+   * @return           the map of properties for the stream
+   */
+  private Map<String, String> getSystemStreamProperties(String systemName, 
String streamName) {
+    if (systemName == null) {
+      Collections.emptyMap();
+    }
+    SystemConfig systemConfig = new SystemConfig(this);
+    Config defaults = systemConfig.getDefaultStreamProperties(systemName);
+    Config explicitConfigs = subset(String.format(STREAM_PREFIX, systemName, 
streamName), true);
+    return new MapConfig(defaults, explicitConfigs);
+  }
+
+
+  /**
+   * Gets all of the properties for the specified streamId (includes current 
and legacy config styles).
+   * This will return a Config with the properties that match the following 
formats (if a property is specified through
+   * multiple formats, priority is top to bottom):
+   * 1) "streams.{streamId}.{property}"
+   * 2) "systems.{systemName}.streams.{streamName}.{property}" where 
systemName is the system mapped to the streamId in
+   * the config and streamName is the physical stream name mapped to the 
stream id
+   * 3) "systems.{systemName}.default.stream.{property}" where systemName is 
the system mapped to the streamId in the
+   * config
+   *
+   * @param streamId the identifier for the stream in the config.
+   * @return         the merged map of config properties from both the legacy 
and new config styles
+   */
+  private MapConfig getAllStreamProperties(String streamId) {
+    Config allProperties = subset(String.format(STREAM_ID_PREFIX, streamId));
+    Map<String, String> inheritedLegacyProperties = 
getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId));
+    return new MapConfig(Arrays.asList(inheritedLegacyProperties, 
allProperties));
+  }
+
+  /**
+   * Gets the distinct stream IDs of all the streams defined in the config
+   *
+   * @return collection of stream IDs
+   */
+  public Set<String> getStreamIds() {
+    // StreamIds are not allowed to have '.' so the first index of '.' marks 
the end of the streamId.
+    return subset(STREAMS_PREFIX).keySet().stream().map(key -> 
key.substring(0, key.indexOf(".")))
+      .distinct().collect(Collectors.toSet());
+  }
+
+  private List<String> getStreamIdsForSystem(String system) {
+    return getStreamIds().stream().filter(streamId -> 
system.equals(getSystem(streamId))).collect(Collectors.toList());
+  }
+
+  /**
+   * Finds the stream id which corresponds to the systemStream.
+   * This finds the stream id that is mapped to the system in systemStream 
through the config and that has a physical
+   * name (the physical name might be the stream id itself if there is no 
explicit mapping) that matches the stream in
+   * systemStream.
+   * Note: If the stream in the systemStream is a stream id which is mapped to 
a physical stream, then that stream won't
+   * be returned as a stream id here, since the stream in systemStream doesn't 
match the physical stream name.
+   *
+   * @param systemStream system stream to map to stream id
+   * @return             stream id corresponding to the system stream
+   */
+  public String systemStreamToStreamId(SystemStream systemStream) {
+    List<String> streamIds = 
getStreamIdsForSystem(systemStream.getSystem()).stream()
+      .filter(streamId -> 
systemStream.getStream().equals(getPhysicalName(streamId))).collect(Collectors.toList());
+    if (streamIds.size() > 1) {
+      throw new IllegalStateException(String.format("There was more than one 
stream found for system stream %s", systemStream));
+    }
+
+    return streamIds.isEmpty() ? null : streamIds.get(0);
+  }
+
+  /**
+   * Gets the specified Samza property for a SystemStream. A Samza property is 
a property that controls how Samza
+   * interacts with the stream, as opposed to a property of the stream itself.
+   *
+   * First, tries to map the systemStream to a streamId. This will only find a 
streamId if the stream is a physical name
+   * (explicitly mapped physical name or a stream id without a physical name 
mapping). That means this will not map a
+   * stream id to itself if there is a mapping from the stream id to a 
physical stream name. This also requires that the
+   * stream id is mapped to a system in the config.
+   * If a stream id is found:
+   * 1) Look for "streams.{streamId}.{property}" for the stream id.
+   * 2) Otherwise, look for 
"systems.{systemName}.streams.{streamName}.{property}" in which the systemName 
is the system
+   * mapped to the stream id and the streamName is the physical stream name 
for the stream id.
+   * 3) Otherwise, look for "systems.{systemName}.default.stream.{property}" 
in which the systemName is the system
+   * mapped to the stream id.
+   * If a stream id was not found or no property could be found using the 
above keys:
+   * 1) Look for "systems.{systemName}.streams.{streamName}.{property}" in 
which the systemName is the system in the
+   * input systemStream and the streamName is the stream from the input 
systemStream.
+   * 2) Otherwise, look for "systems.{systemName}.default.stream.{property}" 
in which the systemName is the system
+   * in the input systemStream.
+   *
 
 Review comment:
   Do you mean in the comment? That is, documenting a step 3) that returns null?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to