mynameborat commented on a change in pull request #1449:
URL: https://github.com/apache/samza/pull/1449#discussion_r538641971



##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage read and writes of {@link JobCoordinatorMetadata} to 
{@link MetadataStore}. It also provides
+ * additional helper functionalities to generate {@link 
JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = 
"applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = 
"job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = 
SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final ClusterType clusterType;
+
+  /**
+   * Creates a manager for the given metadata store and cluster type and
+   * registers its metrics with the supplied registry.
+   *
+   * @param metadataStore store holding the job coordinator metadata; stored
+   *                      as-is without a null check
+   * @param clusterType cluster type of this manager; must not be null
+   * @param metricsRegistry registry used to create the attempt counter and the
+   *                        change gauges under the "job-coordinator-manager" group
+   */
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, 
ClusterType clusterType, MetricsRegistry metricsRegistry) {
+    // Reject a null cluster type up front; its name is matched against store keys on read.
+    Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+
+    // All metrics share one group; each gauge is created with 0 as its value argument.
+    applicationAttemptCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, 
APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
JOB_MODEL_CHANGED, 0);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}.
+   * Refer to the javadocs for more details on how it is generated and the
+   * properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier that is generated based
+   * on the input {@link Config}. It uses a {@link Funnel} to use a subset of
+   * the input configuration to generate the identifier and as long as the
+   * subset of the configuration remains same, the identifier is guaranteed to
+   * be same. For the list of config prefixes used by the funnel refer to
+   * {@link ConfigHashFunnel}
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated
+   * based on the input {@link JobModel}. It only uses the
+   * {@link org.apache.samza.job.model.ContainerModel} within the
+   * {@linkplain JobModel} for generation. We serialize the data into bytes
+   * and use those bytes to compute the identifier.
+   *
+   * In case of YARN, the epoch identifier is extracted from the application
+   * attempt and translates to applicationId
+   * e.g. 1606797336059_0010
+   * Both config and job model identifiers are 32 bit integers, returned in
+   * their string form.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   * @throws RuntimeException wrapping any exception raised while serializing,
+   *         hashing or fetching the epoch identifier
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel 
jobModel, Config config) {
+    try {
+      // Hash only the serialized containers of the job model.
+      // NOTE(review): this uses CRC32C while the config id below uses CRC32 - confirm the difference is intentional.
+      int jobModelId = Hashing
+          .crc32c()
+          
.hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      // The funnel decides which parts of the config contribute to the hash.
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, 
configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), 
String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      // Any failure is logged and rethrown unchecked with the original cause attached.
+      LOG.error("Failed to generate metadata for the current attempt due to ", 
e);
+      throw new RuntimeException("Failed to generate the metadata for the 
current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Checks for changes between the metadata passed as inputs. Metadata is
+   * considered changed if any of the attributes within
+   * {@linkplain JobCoordinatorMetadata} changes, or if there is no previous
+   * metadata at all.
+   *
+   * We intentionally check for each change to help us track at this
+   * granularity. We want to use this information to determine if complex
+   * handling is required to cater to these changes instead of blindly
+   * restarting all the containers upstream.
+   *
+   * The checks run in the order epoch, job model, config and stop at the
+   * first difference, so at most one metric is updated per call. When nothing
+   * differs, the application attempt counter is incremented instead.
+   *
+   * @param newMetadata new metadata to be compared; dereferenced without a
+   *                    null check whenever previousMetadata is non-null
+   * @param previousMetadata previous metadata to be compared against; may be
+   *                         null, which is reported as a new deployment
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, 
JobCoordinatorMetadata previousMetadata) {
+    // Assume a change; only the final else branch clears the flag.
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if 
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if 
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if 
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      // Epoch, job model and config ids all match the previous metadata.
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It 
fetches the metadata
+   * associated with cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.name().equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, 
JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          LOG.error("Failed to read job coordinator metadata due to ", e);

Review comment:
       Even when the metadata cannot be read, the AM can still proceed and 
restart all the containers instead of being killed. This is better because, in 
the scenario where the data is corrupt, you are better off proceeding and 
rewriting the metadata with a new one than hoping the errors are transient and 
retrying (by killing the AM and starting it again).
   
   Updating the metrics happens in the check flow. Refer to the orchestration 
PR to see how this read is subsequently used to determine changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to