Copilot commented on code in PR #6507:
URL: https://github.com/apache/hive/pull/6507#discussion_r3413889774


##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java:
##########
@@ -422,4 +633,233 @@ protected static Probe buildTcpProbe(int port, ProbeSpec 
spec, int defaultInitia
     return builder.build();
   }
 
+  /**
+   * Applies the autoscaling lifecycle to a workload's pod template: sets a 
preStop
+   * exec lifecycle hook, terminationGracePeriodSeconds, and Prometheus scrape 
annotations.
+   *
+   * @param podSpec           the pod spec of the workload (Deployment or 
StatefulSet)
+   * @param podMetadata       the pod template metadata (for annotations)
+   * @param preStopScript     the shell script to run in the preStop hook
+   * @param gracePeriodSeconds termination grace period
+   */
+  protected static void applyAutoscalingLifecycle(
+      io.fabric8.kubernetes.api.model.PodSpec podSpec,
+      io.fabric8.kubernetes.api.model.ObjectMeta podMetadata,
+      String preStopScript, int gracePeriodSeconds,
+      int metricsScrapeIntervalSeconds) {
+    io.fabric8.kubernetes.api.model.Lifecycle lifecycle =
+        new io.fabric8.kubernetes.api.model.LifecycleBuilder()
+            .withNewPreStop()
+              .withNewExec()
+                .withCommand("/bin/bash", "-c", preStopScript)
+              .endExec()
+            .endPreStop()
+            .build();
+    podSpec.getContainers().get(0).setLifecycle(lifecycle);
+    podSpec.setTerminationGracePeriodSeconds((long) gracePeriodSeconds);
+    applyPrometheusScrapeAnnotations(podMetadata, 
metricsScrapeIntervalSeconds);
+  }
+
+  /**
+   * Adds Prometheus scrape annotations to a pod template so that
+   * the JMX Exporter metrics endpoint is discovered by Prometheus.
+   */
+  private static void applyPrometheusScrapeAnnotations(
+      io.fabric8.kubernetes.api.model.ObjectMeta podMetadata,
+      int scrapeIntervalSeconds) {
+    podMetadata.getAnnotations().put("prometheus.io/scrape", "true");
+    podMetadata.getAnnotations().put("prometheus.io/port",
+        String.valueOf(ConfigUtils.PROMETHEUS_JMX_EXPORTER_PORT));
+    podMetadata.getAnnotations().put("prometheus.io/path", "/metrics");
+    podMetadata.getAnnotations().put("prometheus.io/scrape-interval",
+        scrapeIntervalSeconds + "s");
+  }

Review Comment:
   applyPrometheusScrapeAnnotations() assumes podMetadata.getAnnotations() is 
non-null; if the pod template has no annotations yet, this will throw a 
NullPointerException and break reconciliation when the autoscaling lifecycle 
is applied. Initialize the annotations map before writing to it (for example, 
set a new map on podMetadata when getAnnotations() returns null).



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java:
##########
@@ -125,6 +134,208 @@ public Matcher.Result<R> match(R actualResource, R 
desired,
     return super.match(actualResource, desired, primary, context);
   }
 
+  @Override
+  protected R handleCreate(R desired, P primary, Context<P> context) {
+    try {
+      return super.handleCreate(desired, primary, context);
+    } catch (KubernetesClientException e) {
+      if (e.getCode() == 409) {
+        LOG.info("Resource {} already exists (informer lag), "
+            + "will reconcile on next event",
+            desired.getMetadata().getName());
+        return desired;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Resolves the replica count to set in the desired workload spec.
+   * <p>
+   * Always returns an explicit value — never null. Returning null would cause
+   * JOSDK/SSA to omit spec.replicas, and Kubernetes would default it to 1.
+   * <p>
+   * When autoscaling is enabled:
+   * - On CREATE: returns initialReplicas (minReplicas for the component)
+   * - On UPDATE: returns the autoscaler's managed value, or falls back to
+   *   the current actual replicas from the informer cache.
+   * <p>
+   * When autoscaling is disabled: returns staticReplicas (the spec value).
+   */
+  protected Integer resolveReplicaCount(P primary, Context<P> context,
+      AutoscalingSpec autoscaling, int staticReplicas, int initialReplicas) {
+    // Suspended cluster → 0 replicas (dependent resources natively respect 
suspend).
+    // Exception: HMS stays running if includeMetastore=false in autoSuspend 
config.
+    if (primary instanceof HiveCluster hc && hc.getSpec().suspend()) {
+      boolean isMetastore = 
ConfigUtils.COMPONENT_METASTORE.equals(getComponentName());
+      if (!isMetastore || hc.getSpec().autoSuspend().includeMetastore()) {
+        return 0;
+      }
+    }
+    if (autoscaling == null || !autoscaling.isEnabled()) {
+      return staticReplicas;
+    }
+    Optional<R> existing = getSecondaryResource(primary, context);
+    if (existing.isPresent()) {
+      // Check if the autoscaler has made a decision during this operator's 
lifecycle
+      Integer managed = HiveClusterAutoscaler.getManagedReplicas(
+          primary.getMetadata().getNamespace(),
+          primary.getMetadata().getName(),
+          getComponentName());
+      if (managed != null) {
+        return managed;
+      }
+      // Fallback: operator restarted and MANAGED_REPLICAS is empty — read 
current value
+      R resource = existing.get();
+      if (resource instanceof io.fabric8.kubernetes.api.model.apps.Deployment 
d) {
+        return d.getSpec() != null && d.getSpec().getReplicas() != null
+            ? d.getSpec().getReplicas() : initialReplicas;
+      }
+      if (resource instanceof io.fabric8.kubernetes.api.model.apps.StatefulSet 
s) {
+        return s.getSpec() != null && s.getSpec().getReplicas() != null
+            ? s.getSpec().getReplicas() : initialReplicas;
+      }
+      return initialReplicas;
+    }
+    // First creation: start at minReplicas.
+    return initialReplicas;
+  }
+
+
+  /**
+   * Returns the component name for this dependent (used for autoscaler 
replica lookup).
+   * Subclasses should override if they manage a workload with autoscaling.
+   */
+  protected String getComponentName() {
+    return null;
+  }
+
+  /**
+   * Builds a preStop drain script that polls a single Prometheus metric
+   * (from the JMX Exporter at localhost:9404/metrics) until the value
+   * reaches zero, then exits to allow graceful pod termination.
+   *
+   * @param startupMessage  logged at the start (e.g. "Waiting for open 
connections to drain")
+   * @param metricName      Prometheus metric name (used in grep and log 
messages)
+   * @param varName         shell variable name for the extracted value (e.g. 
"CONNS")
+   * @param idleMessage     logged when idle condition is met (e.g. "All 
connections drained. Shutting down.")
+   * @param sleepSeconds    polling interval in seconds
+   * @param maxRetries      max consecutive curl failures before giving up
+   * @param prefixCommands  optional commands to run before the polling loop 
(may be null)
+   */
+  protected static String buildDrainScript(
+      String startupMessage, String metricName, String varName,
+      String idleMessage, int sleepSeconds, int maxRetries,
+      List<String> prefixCommands) {
+    List<String> lines = new ArrayList<>();
+    lines.add("#!/bin/bash");
+    if (prefixCommands != null) {
+      lines.addAll(prefixCommands);
+    }
+    lines.add("echo '[preStop] " + startupMessage
+        + " (polling localhost:9404/metrics)...'");
+    lines.add("RETRIES=0");
+    lines.add("while true; do");
+    lines.add("  RESPONSE=$(curl -sf http://localhost:9404/metrics)");
+    lines.add("  if [ $? -ne 0 ]; then");
+    lines.add("    RETRIES=$((RETRIES+1))");
+    lines.add("    echo \"[preStop] ERROR: JMX Exporter unreachable on port 
9404 (attempt $RETRIES)\"");
+    lines.add("    if [ $RETRIES -ge " + maxRetries + " ]; then");

Review Comment:
   The preStop drain scripts hardcode the JMX exporter endpoint as 
localhost:9404, but AutoscalingSpec exposes metricsPort and the operator passes 
that port into addJmxExporter(). If a user overrides metricsPort, the drain 
hook will poll the wrong port and may skip draining (or force shutdown after 
retries). Either make the drain script/Prometheus annotations use the 
configured metricsPort, or remove metricsPort configurability and enforce 9404 
everywhere.



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveServer2ScalingStrategy.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+
+/**
+ * Scaling strategy for HiveServer2.
+ * desired = ceil(sum(hs2_open_sessions across all pods) / scaleUpThreshold)
+ * Uses sum() so that each session is counted — prevents premature scale-down
+ * of pods that still have active sessions.
+ */
+public class HiveServer2ScalingStrategy implements ScalingStrategy {
+
+  static final String METRIC_OPEN_SESSIONS = "hs2_open_sessions";
+
+  private int lastMetric;
+
+  @Override
+  public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+      AutoscalingSpec autoscaling, int maxReplicas) {
+    // HS2 is the cluster entry point — scaling to 0 makes the cluster 
unreachable.
+    // Enforce floor of 1 regardless of CRD defaults or user misconfiguration.
+    int safeMinReplicas = Math.max(1, autoscaling.minReplicas());
+
+    double totalSessions = 0;
+    for (PodMetrics pm : podMetrics) {
+      totalSessions += pm.metrics().getOrDefault(METRIC_OPEN_SESSIONS, 0.0);
+    }
+
+    lastMetric = (int) totalSessions;
+
+    if (totalSessions <= 0) {
+      return safeMinReplicas;
+    }
+
+    int desired = (int) Math.ceil(totalSessions / 
autoscaling.scaleUpThreshold());
+    return Math.max(desired, safeMinReplicas);

Review Comment:
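   computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without 
guarding against 0/negative values. Because totalSessions is a double, a zero 
threshold does not raise a division-by-zero error: the quotient is Infinity, 
and (int) Math.ceil(...) saturates to Integer.MAX_VALUE, so a misconfigured CR 
(or default drift) would make this strategy request Integer.MAX_VALUE replicas 
unless a later step clamps the result. A negative threshold produces a 
negative desired value that silently collapses to safeMinReplicas. Validate 
that the threshold is positive before dividing.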



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/LlapScalingStrategy.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+
+import org.apache.hive.kubernetes.operator.model.HiveCluster;
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaling strategy for LLAP daemons.
+ * Formula: avg(QueuedRequests + Configured - Available) across all pods.
+ * This represents average "busy slots + queued" per daemon.
+ * desired = ceil(avg_busy / scaleUpThreshold)
+ * <p>
+ * Activation gate: only scale if HS2 has open sessions (prevents zombie 
scaling).
+ */
+public class LlapScalingStrategy implements ScalingStrategy {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapScalingStrategy.class);
+
+  static final String METRIC_QUEUED = 
"hadoop_llapdaemon_executornumqueuedrequests";
+  static final String METRIC_CONFIGURED = 
"hadoop_llapdaemon_executornumexecutorsconfigured";
+  static final String METRIC_AVAILABLE = 
"hadoop_llapdaemon_executornumexecutorsavailable";
+
+  private final HiveClusterAutoscaler orchestrator;
+  private final HiveCluster cluster;
+  private int lastMetric;
+
+  public LlapScalingStrategy(HiveClusterAutoscaler orchestrator, HiveCluster 
cluster) {
+    this.orchestrator = orchestrator;
+    this.cluster = cluster;
+  }
+
+  @Override
+  public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+      AutoscalingSpec autoscaling, int maxReplicas) {
+
+    // Activation gate: check if HS2 has any open sessions.
+    // If scrape returns empty but LLAP has running pods, treat as "unknown" 
and preserve.
+    // This prevents spurious scale-to-zero from transient scrape failures 
after operator restart.
+    List<PodMetrics> hs2Metrics = orchestrator.getHs2MetricsFromCache(cluster);
+    Boolean sessionsDetected = detectHs2Sessions(hs2Metrics);
+    if (sessionsDetected == null && !podMetrics.isEmpty()) {
+      // HS2 scrape returned no data but LLAP is running — hold current state
+      LOG.debug("[llap] HS2 scrape returned no pods; preserving LLAP (has {} 
running pods)", podMetrics.size());
+      lastMetric = 0;
+      return Math.max(1, autoscaling.minReplicas());
+    }
+    if (sessionsDetected == null || !sessionsDetected) {
+      LOG.debug("[llap] HS2 has no open sessions, scaling to minReplicas");
+      lastMetric = 0;
+      return autoscaling.minReplicas();
+    }
+
+    // HS2 has sessions but LLAP has no pods yet — scale up to at least 1
+    if (podMetrics.isEmpty()) {
+      int minReplica = Math.max(1, autoscaling.minReplicas());
+      LOG.debug("[llap] HS2 has sessions but LLAP has 0 pods, scaling to {}", 
minReplica);
+      lastMetric = 0;
+      return minReplica;
+    }
+
+    // Compute average busy slots across all LLAP pods
+    double totalBusy = 0;
+    int podCount = 0;
+    for (PodMetrics pm : podMetrics) {
+      double queued = pm.metrics().getOrDefault(METRIC_QUEUED, 0.0);
+      double configured = pm.metrics().getOrDefault(METRIC_CONFIGURED, 0.0);
+      double available = pm.metrics().getOrDefault(METRIC_AVAILABLE, 0.0);
+      double busy = queued + configured - available;
+      totalBusy += busy;
+      podCount++;
+    }
+
+    double avgBusy = totalBusy / podCount;
+    lastMetric = (int) Math.round(avgBusy);
+
+    if (avgBusy <= 0) {
+      // HS2 has sessions (passed activation gate above) but executors are 
idle between queries.
+      // Keep at least 1 daemon to avoid flapping: scaling to 0 here would 
cause immediate
+      // scale-back-up on the next evaluation when the empty-pod path triggers.
+      return Math.max(1, autoscaling.minReplicas());
+    }
+
+    LOG.debug("[llap] avgBusy={}, threshold={}", String.format("%.2f", 
avgBusy),
+        autoscaling.scaleUpThreshold());
+
+    int desired = (int) Math.ceil(avgBusy / autoscaling.scaleUpThreshold());
+    return Math.max(desired, autoscaling.minReplicas());

Review Comment:
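   computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without 
guarding against 0/negative values. Because avgBusy is a double (and is 
positive at this point), a zero threshold does not raise a division-by-zero 
error: the quotient is Infinity, and (int) Math.ceil(...) saturates to 
Integer.MAX_VALUE, so LLAP would be asked for Integer.MAX_VALUE replicas 
unless a later step clamps the result. A negative threshold produces a 
negative desired value that silently collapses to autoscaling.minReplicas(). 
Validate that the threshold is positive before dividing.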



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.hive.kubernetes.operator.model.HiveCluster;
+import org.apache.hive.kubernetes.operator.model.HiveClusterSpec;
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.apache.hive.kubernetes.operator.model.status.AutoscalingStatus;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.apache.hive.kubernetes.operator.util.Labels;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main autoscaling orchestrator. Evaluates all enabled components and
+ * returns a map of component → desired replica count for those that need 
changing.
+ * <p>
+ * Maintains per-cluster, per-component state (stabilization windows).
+ */
+public class HiveClusterAutoscaler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveClusterAutoscaler.class);
+
+  /** Result of evaluating all components. */
+  public record AutoscalingEvaluation(
+      Map<String, Integer> patches,
+      Map<String, AutoscalingStatus> statuses) {}
+
+  // Shared replica store: the autoscaler writes its desired replicas here so 
that
+  // dependent resources can read them (avoids informer cache lag reverting 
patches).
+  // Key: "namespace/clusterName/component" → desired replicas
+  private static final ConcurrentHashMap<String, Integer> MANAGED_REPLICAS =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Returns the autoscaler-managed replica count for a component, or null if 
the
+   * autoscaler hasn't made a decision yet (e.g., first reconcile before 
evaluation runs).
+   */
+  public static Integer getManagedReplicas(String namespace, String 
clusterName, String component) {
+    return MANAGED_REPLICAS.get(namespace + "/" + clusterName + "/" + 
component);
+  }
+
+  /**
+   * Sets the managed replica count for a component. Used by suspend/wake logic
+   * to override what the autoscaler would normally compute.
+   */
+  public static void setManagedReplicas(String namespace, String clusterName,
+      String component, int replicas) {
+    MANAGED_REPLICAS.put(namespace + "/" + clusterName + "/" + component, 
replicas);
+  }
+
+  private record PendingScaleDown(int targetReplicas, Instant annotatedAt) {}
+
+  private final MetricsScraper scraper;
+  private final BackgroundMetricsScraper bgScraper;
+  private final MetricsCache metricsCache;
+  // Key: "namespace/clusterName/component"
+  private final ConcurrentHashMap<String, ComponentAutoscaler> autoscalers =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, String> lastScaleTimes =
+      new ConcurrentHashMap<>();
+  // Two-phase scale-down: holds deferred scale-down targets while 
pod-deletion-cost
+  // annotations propagate (2s delay before applying the actual scale patch).
+  private final ConcurrentHashMap<String, PendingScaleDown> pendingScaleDowns =
+      new ConcurrentHashMap<>();
+
+  public HiveClusterAutoscaler(MetricsScraper scraper,
+      BackgroundMetricsScraper bgScraper, MetricsCache metricsCache) {
+    this.scraper = scraper;
+    this.bgScraper = bgScraper;
+    this.metricsCache = metricsCache;
+  }
+
+  public BackgroundMetricsScraper getBackgroundScraper() {
+    return bgScraper;
+  }
+
+  /**
+   * Removes all in-memory state for a deleted HiveCluster to prevent memory 
leaks.
+   */
+  public void cleanupCluster(String namespace, String clusterName) {
+    String prefix = namespace + "/" + clusterName + "/";
+    MANAGED_REPLICAS.keySet().removeIf(k -> k.startsWith(prefix));
+    autoscalers.keySet().removeIf(k -> k.startsWith(prefix));
+    lastScaleTimes.keySet().removeIf(k -> k.startsWith(prefix));
+    pendingScaleDowns.keySet().removeIf(k -> k.startsWith(prefix));
+    LOG.info("Cleaned up autoscaler state for {}/{}", namespace, clusterName);
+  }
+
+  /**
+   * Returns true if there are pending scale-down operations waiting for
+   * annotation propagation. The reconciler should reschedule sooner (2s)
+   * when this returns true.
+   */
+  public boolean hasPendingScaleDowns() {
+    return !pendingScaleDowns.isEmpty();
+  }
+
+  /**
+   * Evaluate all autoscaling-enabled components and return patches and status 
info.
+   *
+   * @param cluster the HiveCluster resource
+   * @param client  the Kubernetes client (for reading current replica counts)
+   * @return evaluation result with patches and per-component autoscaling 
statuses
+   */
+  public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient 
client) {
+    Map<String, Integer> patches = new HashMap<>();
+    Map<String, AutoscalingStatus> statuses = new HashMap<>();
+    HiveClusterSpec spec = cluster.getSpec();
+    String namespace = cluster.getMetadata().getNamespace();
+    String clusterName = cluster.getMetadata().getName();
+
+    // HiveServer2
+    if (spec.hiveServer2().autoscaling().isEnabled()) {
+      AutoscalingSpec hs2Auto = spec.hiveServer2().autoscaling();
+      String hs2Key = namespace + "/" + clusterName + "/" + 
ConfigUtils.COMPONENT_HIVESERVER2;
+      Map<String, String> hs2Selector = Labels.selectorForComponent(cluster, 
ConfigUtils.COMPONENT_HIVESERVER2);
+      bgScraper.registerOrUpdate(namespace, clusterName,
+          ConfigUtils.COMPONENT_HIVESERVER2, hs2Selector,
+          hs2Auto.metricsPort(), hs2Auto.metricsScrapeIntervalSeconds());
+      int maxStale = hs2Auto.metricsScrapeIntervalSeconds() * 3;
+      List<PodMetrics> hs2Metrics = metricsCache.getOrEmpty(hs2Key, maxStale);
+
+      // Two-phase scale-down: check if a pending scale-down from a prior
+      // reconcile is ready to be applied (annotations have propagated).
+      PendingScaleDown pending = pendingScaleDowns.get(hs2Key);
+      if (pending != null) {
+        if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds() 
>= 2) {
+          patches.put(ConfigUtils.COMPONENT_HIVESERVER2, 
pending.targetReplicas());
+          MANAGED_REPLICAS.put(hs2Key, pending.targetReplicas());
+          lastScaleTimes.put(hs2Key, Instant.now().toString());
+          pendingScaleDowns.remove(hs2Key);
+          LOG.info("[hiveserver2] Applying deferred scale-down to {} 
replicas", pending.targetReplicas());
+        }
+        // Build status even when waiting for pending scale-down
+        evaluateComponent(cluster, client, namespace, clusterName,
+            ConfigUtils.COMPONENT_HIVESERVER2, hs2Auto,
+            spec.hiveServer2().replicas(), new HashMap<>(), statuses, 
hs2Metrics);
+      } else {
+        // Pod deletion cost only applies to Deployments (ReplicaSet 
controller).
+        // StatefulSets always scale down by highest ordinal regardless of this
+        // annotation. LLAP/TezAM graceful drain is handled by preStop hooks.
+        updateDeploymentPodDeletionCost(client, namespace, hs2Metrics, 
"hs2_open_sessions");
+
+        Map<String, Integer> hs2Patches = new HashMap<>();
+        evaluateComponent(cluster, client, namespace, clusterName,
+            ConfigUtils.COMPONENT_HIVESERVER2, hs2Auto,
+            spec.hiveServer2().replicas(), hs2Patches, statuses, hs2Metrics);
+
+        Integer hs2Patch = hs2Patches.get(ConfigUtils.COMPONENT_HIVESERVER2);
+        int currentReplicas = getCurrentReplicas(client, namespace, 
clusterName, ConfigUtils.COMPONENT_HIVESERVER2);
+        if (hs2Patch != null && hs2Patch < currentReplicas) {
+          // Scale-down: defer to allow deletion-cost annotations to propagate
+          pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch, 
Instant.now()));
+          LOG.info("[hiveserver2] Deferring scale-down to {} (waiting for 
deletion-cost propagation)",
+              hs2Patch);
+        } else if (hs2Patch != null) {
+          // Scale-up: apply immediately
+          patches.put(ConfigUtils.COMPONENT_HIVESERVER2, hs2Patch);
+          MANAGED_REPLICAS.put(hs2Key, hs2Patch);
+        }
+      }
+    }
+
+    // Metastore
+    if (spec.metastore().isEnabled() && 
spec.metastore().autoscaling().isEnabled()) {
+      AutoscalingSpec msAuto = spec.metastore().autoscaling();
+      Map<String, String> msSelector = Labels.selectorForComponent(cluster, 
ConfigUtils.COMPONENT_METASTORE);
+      bgScraper.registerOrUpdate(namespace, clusterName,
+          ConfigUtils.COMPONENT_METASTORE, msSelector,
+          msAuto.metricsPort(), msAuto.metricsScrapeIntervalSeconds());
+      String msKey = namespace + "/" + clusterName + "/" + 
ConfigUtils.COMPONENT_METASTORE;
+      List<PodMetrics> msMetrics = metricsCache.getOrEmpty(msKey, 
msAuto.metricsScrapeIntervalSeconds() * 3);
+      evaluateComponent(cluster, client, namespace, clusterName,
+          ConfigUtils.COMPONENT_METASTORE, msAuto,
+          spec.metastore().replicas(), patches, statuses, msMetrics);
+    }
+
+    // LLAP
+    if (spec.llap().isEnabled() && spec.llap().autoscaling().isEnabled()) {
+      AutoscalingSpec llapAuto = spec.llap().autoscaling();
+      Map<String, String> llapSelector = Labels.selectorForComponent(cluster, 
ConfigUtils.COMPONENT_LLAP);
+      bgScraper.registerOrUpdate(namespace, clusterName,
+          ConfigUtils.COMPONENT_LLAP, llapSelector,
+          llapAuto.metricsPort(), llapAuto.metricsScrapeIntervalSeconds());
+      String llapKey = namespace + "/" + clusterName + "/" + 
ConfigUtils.COMPONENT_LLAP;
+      List<PodMetrics> llapMetrics = metricsCache.getOrEmpty(llapKey, 
llapAuto.metricsScrapeIntervalSeconds() * 3);
+      evaluateComponent(cluster, client, namespace, clusterName,
+          ConfigUtils.COMPONENT_LLAP, llapAuto,
+          spec.llap().replicas(), patches, statuses, llapMetrics);
+    }
+
+    // TezAM
+    if (spec.tezAm().isEnabled() && spec.tezAm().autoscaling().isEnabled()) {
+      AutoscalingSpec tezAuto = spec.tezAm().autoscaling();
+      Map<String, String> tezSelector = Labels.selectorForComponent(cluster, 
ConfigUtils.COMPONENT_TEZAM);
+      bgScraper.registerOrUpdate(namespace, clusterName,
+          ConfigUtils.COMPONENT_TEZAM, tezSelector,
+          tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
+      String tezKey = namespace + "/" + clusterName + "/" + 
ConfigUtils.COMPONENT_TEZAM;
+      List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey, 
tezAuto.metricsScrapeIntervalSeconds() * 3);
+      evaluateComponent(cluster, client, namespace, clusterName,
+          ConfigUtils.COMPONENT_TEZAM, tezAuto,
+          spec.tezAm().replicas(), patches, statuses, tezMetrics);
+    }
+
+    return new AutoscalingEvaluation(patches, statuses);
+  }
+
+  /**
+   * Returns cached HS2 metrics (used by LLAP/TezAM activation gate).
+   * Non-blocking — reads from the background-scraper cache.
+   */
+  public List<PodMetrics> getHs2MetricsFromCache(HiveCluster cluster) {
+    String namespace = cluster.getMetadata().getNamespace();
+    String clusterName = cluster.getMetadata().getName();
+    String key = namespace + "/" + clusterName + "/" + 
ConfigUtils.COMPONENT_HIVESERVER2;
+    int maxStale = 
cluster.getSpec().hiveServer2().autoscaling().metricsScrapeIntervalSeconds() * 
3;
+    return metricsCache.getOrEmpty(key, maxStale);
+  }
+
+  private void evaluateComponent(HiveCluster cluster, KubernetesClient client,
+      String namespace, String clusterName, String component,
+      AutoscalingSpec autoscaling, int maxReplicas,
+      Map<String, Integer> patches, Map<String, AutoscalingStatus> statuses,
+      List<PodMetrics> metrics) {
+
+    int currentReplicas = getCurrentReplicas(client, namespace, clusterName, 
component);
+
+    String key = namespace + "/" + clusterName + "/" + component;
+
+    // For LLAP and TezAM, scaling decisions are based on HS2 metrics 
(activation gate),
+    // not their own pod metrics. Allow evaluation even with 0 own pods.
+    boolean usesHs2Activation = ConfigUtils.COMPONENT_LLAP.equals(component)
+        || ConfigUtils.COMPONENT_TEZAM.equals(component);
+
+    if (metrics.isEmpty() && !usesHs2Activation) {
+      LOG.debug("[{}] No ready pods to scrape, skipping", component);
+      MANAGED_REPLICAS.put(key, currentReplicas);
+      return;
+    }
+
+    ComponentAutoscaler autoscaler = autoscalers.computeIfAbsent(key,
+        k -> new ComponentAutoscaler(component, createStrategy(component, 
cluster)));
+
+    ComponentAutoscaler.EvaluationResult result =
+        autoscaler.evaluate(metrics, autoscaling, currentReplicas, 
maxReplicas);
+
+    // Build status
+    if (result.patchTo() != null) {
+      lastScaleTimes.put(key, Instant.now().toString());
+    }
+    AutoscalingStatus as = new AutoscalingStatus();
+    as.setCurrentMetricValue(result.rawMetricValue());
+    // Only show scaleUpThreshold for strategies that use it (TezAM is 
demand-based, no threshold)
+    if (autoscaler.usesScaleUpThreshold()) {
+      as.setScaleUpThreshold(autoscaling.scaleUpThreshold());
+    }
+    // CPU metrics (only for HS2 and HMS — LLAP/TezAM don't use CPU-based 
scaling)
+    if ((ConfigUtils.COMPONENT_HIVESERVER2.equals(component) || 
ConfigUtils.COMPONENT_METASTORE.equals(component))
+        && autoscaling.cpuScaleUpThreshold() > 0) {
+      as.setCurrentCpuPercent(result.cpuPercent());
+      as.setCpuScaleUpThreshold(autoscaling.cpuScaleUpThreshold());
+      as.setCpuProposedReplicas(result.cpuProposedReplicas());
+    }
+    as.setProposedReplicas(result.proposedReplicas());
+    as.setLastScaleTime(lastScaleTimes.get(key));
+    statuses.put(component, as);
+
+    if (result.patchTo() != null) {
+      int patchValue = result.patchTo();
+      patches.put(component, patchValue);
+      MANAGED_REPLICAS.put(key, patchValue);
+    } else {
+      // No change needed — record current replicas as the managed value
+      MANAGED_REPLICAS.put(key, currentReplicas);
+    }
+  }
+
+  private ScalingStrategy createStrategy(String component, HiveCluster 
cluster) {
+    return switch (component) {
+    case ConfigUtils.COMPONENT_HIVESERVER2 -> new HiveServer2ScalingStrategy();
+    case ConfigUtils.COMPONENT_METASTORE -> new MetastoreScalingStrategy();
+    case ConfigUtils.COMPONENT_LLAP -> new LlapScalingStrategy(this, cluster);
+    case ConfigUtils.COMPONENT_TEZAM -> new TezAmScalingStrategy(this, 
cluster);
+    default -> throw new IllegalArgumentException("Unknown component: " + 
component);
+    };
+  }
+
+  private int getCurrentReplicas(KubernetesClient client, String namespace,
+      String clusterName, String component) {
+    String workloadName = clusterName + "-" + component;
+    if (ConfigUtils.COMPONENT_LLAP.equals(component) || 
ConfigUtils.COMPONENT_TEZAM.equals(component)) {
+      var ss = client.apps().statefulSets()
+          .inNamespace(namespace).withName(workloadName).get();
+      return ss != null && ss.getSpec().getReplicas() != null ? 
ss.getSpec().getReplicas() : 0;
+    } else {
+      var deploy = client.apps().deployments()
+          .inNamespace(namespace).withName(workloadName).get();
+      return deploy != null && deploy.getSpec().getReplicas() != null
+          ? deploy.getSpec().getReplicas() : 0;
+    }
+  }
+
+  /**
+   * Patches each pod's deletion cost annotation based on its active session 
count.
+   * Kubernetes uses this during scale-down to kill idle pods first (lower 
cost = killed first).
+   * <p>
+   * Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet 
controller
+   * respects this annotation. StatefulSets ignore it and always terminate by 
ordinal.
+   */
+  private void updateDeploymentPodDeletionCost(KubernetesClient client, String 
namespace,
+      List<PodMetrics> metrics, String metricName) {
+    for (PodMetrics pm : metrics) {
+      int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue();
+      try {
+        client.pods().inNamespace(namespace).withName(pm.podName())
+            .edit(pod -> {
+              pod.getMetadata().getAnnotations()
+                  .put("controller.kubernetes.io/pod-deletion-cost", 
String.valueOf(sessions));
+              return pod;
+            });

Review Comment:
   updateDeploymentPodDeletionCost() assumes the Pod has a non-null annotations 
map. Pods frequently have no annotations, which would cause a 
NullPointerException inside the edit() lambda and prevent deletion-cost tuning 
from working.



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetastoreScalingStrategy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaling strategy for Hive Metastore.
+ * HMS uses HTTP transport — connections are per-request (stateless), so
+ * open_connections is always ~0. Instead we compute API request rate:
+ * rate = (sum(api_*_total) - previous_sum) / elapsed_seconds.
+ * desired = ceil(rate / scaleUpThreshold)
+ */
+public class MetastoreScalingStrategy implements ScalingStrategy {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreScalingStrategy.class);
+  private static final String API_COUNTER_PREFIX = "api_";
+  private static final String API_COUNTER_SUFFIX = "_total";
+
+  // Previous scrape state for rate computation
+  private final ConcurrentHashMap<String, Double> previousCounters = new 
ConcurrentHashMap<>();
+  private long previousTimestampMs = 0;
+  private int lastMetric;
+
+  @Override
+  public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+      AutoscalingSpec autoscaling, int maxReplicas) {
+
+    // Sum all api_*_total counters across all pods
+    double currentTotal = 0;
+    for (PodMetrics pm : podMetrics) {
+      for (Map.Entry<String, Double> entry : pm.metrics().entrySet()) {
+        String name = entry.getKey();
+        if (name.startsWith(API_COUNTER_PREFIX) && 
name.endsWith(API_COUNTER_SUFFIX)) {
+          currentTotal += entry.getValue();
+        }
+      }
+    }
+
+    long now = System.currentTimeMillis();
+    double rate = 0;
+
+    if (previousTimestampMs > 0) {
+      double elapsedSeconds = (now - previousTimestampMs) / 1000.0;
+      if (elapsedSeconds > 0) {
+        double previousTotal = previousCounters.values().stream()
+            .mapToDouble(Double::doubleValue).sum();
+        double delta = currentTotal - previousTotal;
+        if (delta < 0) {
+          // Counter reset (pod restart) — skip this sample
+          delta = 0;
+        }
+        rate = delta / elapsedSeconds;
+      }
+    }
+
+    // Store current state for next evaluation
+    previousCounters.clear();
+    previousCounters.put("_total", currentTotal);
+    previousTimestampMs = now;
+
+    lastMetric = (int) Math.round(rate);
+
+    if (rate <= 0) {
+      return autoscaling.minReplicas();
+    }
+
+    LOG.debug("[metastore] API request rate: {}/s, threshold: {}",
+        String.format("%.2f", rate), autoscaling.scaleUpThreshold());
+
+    int desired = (int) Math.ceil(rate / autoscaling.scaleUpThreshold());
+    return Math.max(desired, autoscaling.minReplicas());

Review Comment:
   computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without 
guarding against 0/negative values. A misconfigured threshold would cause a 
division-by-zero and break the autoscaling loop for Metastore.



##########
packaging/src/kubernetes/pom.xml:
##########
@@ -65,9 +65,14 @@
       <version>${fabric8.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>2.0.16</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
+      <artifactId>log4j-slf4j2-impl</artifactId>
       <version>${log4j2.version}</version>

Review Comment:
   This module hard-codes slf4j-api 2.0.16 while the rest of the build is 
managed via the top-level slf4j.version property (currently 1.7.x). Having a 
one-off version here makes dependency convergence harder and can lead to mixed 
SLF4J major versions across modules/artifacts; consider aligning via 
dependencyManagement (or introducing a dedicated property for the operator) so 
the upgrade is intentional and consistent.



##########
packaging/src/kubernetes/README.md:
##########
@@ -505,19 +505,602 @@ kubectl get hiveclusters
 kubectl describe hivecluster hive
 ```
 
+---
+
+## Autoscaling
+
+The operator supports metric-based autoscaling for all four Hive components 
using
+an **operator-driven control loop** that scrapes JMX Exporter metrics directly 
from
+pods. No Prometheus server or external autoscaling tools are needed. 
Autoscaling is
+opt-in per component and designed for **zero query failures** during 
scale-down.
+
+### Prerequisites
+
+- No external dependencies — the operator handles all scaling decisions 
internally
+
+### How It Works
+
+When `autoscaling.enabled: true` is set for a component, the operator:
+1. Attaches the JMX Exporter javaagent (port 9404) to each pod
+2. Polls `/metrics` on each pod at `metricsScrapeIntervalSeconds` intervals
+3. Computes desired replicas using component-specific formulas
+4. Applies HPA-like stabilization windows (scale-up/scale-down)
+5. Patches the workload `spec.replicas` directly
+
+### Graceful Scale-Down Architecture
+
+```
+                        Scale Down Flow                                
+ 1. Operator reduces desired replicas (metric below threshold,      
+    stabilization window elapsed)                  
+ 2. PodDisruptionBudget ensures minAvailable=1 (at least one pod    
+    always running)                                                 
+ 3. Kubernetes begins terminating the selected pod
+ 4. preStop hook runs (before SIGTERM is delivered to the container):
+    - HS2: deregisters from ZK, drains open sessions, kills JVM    
+    - HMS: kills JVM (stateless HTTP — no drain needed)             
+    - LLAP: waits until all executors become idle, kills JVM        
+    - TezAM: no drain (DAGAppMaster does not expose JMX metrics)            
+ 5. terminationGracePeriodSeconds = gracePeriodSeconds (safety cap) 
+ 6. Pod terminates immediately once drain completes (does NOT wait  
+    the full grace period — it's only the upper safety bound)
+```
+
+> **Note:** Shell entrypoints (PID 1) in containers don't forward SIGTERM to 
child
+> processes. The preStop hook explicitly sends SIGTERM to the Hive/Tez Java 
process
+> after drain completes, ensuring prompt shutdown without waiting for the 
grace period
+> to expire.
+
+### Scaling Timers
+
+The autoscaling system uses three independent timing controls:
+
+| Timer | Config Field | Default | Purpose |
+|-------|-------------|---------|---------|
+| **Metrics scrape interval** | `metricsScrapeIntervalSeconds` | `10` | How 
often the operator scrapes JMX Exporter `/metrics` on each pod. This is the 
**biggest bottleneck** for autoscaling reaction time. |
+| **Scale-up stabilization** | `scaleUpStabilizationSeconds` | `60` | Window: 
picks the highest recommendation within this period before scaling up. Prevents 
flapping when metrics oscillate. Set to `0` for LLAP and TezAM (reactive 
dependents). |
+| **Scale-down stabilization** | `scaleDownStabilizationSeconds` | `300-900` | 
Window: picks the most conservative (highest) recommendation within this period 
before scaling down. Also acts as the cooldown between consecutive scale-downs 
— no separate cooldown needed. |
+
+**How they interact:**
+- Load spike detected → operator scrapes metrics within 
`metricsScrapeIntervalSeconds` → waits `scaleUpStabilizationSeconds` then 
scales up
+- Load drops → operator waits `scaleDownStabilizationSeconds` (stabilization 
window must confirm low demand consistently) then scales down
+
+**Tuning reaction time:** With defaults (`metricsScrapeIntervalSeconds: 10`, 
`scaleUpStabilizationSeconds: 0` for LLAP/TezAM), scale-up latency is ~10-20s 
(one scrape cycle). For HS2 with `scaleUpStabilizationSeconds: 60`, expect ~70s.
+
+### Per-Component Scaling Logic
+
+| Component | Scale-Up Formula | Scale-Down | JMX Metric |
+|-----------|-----------------|------------|------------|
+| **HiveServer2** | `max(ceil(sessions / threshold), cpu_desired)` | Sessions 
drop to 0 AND CPU below threshold → scale to minReplicas | `hs2_open_sessions`, 
`jvm_process_cpu_load` |
+| **Metastore** | `max(ceil(api_rate / threshold), cpu_desired)` | Rate drops 
to 0 AND CPU below threshold → scale to minReplicas | `api_*_total`, 
`jvm_process_cpu_load` |
+| **LLAP** | `ceil(avg(queued + configured - available) / scaleUpThreshold)` | 
All executors idle + no HS2 sessions | `hadoop_llapdaemon_executor*` |
+| **Tez AM** | `max(sum(hs2_open_sessions), count(HS2_pods) * 
sessions_per_queue)` | All HS2 sessions closed | `hs2_open_sessions` (from HS2 
pods) |
+
+**TezAM Scaling Model:** TezAM uses demand-driven scaling with two formulas 
(max wins):
+1. **Session demand** — `sum(hs2_open_sessions)`: scales to match the total 
number of
+   concurrent sessions across all HS2 pods (each session needs its own 
exclusive TezAM).
+2. **Pre-warm** — `count(HS2 pods with sessions) × 
hive.server2.tez.sessions.per.default.queue` (default 1):
+   ensures every active HS2 pod has enough TezAM sessions pre-claimed from 
ZooKeeper.
+
+The operator takes the maximum across both formulas. This ensures TezAM 
capacity
+is always sufficient for both current demand and eager session pre-warming.
+TezAM scaling is purely demand-driven from HS2 metrics.
+
+### Scale-to-Zero Architecture
+
+When `minReplicas: 0` is configured (LLAP, TezAM), the cluster scales those
+components down to zero pods when HS2 has no active sessions. HS2 itself always
+maintains at least 1 replica (`minReplicas >= 1`) so it is always available to
+accept connections.
+
+```
+                   Scale-to-Zero (Idle Detection)                    
+
+  1. HS2 reports hs2_open_sessions = 0 for scaleDownStabilization     
+     → operator scales HS2 to minReplicas (>= 1)                    
+                                                                     
+  2. Operator sees hs2_open_sessions = 0 on next LLAP/TezAM eval     
+     → activation gate fails                                         
+     → scale LLAP and TezAM to 0 (if minReplicas=0)                 
+                                                                     
+  3. HMS stays at minReplicas=1 (always available)                   
+
+```
+
+```
+                   Wake-from-Zero (LLAP/TezAM)                       
+
+  1. Beeline connects to HS2 (always running, at least 1 pod)        
+                                                                     
+  2. HS2 reports hs2_open_sessions > 0 via JMX Exporter              
+                                                                     
+  3. Operator detects HS2 sessions on next scrape cycle:             
+     - LLAP activation gate passes → scales up from 0                
+     - TezAM activation gate passes → scales up from 0              
+                                                                     
+  4. Query executes once LLAP/TezAM pods are ready                   
+
+```
+
+**Session protection:** The HS2 Service uses `sessionAffinity: ClientIP` to 
ensure
+beeline clients always reach the same pod. The preStop hook deregisters the 
pod from
+ZooKeeper (preventing new sessions) and waits for `hs2_open_sessions` to drain 
to 0
+before terminating. The `gracePeriodSeconds` (default 3600s) is a safety cap — 
the pod
+terminates immediately once sessions drain, not after the full grace period.
+
+**Component-specific behavior:**
+
+| Component | minReplicas | Scale-to-Zero Trigger | Wake Trigger |
+|-----------|-------------|----------------------|--------------|
+| **HS2** | 1 | N/A (always running) | N/A |
+| **HMS** | 1 | Never (always running) | N/A |
+| **LLAP** | 0 | No HS2 sessions (activation gate fails) | HS2 has open 
sessions (next scrape) |
+| **TezAM** | 0 | No HS2 sessions (activation gate fails) | HS2 has open 
sessions (next scrape) |
+
+### Auto-Suspend (Full Cluster Hibernation)
+
+Auto-suspend goes beyond scale-to-zero — it fully hibernates the **entire** 
cluster
+(including HS2 and HMS) to 0 replicas after a configurable idle timeout. This 
is
+useful for dev/test clusters that should not consume resources when nobody is 
using
+them.
+
+**Prerequisites:** Auto-suspend requires autoscaling to be enabled on ALL 
active
+components (HS2, LLAP if enabled, TezAM if enabled, and HMS if 
`includeMetastore=true`).
+The operator will not auto-suspend unless it can confirm all components are at 
their
+minimum state.
+
+**Idle criteria (all must hold simultaneously for `idleTimeoutMinutes`):**
+
+| Component | Idle Condition |
+|-----------|---------------|
+| **HS2** | At `minReplicas` with 0 open sessions |
+| **HMS** | At `minReplicas` (only checked if `includeMetastore=true`) |
+| **LLAP** | At `minReplicas` (default 0) |
+| **TezAM** | At `minReplicas` (default 0) |
+
+**Important:** HS2 can **only** scale to 0 replicas via auto-suspend. Normal
+autoscaling always maintains `minReplicas >= 1` for HS2. Auto-suspend is the
+only mechanism that overrides this to achieve full hibernation.
+
+```
+                    Auto-Suspend Flow
+
+  1. Autoscaling scales all components to their minReplicas
+     (HS2≥1, HMS≥1, LLAP/TezAM to configured min)
+
+  2. Operator detects idle state:
+     - HS2 has 0 open sessions
+     - HMS at minReplicas (if includeMetastore=true)
+     - LLAP/TezAM at minReplicas
+
+  3. Idle timer starts (status: clusterPhase=Idle, idleSince=<now>)
+
+  4. After idleTimeoutMinutes (default 15):
+     - ALL components scaled to 0 (HMS excluded if includeMetastore=false)
+     - spec.suspend set to true (cluster stays suspended until user wakes it)
+     - Status: clusterPhase=Suspended, suspendedSince=<now>
+
+  5. To wake: kubectl patch hivecluster hive --type=merge -p 
'{"spec":{"suspend":false}}'
+     All components restored to minReplicas
+     (HS2/HMS ≥1, LLAP/TezAM ≥1 for immediate usability)
+
+```
+
+**Configuration:**
+
+```yaml
+cluster:
+  autoSuspend:
+    enabled: true
+    idleTimeoutMinutes: 15    # minutes idle before full hibernation
+    includeMetastore: true    # set false to keep HMS running during suspend
+```
+
+**Manual Suspend/Wake Commands:**
+
+```bash
+# Suspend immediately (bypasses idle timer)
+kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":true}}'
+
+# Wake cluster (restores to minReplicas)
+kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":false}}'
+```
+
+Manual suspend works regardless of whether `autoSuspend.enabled` is true — it
+immediately scales all components to 0 without waiting for the idle timeout.
+When `includeMetastore: false`, HMS stays running even during manual suspend.
+
+**Observing cluster state:**
+
+```bash
+# Quick view — printer columns show phase and idle time
+kubectl get hivecluster
+```
+```
+NAME   PHASE   IDLE (MIN)   AGE
+hive   Idle    12           2h
+```
+
+```bash
+# After suspend triggers
+kubectl get hivecluster
+```
+```
+NAME   PHASE       IDLE (MIN)   AGE
+hive   Suspended                2h
+```
+
+```bash
+# Full status (kubectl get hivecluster hive -o yaml)
+```
+```yaml
+status:
+  clusterPhase: Suspended
+  idleSince: "2026-06-08T10:00:00Z"
+  idleForMinutes: 15
+  suspendedSince: "2026-06-08T10:15:00Z"
+  conditions:
+    - type: Suspended
+      status: "True"
+      reason: AutoSuspend        # or ManualSuspend
+      message: "Cluster suspended after idle timeout"
+      lastTransitionTime: "2026-06-08T10:15:00Z"
+```
+
+When the cluster is running normally:
+```
+NAME   PHASE     IDLE (MIN)   AGE
+hive   Running                2h
+```
+
+**Full example (autoscaling + auto-suspend):**
+
+```yaml
+cluster:
+  autoSuspend:
+    enabled: true
+    idleTimeoutMinutes: 15
+    includeMetastore: false   # keep HMS running during suspend
+
+  hiveServer2:
+    replicas: 10
+    autoscaling:
+      enabled: true
+      minReplicas: 1
+
+  metastore:
+    replicas: 6
+    autoscaling:
+      enabled: true
+      minReplicas: 1
+
+  llap:
+    replicas: 8
+    autoscaling:
+      enabled: true
+      minReplicas: 0        # scales to 0 via normal autoscaling when HS2 idle
+
+  tezAm:
+    replicas: 10
+    autoscaling:
+      enabled: true
+      minReplicas: 0        # scales to 0 via normal autoscaling when HS2 idle
+```
+
+With this configuration, the cluster lifecycle is:
+1. Under load → all components scaled up by autoscaler
+2. Load drops → autoscaler scales to minReplicas (HS2=1, HMS=1, LLAP=0, 
TezAM=0)
+3. HS2 idle (0 sessions) for 15 minutes → auto-suspend kicks in → HS2, LLAP, 
TezAM to 0 (HMS stays at minReplicas)
+4. `kubectl patch hivecluster hive --type=merge -p 
'{"spec":{"suspend":false}}'` → wake → HS2=1, LLAP=1, TezAM=1
+5. User connects → autoscaler detects sessions → scales up as needed
+
+### CPU-Based Scaling (HS2 and HMS)
+
+In addition to the primary metrics (sessions for HS2, API request rate for 
HMS),
+the operator supports a secondary **CPU-based scaling signal** for HiveServer2 
and
+Metastore. The final desired replica count is:
+
+```
+final_desired = max(metric_desired, cpu_desired)
+```
+
+Either signal can trigger scale-up; neither can force scale-down below what the
+other recommends. CPU-based scaling uses the same stabilization windows as 
metric-based
+scaling (no separate CPU stabilization).
+
+**How it works:**
+
+1. The operator scrapes `ProcessCpuLoad` from `java.lang:type=OperatingSystem` 
via JMX
+   Exporter (exported as `jvm_process_cpu_load`, a 0.0–1.0 fraction)
+2. Averages across all pods, converts to percentage (0–100)
+3. If avg CPU >= `cpuScaleUpThreshold`: scales up proportionally
+   (`ceil(avgCpu * currentReplicas / cpuScaleUpThreshold)`)
+4. If avg CPU < `cpuScaleDownThreshold`: scales down
+   (`ceil(avgCpu * currentReplicas / cpuScaleUpThreshold)`, floored at 
`minReplicas`)
+5. Between thresholds: holds current replica count
+
+**Configuration:**
+
+| Value | Default | Description |
+|-------|---------|-------------|
+| `cluster.<component>.autoscaling.cpuScaleUpThreshold` | `90` | CPU 
percentage (0-100) that triggers scale-up. Set to `0` to disable CPU-based 
scaling. |
+| `cluster.<component>.autoscaling.cpuScaleDownThreshold` | `30` | CPU 
percentage (0-100) below which scale-down is considered. |
+
+**Example:**
+
+```yaml
+cluster:
+  hiveServer2:
+    replicas: 10
+    resources:
+      limitsCpu: "2"        # Recommended: set CPU limits so ProcessCpuLoad is 
relative to pod allocation
+    autoscaling:
+      enabled: true
+      cpuScaleUpThreshold: 90
+      cpuScaleDownThreshold: 30
+
+  metastore:
+    replicas: 6
+    resources:
+      limitsCpu: "2"
+    autoscaling:
+      enabled: true
+      cpuScaleUpThreshold: 90
+      cpuScaleDownThreshold: 30
+```
+
+**Important: CPU limits and metric accuracy**
+
+`ProcessCpuLoad` reports CPU usage as a fraction of **available processors**. 
Without
+CPU limits, the JVM sees all node cores (e.g., 8 cores), so even heavy 
single-pod
+load only shows ~12.5%. With `limitsCpu: "2"`, the JVM sees 2 processors and 
the
+metric becomes "% of allocated CPU" — making thresholds meaningful.
+
+| Pod CPU Limit | JVM sees | 90% threshold means |
+|---------------|----------|---------------------|
+| None (no limit) | All node cores (e.g., 8) | Using 7.2 of 8 cores — very 
hard to reach |
+| `2` | 2 cores | Using 1.8 of 2 allocated cores |
+| `4` | 4 cores | Using 3.6 of 4 allocated cores |
+
+**Recommendation:** Always set `resources.limitsCpu` when using CPU-based 
autoscaling.
+
+**Status output:**
+
+The operator reports CPU metrics in the HiveCluster status:
+
+```yaml
+status:
+  hiveServer2:
+    autoscaling:
+      currentMetricValue: 5           # total sessions
+      scaleUpThreshold: 100
+      currentCpuPercent: 72.45        # avg ProcessCpuLoad * 100
+      cpuScaleUpThreshold: 90
+      cpuProposedReplicas: 2          # what CPU alone would recommend
+      proposedReplicas: 2
+      lastScaleTime: "2026-05-31T04:23:07Z"
+```
+
+**Applicability:** CPU-based scaling only applies to HS2 and HMS. LLAP and 
TezAM
+do not use CPU as a scaling signal (LLAP scales on busy executor slots which 
already
+correlates with CPU; TezAM is demand-based from HS2 session count).
+
+---
+
+### Enabling Autoscaling
+
+**CLI (with Ozone storage backend):**
+
+Each component has sensible per-component defaults (see [Configuration 
Reference](#configuration-reference)).
+Only `enabled=true` is needed to turn on autoscaling:
+
+```bash
+helm install hive ./helm/hive-operator \
+  --set cluster.database.type=postgres \
+  --set 
cluster.database.url="jdbc:postgresql://postgres-postgresql:5432/metastore" \
+  --set cluster.database.driver="org.postgresql.Driver" \
+  --set cluster.database.username=hive \
+  --set cluster.database.passwordSecretRef.name=hive-db-secret \
+  --set cluster.database.passwordSecretRef.key=password \
+  --set cluster.database.driverJarUrl="https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar" \
+  --set cluster.zookeeper.quorum="zookeeper:2181" \
+  --set cluster.storage.coreSiteOverrides."fs\.defaultFS"="s3a://hive" \
+  --set cluster.storage.coreSiteOverrides."fs\.s3a\.endpoint"="http://ozone-s3g-rest:9878" \
+  --set-string 
cluster.storage.coreSiteOverrides."fs\.s3a\.path\.style\.access"=true \
+  --set 'cluster.storage.envVars[0].name=HADOOP_OPTIONAL_TOOLS' \
+  --set 'cluster.storage.envVars[0].value=hadoop-aws' \
+  --set 'cluster.storage.envVars[1].name=AWS_ACCESS_KEY_ID' \
+  --set 'cluster.storage.envVars[1].value=ozone' \
+  --set 'cluster.storage.envVars[2].name=AWS_SECRET_ACCESS_KEY' \
+  --set 'cluster.storage.envVars[2].value=ozone' \
+  --set cluster.hiveServer2.autoscaling.enabled=true \
+  --set cluster.metastore.autoscaling.enabled=true \
+  --set cluster.llap.autoscaling.enabled=true \
+  --set cluster.tezAm.autoscaling.enabled=true
+```
+
+**Values file (for customizing beyond defaults):**
+
+```yaml
+# values-autoscaling.yaml — only override what you need
+cluster:
+  database:
+    type: postgres
+    url: "jdbc:postgresql://postgres-postgresql:5432/metastore"
+    driver: "org.postgresql.Driver"
+    username: hive
+    passwordSecretRef:
+      name: hive-db-secret
+      key: password
+    driverJarUrl: "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar"
+
+  zookeeper:
+    quorum: "zookeeper:2181"
+
+  storage:
+    coreSiteOverrides:
+      fs.defaultFS: "s3a://hive"
+      fs.s3a.endpoint: "http://ozone-s3g-rest:9878"
+      fs.s3a.path.style.access: "true"
+    envVars:
+      - name: HADOOP_OPTIONAL_TOOLS
+        value: "hadoop-aws"
+      - name: AWS_ACCESS_KEY_ID
+        value: "ozone"
+      - name: AWS_SECRET_ACCESS_KEY
+        value: "ozone"
+
+  hiveServer2:
+    replicas: 10              # Acts as maxReplicas when autoscaling is enabled
+    autoscaling:
+      enabled: true
+      # minReplicas: 1        # default — always keep at least 1 HS2 running
+      # scaleUpThreshold: 80  # default — avg open sessions per pod triggering 
scale-up
+      # scaleUpStabilizationSeconds: 60   # default — scale-up window
+      # scaleDownStabilizationSeconds: 600 # default — scale-down window (also 
acts as cooldown)
+      # metricsScrapeIntervalSeconds: 10  # default — operator scrape interval 
(lower = faster reaction)
+
+  metastore:
+    replicas: 6               # Acts as maxReplicas when autoscaling is enabled
+    autoscaling:
+      enabled: true
+      # minReplicas: 1        # default — always keep at least 1 metastore 
running
+      # scaleUpThreshold: 75  # default — API request rate (req/s) triggering 
scale-up
+      # scaleUpStabilizationSeconds: 60   # default — scale-up window
+      # scaleDownStabilizationSeconds: 300 # default — scale-down window (also 
acts as cooldown)
+      # gracePeriodSeconds: 60 # default — fast drain (HMS is stateless)
+      # metricsScrapeIntervalSeconds: 10  # default — operator scrape interval
+
+  llap:
+    replicas: 8               # Acts as maxReplicas when autoscaling is enabled
+    autoscaling:
+      enabled: true
+      # minReplicas: 0        # default — scale to zero when no HS2 sessions
+      # scaleUpThreshold: 1   # default — total busy slots (queued+running) 
triggering scale-up
+      # scaleUpStabilizationSeconds: 60   # default — scale-up window
+      # scaleDownStabilizationSeconds: 900 # default — scale-down window (long 
— scaling down destroys cache)
+      # gracePeriodSeconds: 600 # default — 10 min drain for in-flight 
fragments
+      # metricsScrapeIntervalSeconds: 10  # default — operator scrape interval 
(lower = faster reaction)
+
+  tezAm:
+    replicas: 10              # Acts as maxReplicas when autoscaling is enabled
+    autoscaling:
+      enabled: true
+      # minReplicas: 0        # default — scale to zero when no HS2 sessions
+      # scaleUpThreshold: 1   # default — threshold for demand metric (1 = 
match HS2 pod count)
+      # scaleUpStabilizationSeconds: 60   # default — HPA scale-up window
+      # scaleDownStabilizationSeconds: 300 # default — HPA scale-down window
+      # gracePeriodSeconds: 120 # default — 2 min drain for DAG completion
+      # metricsScrapeIntervalSeconds: 10  # default — operator scrape interval 
(lower = faster reaction)
+```
+
+```bash
+helm install hive ./helm/hive-operator -f values-autoscaling.yaml
+```
+
+When autoscaling is enabled, the operator automatically:
+- Deploys the JMX Exporter javaagent (port 9404, `/metrics`)
+- Enables `hive.server2.metrics.enabled` / `metastore.metrics.enabled` (JMX 
reporter)
+- Attaches JMX Exporter javaagent (port 9404, `/metrics`) to each pod
+- Creates PodDisruptionBudgets (minAvailable: 1)
+- Configures preStop lifecycle hooks for graceful drain
+- Sets `terminationGracePeriodSeconds` to the configured grace period
+- LLAP/TezAM use HS2 metrics as activation gate (only scale when HS2 has 
sessions)
+
+**JMX Metrics Scraped by Operator (per component):**
+
+| Component | Key Metrics | Purpose |
+|-----------|---------|---------|
+| **HiveServer2** | `hs2_open_sessions`, `jvm_process_cpu_load` | Session 
count for primary scaling + CPU for secondary scaling signal |
+| **Metastore** | `api_*_total`, `jvm_process_cpu_load` | API call counters 
(operator computes request rate from deltas) + CPU for secondary scaling signal 
|
+| **LLAP** | `hadoop_llapdaemon_executornumqueuedrequests`, 
`hadoop_llapdaemon_executornumexecutorsconfigured`, 
`hadoop_llapdaemon_executornumexecutorsavailable` | Total busy slots = queued + 
configured - available |
+| **Tez AM** | N/A (scales on HS2 metrics) | TezAM scaling is demand-driven 
from `hs2_open_sessions` — no TezAM-specific metrics needed |
+
+### Enabling Autoscaling — Example
+
+To enable autoscaling for HS2 and Metastore:
+
+```yaml
+cluster:
+  hiveServer2:
+    replicas: 4                 # max replicas ceiling
+    autoscaling:
+      enabled: true
+      scaleUpThreshold: 1       # scale up when total sessions > 1
+      minReplicas: 1            # always keep at least 1 HS2 pod running
+
+  metastore:
+    replicas: 3                 # max replicas ceiling
+    autoscaling:
+      enabled: true
+      minReplicas: 1            # always keep at least 1 running
+      scaleUpThreshold: 75      # API requests/sec threshold
+```
+
+> **Note:** LLAP scales on total busy slots (queued + running executors).
+> TezAM scales on demand — the larger of the total open HS2 sessions and the
+> number of active HS2 pods multiplied by
+> `hive.server2.tez.sessions.per.default.queue` (default 1).
+
+### Helm Values Reference (Autoscaling)
+
+| Value | Default | Description |
+|-------|---------|-------------|
+| `cluster.<component>.replicas` | `1-2` | Static replica count, or max 
replicas ceiling when autoscaling is enabled |
+| `cluster.<component>.autoscaling.enabled` | `false` | Enable operator-driven 
autoscaling |
+| `cluster.<component>.autoscaling.minReplicas` | `1` (HS2/HMS), `0` 
(LLAP/TezAM) | Minimum replica count. Set to 0 for scale-to-zero (LLAP, TezAM 
only; HS2 minimum is 1) |
+| `cluster.<component>.autoscaling.scaleUpThreshold` | varies | Metric 
threshold triggering scale-up |
+| `cluster.<component>.autoscaling.scaleUpStabilizationSeconds` | `60` | 
Stabilization window for scale-up (picks highest recommendation in window) |
+| `cluster.<component>.autoscaling.scaleDownStabilizationSeconds` | `300-900` 
| Stabilization window for scale-down (picks most conservative recommendation 
in window). Also acts as cooldown between consecutive scale-downs. |
+| `cluster.<component>.autoscaling.gracePeriodSeconds` | `3600` (HS2), `60` 
(HMS), `600` (LLAP), `120` (TezAM) | Safety cap: max drain time before forced 
termination. Pod exits immediately once drain completes. |
+| `cluster.<component>.autoscaling.metricsScrapeIntervalSeconds` | `10` | How 
often the operator scrapes JMX metrics from pods. Lower = faster reaction. |
+| `cluster.<component>.autoscaling.cpuScaleUpThreshold` | `90` | CPU 
percentage (0-100) triggering scale-up. Only HS2/HMS. Set to 0 to disable. |
+| `cluster.<component>.autoscaling.cpuScaleDownThreshold` | `30` | CPU 
percentage (0-100) below which scale-down is considered. Only HS2/HMS. |
+
+---
+
 ## Connect to HiveServer2
 
+HiveServer2 runs in **HTTP transport mode** by default (recommended for 
Kubernetes
+environments as it works well with load balancers, ingress controllers, and 
proxies).
+
+### Standard Connection (minReplicas >= 1)
+
+When HS2 always has at least one pod running, connect directly to the service:
+
 ```bash
-kubectl exec -it deployment/hive-hiveserver2 -- beeline -u 
"jdbc:hive2://hive-hiveserver2:10000/"
+kubectl exec -it deployment/hive-hiveserver2 -- beeline -u 
"jdbc:hive2://hive-hiveserver2:10001/;transportMode=http;httpPath=cliservice"
 ```

Review Comment:
   PR metadata says there is no user-facing change, but this change set 
switches the documented/default HiveServer2 connection to HTTP transport on 
port 10001 (and also configures transport-mode defaults in the operator). That 
affects how users connect (JDBC URL/ports) and should be called out as 
user-facing behavior in the PR description/release notes.



##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/BackgroundMetricsScraper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs periodic metrics scraping in the background so that the JOSDK reconcile
+ * thread is never blocked by HTTP calls to pod JMX exporters.
+ * <p>
+ * Each component gets its own scheduled task that writes results to a shared
+ * {@link MetricsCache}. The reconciler reads from that cache (non-blocking).
+ */
+public class BackgroundMetricsScraper {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackgroundMetricsScraper.class);
+
+  private final ScheduledExecutorService scheduler;
+  private final MetricsScraper scraper;
+  private final MetricsCache cache;
+  // Key: "namespace/clusterName/component" → active scrape task
+  private final ConcurrentHashMap<String, ScheduledFuture<?>> activeTasks =
+      new ConcurrentHashMap<>();
+  // Tracks registered intervals to detect spec changes
+  private final ConcurrentHashMap<String, Integer> registeredIntervals =
+      new ConcurrentHashMap<>();
+
+  public BackgroundMetricsScraper(MetricsScraper scraper, MetricsCache cache) {
+    this.scraper = scraper;
+    this.cache = cache;
+    this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "hive-metrics-scraper");
+      t.setDaemon(true);
+      return t;
+    });

Review Comment:
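   BackgroundMetricsScraper uses a single-thread ScheduledExecutorService. With 
autoscaling enabled for multiple components, one slow scrape (up to the per-pod 
timeout) can delay all other components and effectively violate the configured 
metricsScrapeIntervalSeconds for them. Consider a small scheduled thread pool 
(for example, Executors.newScheduledThreadPool with a few threads) so that a 
slow scrape of one component cannot stall the scrape tasks of the others.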



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to