Copilot commented on code in PR #6507:
URL: https://github.com/apache/hive/pull/6507#discussion_r3413889774
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java:
##########
@@ -422,4 +633,233 @@ protected static Probe buildTcpProbe(int port, ProbeSpec
spec, int defaultInitia
return builder.build();
}
+ /**
+ * Applies the autoscaling lifecycle to a workload's pod template: sets a
preStop
+ * exec lifecycle hook, terminationGracePeriodSeconds, and Prometheus scrape
annotations.
+ *
+ * @param podSpec the pod spec of the workload (Deployment or
StatefulSet)
+ * @param podMetadata the pod template metadata (for annotations)
+ * @param preStopScript the shell script to run in the preStop hook
+ * @param gracePeriodSeconds termination grace period
+ */
+ protected static void applyAutoscalingLifecycle(
+ io.fabric8.kubernetes.api.model.PodSpec podSpec,
+ io.fabric8.kubernetes.api.model.ObjectMeta podMetadata,
+ String preStopScript, int gracePeriodSeconds,
+ int metricsScrapeIntervalSeconds) {
+ io.fabric8.kubernetes.api.model.Lifecycle lifecycle =
+ new io.fabric8.kubernetes.api.model.LifecycleBuilder()
+ .withNewPreStop()
+ .withNewExec()
+ .withCommand("/bin/bash", "-c", preStopScript)
+ .endExec()
+ .endPreStop()
+ .build();
+ podSpec.getContainers().get(0).setLifecycle(lifecycle);
+ podSpec.setTerminationGracePeriodSeconds((long) gracePeriodSeconds);
+ applyPrometheusScrapeAnnotations(podMetadata,
metricsScrapeIntervalSeconds);
+ }
+
+ /**
+ * Adds Prometheus scrape annotations to a pod template so that
+ * the JMX Exporter metrics endpoint is discovered by Prometheus.
+ */
+ private static void applyPrometheusScrapeAnnotations(
+ io.fabric8.kubernetes.api.model.ObjectMeta podMetadata,
+ int scrapeIntervalSeconds) {
+ podMetadata.getAnnotations().put("prometheus.io/scrape", "true");
+ podMetadata.getAnnotations().put("prometheus.io/port",
+ String.valueOf(ConfigUtils.PROMETHEUS_JMX_EXPORTER_PORT));
+ podMetadata.getAnnotations().put("prometheus.io/path", "/metrics");
+ podMetadata.getAnnotations().put("prometheus.io/scrape-interval",
+ scrapeIntervalSeconds + "s");
+ }
Review Comment:
applyPrometheusScrapeAnnotations() assumes podMetadata.getAnnotations() is
non-null; if the pod template has no annotations yet, this will throw a
NullPointerException and break reconciliation when autoscaling lifecycle is
applied. Initialize the annotations map before writing to it.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java:
##########
@@ -125,6 +134,208 @@ public Matcher.Result<R> match(R actualResource, R
desired,
return super.match(actualResource, desired, primary, context);
}
+ @Override
+ protected R handleCreate(R desired, P primary, Context<P> context) {
+ try {
+ return super.handleCreate(desired, primary, context);
+ } catch (KubernetesClientException e) {
+ if (e.getCode() == 409) {
+ LOG.info("Resource {} already exists (informer lag), "
+ + "will reconcile on next event",
+ desired.getMetadata().getName());
+ return desired;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Resolves the replica count to set in the desired workload spec.
+ * <p>
+ * Always returns an explicit value — never null. Returning null would cause
+ * JOSDK/SSA to omit spec.replicas, and Kubernetes would default it to 1.
+ * <p>
+ * When autoscaling is enabled:
+ * - On CREATE: returns initialReplicas (minReplicas for the component)
+ * - On UPDATE: returns the autoscaler's managed value, or falls back to
+ * the current actual replicas from the informer cache.
+ * <p>
+ * When autoscaling is disabled: returns staticReplicas (the spec value).
+ */
+ protected Integer resolveReplicaCount(P primary, Context<P> context,
+ AutoscalingSpec autoscaling, int staticReplicas, int initialReplicas) {
+ // Suspended cluster → 0 replicas (dependent resources natively respect
suspend).
+ // Exception: HMS stays running if includeMetastore=false in autoSuspend
config.
+ if (primary instanceof HiveCluster hc && hc.getSpec().suspend()) {
+ boolean isMetastore =
ConfigUtils.COMPONENT_METASTORE.equals(getComponentName());
+ if (!isMetastore || hc.getSpec().autoSuspend().includeMetastore()) {
+ return 0;
+ }
+ }
+ if (autoscaling == null || !autoscaling.isEnabled()) {
+ return staticReplicas;
+ }
+ Optional<R> existing = getSecondaryResource(primary, context);
+ if (existing.isPresent()) {
+ // Check if the autoscaler has made a decision during this operator's
lifecycle
+ Integer managed = HiveClusterAutoscaler.getManagedReplicas(
+ primary.getMetadata().getNamespace(),
+ primary.getMetadata().getName(),
+ getComponentName());
+ if (managed != null) {
+ return managed;
+ }
+ // Fallback: operator restarted and MANAGED_REPLICAS is empty — read
current value
+ R resource = existing.get();
+ if (resource instanceof io.fabric8.kubernetes.api.model.apps.Deployment
d) {
+ return d.getSpec() != null && d.getSpec().getReplicas() != null
+ ? d.getSpec().getReplicas() : initialReplicas;
+ }
+ if (resource instanceof io.fabric8.kubernetes.api.model.apps.StatefulSet
s) {
+ return s.getSpec() != null && s.getSpec().getReplicas() != null
+ ? s.getSpec().getReplicas() : initialReplicas;
+ }
+ return initialReplicas;
+ }
+ // First creation: start at minReplicas.
+ return initialReplicas;
+ }
+
+
+ /**
+ * Returns the component name for this dependent (used for autoscaler
replica lookup).
+ * Subclasses should override if they manage a workload with autoscaling.
+ */
+ protected String getComponentName() {
+ return null;
+ }
+
+ /**
+ * Builds a preStop drain script that polls a single Prometheus metric
+ * (from the JMX Exporter at localhost:9404/metrics) until the value
+ * reaches zero, then exits to allow graceful pod termination.
+ *
+ * @param startupMessage logged at the start (e.g. "Waiting for open
connections to drain")
+ * @param metricName Prometheus metric name (used in grep and log
messages)
+ * @param varName shell variable name for the extracted value (e.g.
"CONNS")
+ * @param idleMessage logged when idle condition is met (e.g. "All
connections drained. Shutting down.")
+ * @param sleepSeconds polling interval in seconds
+ * @param maxRetries max consecutive curl failures before giving up
+ * @param prefixCommands optional commands to run before the polling loop
(may be null)
+ */
+ protected static String buildDrainScript(
+ String startupMessage, String metricName, String varName,
+ String idleMessage, int sleepSeconds, int maxRetries,
+ List<String> prefixCommands) {
+ List<String> lines = new ArrayList<>();
+ lines.add("#!/bin/bash");
+ if (prefixCommands != null) {
+ lines.addAll(prefixCommands);
+ }
+ lines.add("echo '[preStop] " + startupMessage
+ + " (polling localhost:9404/metrics)...'");
+ lines.add("RETRIES=0");
+ lines.add("while true; do");
+ lines.add(" RESPONSE=$(curl -sf http://localhost:9404/metrics)");
+ lines.add(" if [ $? -ne 0 ]; then");
+ lines.add(" RETRIES=$((RETRIES+1))");
+ lines.add(" echo \"[preStop] ERROR: JMX Exporter unreachable on port
9404 (attempt $RETRIES)\"");
+ lines.add(" if [ $RETRIES -ge " + maxRetries + " ]; then");
Review Comment:
The preStop drain scripts hardcode the JMX exporter endpoint as
localhost:9404, but AutoscalingSpec exposes metricsPort and the operator passes
that port into addJmxExporter(). If a user overrides metricsPort, the drain
hook will poll the wrong port and may skip draining (or force shutdown after
retries). Either make the drain script/Prometheus annotations use the
configured metricsPort, or remove metricsPort configurability and enforce 9404
everywhere.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveServer2ScalingStrategy.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+
+/**
+ * Scaling strategy for HiveServer2.
+ * desired = ceil(sum(hs2_open_sessions across all pods) / scaleUpThreshold)
+ * Uses sum() so that each session is counted — prevents premature scale-down
+ * of pods that still have active sessions.
+ */
+public class HiveServer2ScalingStrategy implements ScalingStrategy {
+
+ static final String METRIC_OPEN_SESSIONS = "hs2_open_sessions";
+
+ private int lastMetric;
+
+ @Override
+ public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+ AutoscalingSpec autoscaling, int maxReplicas) {
+ // HS2 is the cluster entry point — scaling to 0 makes the cluster
unreachable.
+ // Enforce floor of 1 regardless of CRD defaults or user misconfiguration.
+ int safeMinReplicas = Math.max(1, autoscaling.minReplicas());
+
+ double totalSessions = 0;
+ for (PodMetrics pm : podMetrics) {
+ totalSessions += pm.metrics().getOrDefault(METRIC_OPEN_SESSIONS, 0.0);
+ }
+
+ lastMetric = (int) totalSessions;
+
+ if (totalSessions <= 0) {
+ return safeMinReplicas;
+ }
+
+ int desired = (int) Math.ceil(totalSessions /
autoscaling.scaleUpThreshold());
+ return Math.max(desired, safeMinReplicas);
Review Comment:
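computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without
guarding against 0/negative values. Because totalSessions is a double, this is
a floating-point division and will not throw an ArithmeticException: a
threshold of 0 yields Infinity, which Math.ceil() preserves and the (int) cast
converts to Integer.MAX_VALUE, while a negative threshold yields a negative
result that is silently clamped to safeMinReplicas. A misconfigured CR (or
default drift) would therefore request a runaway scale-up or quietly disable
scale-up, and break autoscaling. Validate that scaleUpThreshold is > 0 before
dividing.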
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/LlapScalingStrategy.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+
+import org.apache.hive.kubernetes.operator.model.HiveCluster;
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaling strategy for LLAP daemons.
+ * Formula: avg(QueuedRequests + Configured - Available) across all pods.
+ * This represents average "busy slots + queued" per daemon.
+ * desired = ceil(avg_busy / scaleUpThreshold)
+ * <p>
+ * Activation gate: only scale if HS2 has open sessions (prevents zombie
scaling).
+ */
+public class LlapScalingStrategy implements ScalingStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LlapScalingStrategy.class);
+
+ static final String METRIC_QUEUED =
"hadoop_llapdaemon_executornumqueuedrequests";
+ static final String METRIC_CONFIGURED =
"hadoop_llapdaemon_executornumexecutorsconfigured";
+ static final String METRIC_AVAILABLE =
"hadoop_llapdaemon_executornumexecutorsavailable";
+
+ private final HiveClusterAutoscaler orchestrator;
+ private final HiveCluster cluster;
+ private int lastMetric;
+
+ public LlapScalingStrategy(HiveClusterAutoscaler orchestrator, HiveCluster
cluster) {
+ this.orchestrator = orchestrator;
+ this.cluster = cluster;
+ }
+
+ @Override
+ public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+ AutoscalingSpec autoscaling, int maxReplicas) {
+
+ // Activation gate: check if HS2 has any open sessions.
+ // If scrape returns empty but LLAP has running pods, treat as "unknown"
and preserve.
+ // This prevents spurious scale-to-zero from transient scrape failures
after operator restart.
+ List<PodMetrics> hs2Metrics = orchestrator.getHs2MetricsFromCache(cluster);
+ Boolean sessionsDetected = detectHs2Sessions(hs2Metrics);
+ if (sessionsDetected == null && !podMetrics.isEmpty()) {
+ // HS2 scrape returned no data but LLAP is running — hold current state
+ LOG.debug("[llap] HS2 scrape returned no pods; preserving LLAP (has {}
running pods)", podMetrics.size());
+ lastMetric = 0;
+ return Math.max(1, autoscaling.minReplicas());
+ }
+ if (sessionsDetected == null || !sessionsDetected) {
+ LOG.debug("[llap] HS2 has no open sessions, scaling to minReplicas");
+ lastMetric = 0;
+ return autoscaling.minReplicas();
+ }
+
+ // HS2 has sessions but LLAP has no pods yet — scale up to at least 1
+ if (podMetrics.isEmpty()) {
+ int minReplica = Math.max(1, autoscaling.minReplicas());
+ LOG.debug("[llap] HS2 has sessions but LLAP has 0 pods, scaling to {}",
minReplica);
+ lastMetric = 0;
+ return minReplica;
+ }
+
+ // Compute average busy slots across all LLAP pods
+ double totalBusy = 0;
+ int podCount = 0;
+ for (PodMetrics pm : podMetrics) {
+ double queued = pm.metrics().getOrDefault(METRIC_QUEUED, 0.0);
+ double configured = pm.metrics().getOrDefault(METRIC_CONFIGURED, 0.0);
+ double available = pm.metrics().getOrDefault(METRIC_AVAILABLE, 0.0);
+ double busy = queued + configured - available;
+ totalBusy += busy;
+ podCount++;
+ }
+
+ double avgBusy = totalBusy / podCount;
+ lastMetric = (int) Math.round(avgBusy);
+
+ if (avgBusy <= 0) {
+ // HS2 has sessions (passed activation gate above) but executors are
idle between queries.
+ // Keep at least 1 daemon to avoid flapping: scaling to 0 here would
cause immediate
+ // scale-back-up on the next evaluation when the empty-pod path triggers.
+ return Math.max(1, autoscaling.minReplicas());
+ }
+
+ LOG.debug("[llap] avgBusy={}, threshold={}", String.format("%.2f",
avgBusy),
+ autoscaling.scaleUpThreshold());
+
+ int desired = (int) Math.ceil(avgBusy / autoscaling.scaleUpThreshold());
+ return Math.max(desired, autoscaling.minReplicas());
Review Comment:
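computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without
guarding against 0/negative values. Because avgBusy is a double, this is a
floating-point division and will not throw an ArithmeticException: a threshold
of 0 yields Infinity, which the (int) cast converts to Integer.MAX_VALUE, while
a negative threshold yields a negative result that is silently clamped to
autoscaling.minReplicas(). A misconfigured threshold would therefore request a
runaway scale-up or quietly disable scale-up, and break LLAP autoscaling.
Validate that scaleUpThreshold is > 0 before dividing.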
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.hive.kubernetes.operator.model.HiveCluster;
+import org.apache.hive.kubernetes.operator.model.HiveClusterSpec;
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.apache.hive.kubernetes.operator.model.status.AutoscalingStatus;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.apache.hive.kubernetes.operator.util.Labels;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main autoscaling orchestrator. Evaluates all enabled components and
+ * returns a map of component → desired replica count for those that need
changing.
+ * <p>
+ * Maintains per-cluster, per-component state (stabilization windows).
+ */
+public class HiveClusterAutoscaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveClusterAutoscaler.class);
+
+ /** Result of evaluating all components. */
+ public record AutoscalingEvaluation(
+ Map<String, Integer> patches,
+ Map<String, AutoscalingStatus> statuses) {}
+
+ // Shared replica store: the autoscaler writes its desired replicas here so
that
+ // dependent resources can read them (avoids informer cache lag reverting
patches).
+ // Key: "namespace/clusterName/component" → desired replicas
+ private static final ConcurrentHashMap<String, Integer> MANAGED_REPLICAS =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Returns the autoscaler-managed replica count for a component, or null if
the
+ * autoscaler hasn't made a decision yet (e.g., first reconcile before
evaluation runs).
+ */
+ public static Integer getManagedReplicas(String namespace, String
clusterName, String component) {
+ return MANAGED_REPLICAS.get(namespace + "/" + clusterName + "/" +
component);
+ }
+
+ /**
+ * Sets the managed replica count for a component. Used by suspend/wake logic
+ * to override what the autoscaler would normally compute.
+ */
+ public static void setManagedReplicas(String namespace, String clusterName,
+ String component, int replicas) {
+ MANAGED_REPLICAS.put(namespace + "/" + clusterName + "/" + component,
replicas);
+ }
+
+ private record PendingScaleDown(int targetReplicas, Instant annotatedAt) {}
+
+ private final MetricsScraper scraper;
+ private final BackgroundMetricsScraper bgScraper;
+ private final MetricsCache metricsCache;
+ // Key: "namespace/clusterName/component"
+ private final ConcurrentHashMap<String, ComponentAutoscaler> autoscalers =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, String> lastScaleTimes =
+ new ConcurrentHashMap<>();
+ // Two-phase scale-down: holds deferred scale-down targets while
pod-deletion-cost
+ // annotations propagate (2s delay before applying the actual scale patch).
+ private final ConcurrentHashMap<String, PendingScaleDown> pendingScaleDowns =
+ new ConcurrentHashMap<>();
+
+ public HiveClusterAutoscaler(MetricsScraper scraper,
+ BackgroundMetricsScraper bgScraper, MetricsCache metricsCache) {
+ this.scraper = scraper;
+ this.bgScraper = bgScraper;
+ this.metricsCache = metricsCache;
+ }
+
+ public BackgroundMetricsScraper getBackgroundScraper() {
+ return bgScraper;
+ }
+
+ /**
+ * Removes all in-memory state for a deleted HiveCluster to prevent memory
leaks.
+ */
+ public void cleanupCluster(String namespace, String clusterName) {
+ String prefix = namespace + "/" + clusterName + "/";
+ MANAGED_REPLICAS.keySet().removeIf(k -> k.startsWith(prefix));
+ autoscalers.keySet().removeIf(k -> k.startsWith(prefix));
+ lastScaleTimes.keySet().removeIf(k -> k.startsWith(prefix));
+ pendingScaleDowns.keySet().removeIf(k -> k.startsWith(prefix));
+ LOG.info("Cleaned up autoscaler state for {}/{}", namespace, clusterName);
+ }
+
+ /**
+ * Returns true if there are pending scale-down operations waiting for
+ * annotation propagation. The reconciler should reschedule sooner (2s)
+ * when this returns true.
+ */
+ public boolean hasPendingScaleDowns() {
+ return !pendingScaleDowns.isEmpty();
+ }
+
+ /**
+ * Evaluate all autoscaling-enabled components and return patches and status
info.
+ *
+ * @param cluster the HiveCluster resource
+ * @param client the Kubernetes client (for reading current replica counts)
+ * @return evaluation result with patches and per-component autoscaling
statuses
+ */
+ public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient
client) {
+ Map<String, Integer> patches = new HashMap<>();
+ Map<String, AutoscalingStatus> statuses = new HashMap<>();
+ HiveClusterSpec spec = cluster.getSpec();
+ String namespace = cluster.getMetadata().getNamespace();
+ String clusterName = cluster.getMetadata().getName();
+
+ // HiveServer2
+ if (spec.hiveServer2().autoscaling().isEnabled()) {
+ AutoscalingSpec hs2Auto = spec.hiveServer2().autoscaling();
+ String hs2Key = namespace + "/" + clusterName + "/" +
ConfigUtils.COMPONENT_HIVESERVER2;
+ Map<String, String> hs2Selector = Labels.selectorForComponent(cluster,
ConfigUtils.COMPONENT_HIVESERVER2);
+ bgScraper.registerOrUpdate(namespace, clusterName,
+ ConfigUtils.COMPONENT_HIVESERVER2, hs2Selector,
+ hs2Auto.metricsPort(), hs2Auto.metricsScrapeIntervalSeconds());
+ int maxStale = hs2Auto.metricsScrapeIntervalSeconds() * 3;
+ List<PodMetrics> hs2Metrics = metricsCache.getOrEmpty(hs2Key, maxStale);
+
+ // Two-phase scale-down: check if a pending scale-down from a prior
+ // reconcile is ready to be applied (annotations have propagated).
+ PendingScaleDown pending = pendingScaleDowns.get(hs2Key);
+ if (pending != null) {
+ if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds()
>= 2) {
+ patches.put(ConfigUtils.COMPONENT_HIVESERVER2,
pending.targetReplicas());
+ MANAGED_REPLICAS.put(hs2Key, pending.targetReplicas());
+ lastScaleTimes.put(hs2Key, Instant.now().toString());
+ pendingScaleDowns.remove(hs2Key);
+ LOG.info("[hiveserver2] Applying deferred scale-down to {}
replicas", pending.targetReplicas());
+ }
+ // Build status even when waiting for pending scale-down
+ evaluateComponent(cluster, client, namespace, clusterName,
+ ConfigUtils.COMPONENT_HIVESERVER2, hs2Auto,
+ spec.hiveServer2().replicas(), new HashMap<>(), statuses,
hs2Metrics);
+ } else {
+ // Pod deletion cost only applies to Deployments (ReplicaSet
controller).
+ // StatefulSets always scale down by highest ordinal regardless of this
+ // annotation. LLAP/TezAM graceful drain is handled by preStop hooks.
+ updateDeploymentPodDeletionCost(client, namespace, hs2Metrics,
"hs2_open_sessions");
+
+ Map<String, Integer> hs2Patches = new HashMap<>();
+ evaluateComponent(cluster, client, namespace, clusterName,
+ ConfigUtils.COMPONENT_HIVESERVER2, hs2Auto,
+ spec.hiveServer2().replicas(), hs2Patches, statuses, hs2Metrics);
+
+ Integer hs2Patch = hs2Patches.get(ConfigUtils.COMPONENT_HIVESERVER2);
+ int currentReplicas = getCurrentReplicas(client, namespace,
clusterName, ConfigUtils.COMPONENT_HIVESERVER2);
+ if (hs2Patch != null && hs2Patch < currentReplicas) {
+ // Scale-down: defer to allow deletion-cost annotations to propagate
+ pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch,
Instant.now()));
+ LOG.info("[hiveserver2] Deferring scale-down to {} (waiting for
deletion-cost propagation)",
+ hs2Patch);
+ } else if (hs2Patch != null) {
+ // Scale-up: apply immediately
+ patches.put(ConfigUtils.COMPONENT_HIVESERVER2, hs2Patch);
+ MANAGED_REPLICAS.put(hs2Key, hs2Patch);
+ }
+ }
+ }
+
+ // Metastore
+ if (spec.metastore().isEnabled() &&
spec.metastore().autoscaling().isEnabled()) {
+ AutoscalingSpec msAuto = spec.metastore().autoscaling();
+ Map<String, String> msSelector = Labels.selectorForComponent(cluster,
ConfigUtils.COMPONENT_METASTORE);
+ bgScraper.registerOrUpdate(namespace, clusterName,
+ ConfigUtils.COMPONENT_METASTORE, msSelector,
+ msAuto.metricsPort(), msAuto.metricsScrapeIntervalSeconds());
+ String msKey = namespace + "/" + clusterName + "/" +
ConfigUtils.COMPONENT_METASTORE;
+ List<PodMetrics> msMetrics = metricsCache.getOrEmpty(msKey,
msAuto.metricsScrapeIntervalSeconds() * 3);
+ evaluateComponent(cluster, client, namespace, clusterName,
+ ConfigUtils.COMPONENT_METASTORE, msAuto,
+ spec.metastore().replicas(), patches, statuses, msMetrics);
+ }
+
+ // LLAP
+ if (spec.llap().isEnabled() && spec.llap().autoscaling().isEnabled()) {
+ AutoscalingSpec llapAuto = spec.llap().autoscaling();
+ Map<String, String> llapSelector = Labels.selectorForComponent(cluster,
ConfigUtils.COMPONENT_LLAP);
+ bgScraper.registerOrUpdate(namespace, clusterName,
+ ConfigUtils.COMPONENT_LLAP, llapSelector,
+ llapAuto.metricsPort(), llapAuto.metricsScrapeIntervalSeconds());
+ String llapKey = namespace + "/" + clusterName + "/" +
ConfigUtils.COMPONENT_LLAP;
+ List<PodMetrics> llapMetrics = metricsCache.getOrEmpty(llapKey,
llapAuto.metricsScrapeIntervalSeconds() * 3);
+ evaluateComponent(cluster, client, namespace, clusterName,
+ ConfigUtils.COMPONENT_LLAP, llapAuto,
+ spec.llap().replicas(), patches, statuses, llapMetrics);
+ }
+
+ // TezAM
+ if (spec.tezAm().isEnabled() && spec.tezAm().autoscaling().isEnabled()) {
+ AutoscalingSpec tezAuto = spec.tezAm().autoscaling();
+ Map<String, String> tezSelector = Labels.selectorForComponent(cluster,
ConfigUtils.COMPONENT_TEZAM);
+ bgScraper.registerOrUpdate(namespace, clusterName,
+ ConfigUtils.COMPONENT_TEZAM, tezSelector,
+ tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
+ String tezKey = namespace + "/" + clusterName + "/" +
ConfigUtils.COMPONENT_TEZAM;
+ List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey,
tezAuto.metricsScrapeIntervalSeconds() * 3);
+ evaluateComponent(cluster, client, namespace, clusterName,
+ ConfigUtils.COMPONENT_TEZAM, tezAuto,
+ spec.tezAm().replicas(), patches, statuses, tezMetrics);
+ }
+
+ return new AutoscalingEvaluation(patches, statuses);
+ }
+
+ /**
+ * Returns cached HS2 metrics (used by LLAP/TezAM activation gate).
+ * Non-blocking — reads from the background-scraper cache.
+ */
+ public List<PodMetrics> getHs2MetricsFromCache(HiveCluster cluster) {
+ String namespace = cluster.getMetadata().getNamespace();
+ String clusterName = cluster.getMetadata().getName();
+ String key = namespace + "/" + clusterName + "/" +
ConfigUtils.COMPONENT_HIVESERVER2;
+ int maxStale =
cluster.getSpec().hiveServer2().autoscaling().metricsScrapeIntervalSeconds() *
3;
+ return metricsCache.getOrEmpty(key, maxStale);
+ }
+
+ private void evaluateComponent(HiveCluster cluster, KubernetesClient client,
+ String namespace, String clusterName, String component,
+ AutoscalingSpec autoscaling, int maxReplicas,
+ Map<String, Integer> patches, Map<String, AutoscalingStatus> statuses,
+ List<PodMetrics> metrics) {
+
+ int currentReplicas = getCurrentReplicas(client, namespace, clusterName,
component);
+
+ String key = namespace + "/" + clusterName + "/" + component;
+
+ // For LLAP and TezAM, scaling decisions are based on HS2 metrics
(activation gate),
+ // not their own pod metrics. Allow evaluation even with 0 own pods.
+ boolean usesHs2Activation = ConfigUtils.COMPONENT_LLAP.equals(component)
+ || ConfigUtils.COMPONENT_TEZAM.equals(component);
+
+ if (metrics.isEmpty() && !usesHs2Activation) {
+ LOG.debug("[{}] No ready pods to scrape, skipping", component);
+ MANAGED_REPLICAS.put(key, currentReplicas);
+ return;
+ }
+
+ ComponentAutoscaler autoscaler = autoscalers.computeIfAbsent(key,
+ k -> new ComponentAutoscaler(component, createStrategy(component,
cluster)));
+
+ ComponentAutoscaler.EvaluationResult result =
+ autoscaler.evaluate(metrics, autoscaling, currentReplicas,
maxReplicas);
+
+ // Build status
+ if (result.patchTo() != null) {
+ lastScaleTimes.put(key, Instant.now().toString());
+ }
+ AutoscalingStatus as = new AutoscalingStatus();
+ as.setCurrentMetricValue(result.rawMetricValue());
+ // Only show scaleUpThreshold for strategies that use it (TezAM is
demand-based, no threshold)
+ if (autoscaler.usesScaleUpThreshold()) {
+ as.setScaleUpThreshold(autoscaling.scaleUpThreshold());
+ }
+ // CPU metrics (only for HS2 and HMS — LLAP/TezAM don't use CPU-based
scaling)
+ if ((ConfigUtils.COMPONENT_HIVESERVER2.equals(component) ||
ConfigUtils.COMPONENT_METASTORE.equals(component))
+ && autoscaling.cpuScaleUpThreshold() > 0) {
+ as.setCurrentCpuPercent(result.cpuPercent());
+ as.setCpuScaleUpThreshold(autoscaling.cpuScaleUpThreshold());
+ as.setCpuProposedReplicas(result.cpuProposedReplicas());
+ }
+ as.setProposedReplicas(result.proposedReplicas());
+ as.setLastScaleTime(lastScaleTimes.get(key));
+ statuses.put(component, as);
+
+ if (result.patchTo() != null) {
+ int patchValue = result.patchTo();
+ patches.put(component, patchValue);
+ MANAGED_REPLICAS.put(key, patchValue);
+ } else {
+ // No change needed — record current replicas as the managed value
+ MANAGED_REPLICAS.put(key, currentReplicas);
+ }
+ }
+
+ private ScalingStrategy createStrategy(String component, HiveCluster
cluster) {
+ return switch (component) {
+ case ConfigUtils.COMPONENT_HIVESERVER2 -> new HiveServer2ScalingStrategy();
+ case ConfigUtils.COMPONENT_METASTORE -> new MetastoreScalingStrategy();
+ case ConfigUtils.COMPONENT_LLAP -> new LlapScalingStrategy(this, cluster);
+ case ConfigUtils.COMPONENT_TEZAM -> new TezAmScalingStrategy(this,
cluster);
+ default -> throw new IllegalArgumentException("Unknown component: " +
component);
+ };
+ }
+
+ private int getCurrentReplicas(KubernetesClient client, String namespace,
+ String clusterName, String component) {
+ String workloadName = clusterName + "-" + component;
+ if (ConfigUtils.COMPONENT_LLAP.equals(component) ||
ConfigUtils.COMPONENT_TEZAM.equals(component)) {
+ var ss = client.apps().statefulSets()
+ .inNamespace(namespace).withName(workloadName).get();
+ return ss != null && ss.getSpec().getReplicas() != null ?
ss.getSpec().getReplicas() : 0;
+ } else {
+ var deploy = client.apps().deployments()
+ .inNamespace(namespace).withName(workloadName).get();
+ return deploy != null && deploy.getSpec().getReplicas() != null
+ ? deploy.getSpec().getReplicas() : 0;
+ }
+ }
+
+ /**
+ * Patches each pod's deletion cost annotation based on its active session
count.
+ * Kubernetes uses this during scale-down to kill idle pods first (lower
cost = killed first).
+ * <p>
+ * Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet
controller
+ * respects this annotation. StatefulSets ignore it and always terminate by
ordinal.
+ */
+ private void updateDeploymentPodDeletionCost(KubernetesClient client, String
namespace,
+ List<PodMetrics> metrics, String metricName) {
+ for (PodMetrics pm : metrics) {
+ int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue();
+ try {
+ client.pods().inNamespace(namespace).withName(pm.podName())
+ .edit(pod -> {
+ pod.getMetadata().getAnnotations()
+ .put("controller.kubernetes.io/pod-deletion-cost",
String.valueOf(sessions));
+ return pod;
+ });
Review Comment:
updateDeploymentPodDeletionCost() assumes the Pod has a non-null annotations
map. Pods frequently have no annotations, which would cause a
NullPointerException inside the edit() lambda and prevent deletion-cost tuning
from working.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetastoreScalingStrategy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaling strategy for Hive Metastore.
+ * HMS uses HTTP transport — connections are per-request (stateless), so
+ * open_connections is always ~0. Instead we compute API request rate:
+ * rate = (sum(api_*_total) - previous_sum) / elapsed_seconds.
+ * desired = ceil(rate / scaleUpThreshold)
+ */
+public class MetastoreScalingStrategy implements ScalingStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetastoreScalingStrategy.class);
+ private static final String API_COUNTER_PREFIX = "api_";
+ private static final String API_COUNTER_SUFFIX = "_total";
+
+ // Previous scrape state for rate computation
+ private final ConcurrentHashMap<String, Double> previousCounters = new
ConcurrentHashMap<>();
+ private long previousTimestampMs = 0;
+ private int lastMetric;
+
+ @Override
+ public int computeDesiredReplicas(List<PodMetrics> podMetrics,
+ AutoscalingSpec autoscaling, int maxReplicas) {
+
+ // Sum all api_*_total counters across all pods
+ double currentTotal = 0;
+ for (PodMetrics pm : podMetrics) {
+ for (Map.Entry<String, Double> entry : pm.metrics().entrySet()) {
+ String name = entry.getKey();
+ if (name.startsWith(API_COUNTER_PREFIX) &&
name.endsWith(API_COUNTER_SUFFIX)) {
+ currentTotal += entry.getValue();
+ }
+ }
+ }
+
+ long now = System.currentTimeMillis();
+ double rate = 0;
+
+ if (previousTimestampMs > 0) {
+ double elapsedSeconds = (now - previousTimestampMs) / 1000.0;
+ if (elapsedSeconds > 0) {
+ double previousTotal = previousCounters.values().stream()
+ .mapToDouble(Double::doubleValue).sum();
+ double delta = currentTotal - previousTotal;
+ if (delta < 0) {
+ // Counter reset (pod restart) — skip this sample
+ delta = 0;
+ }
+ rate = delta / elapsedSeconds;
+ }
+ }
+
+ // Store current state for next evaluation
+ previousCounters.clear();
+ previousCounters.put("_total", currentTotal);
+ previousTimestampMs = now;
+
+ lastMetric = (int) Math.round(rate);
+
+ if (rate <= 0) {
+ return autoscaling.minReplicas();
+ }
+
+ LOG.debug("[metastore] API request rate: {}/s, threshold: {}",
+ String.format("%.2f", rate), autoscaling.scaleUpThreshold());
+
+ int desired = (int) Math.ceil(rate / autoscaling.scaleUpThreshold());
+ return Math.max(desired, autoscaling.minReplicas());
Review Comment:
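computeDesiredReplicas() divides by autoscaling.scaleUpThreshold() without
guarding against 0/negative values. Because rate is a double, a zero threshold
does not throw: the division yields Infinity, which the (int) cast turns into
Integer.MAX_VALUE, so Metastore would request an unbounded replica count unless
the caller clamps it. A negative threshold yields a negative result, which
Math.max() silently pins to minReplicas, so Metastore would never scale up.
Validate that the threshold is > 0 before dividing.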
##########
packaging/src/kubernetes/pom.xml:
##########
@@ -65,9 +65,14 @@
<version>${fabric8.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.16</version>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
Review Comment:
This module hard-codes slf4j-api 2.0.16 while the rest of the build is
managed via the top-level slf4j.version property (currently 1.7.x). Having a
one-off version here makes dependency convergence harder and can lead to mixed
SLF4J major versions across modules/artifacts; consider aligning via
dependencyManagement (or introducing a dedicated property for the operator) so
the upgrade is intentional and consistent.
##########
packaging/src/kubernetes/README.md:
##########
@@ -505,19 +505,602 @@ kubectl get hiveclusters
kubectl describe hivecluster hive
```
+---
+
+## Autoscaling
+
+The operator supports metric-based autoscaling for all four Hive components
using
+an **operator-driven control loop** that scrapes JMX Exporter metrics directly
from
+pods. No Prometheus server or external autoscaling tools are needed.
Autoscaling is
+opt-in per component and designed for **zero query failures** during
scale-down.
+
+### Prerequisites
+
+- No external dependencies — the operator handles all scaling decisions
internally
+
+### How It Works
+
+When `autoscaling.enabled: true` is set for a component, the operator:
+1. Attaches the JMX Exporter javaagent (port 9404) to each pod
+2. Polls `/metrics` on each pod at `metricsScrapeIntervalSeconds` intervals
+3. Computes desired replicas using component-specific formulas
+4. Applies HPA-like stabilization windows (scale-up/scale-down)
+5. Patches the workload `spec.replicas` directly
+
+### Graceful Scale-Down Architecture
+
+```
+ Scale Down Flow
+ 1. Operator reduces desired replicas (metric below threshold,
+ stabilization window elapsed)
+ 2. PodDisruptionBudget ensures minAvailable=1 (at least one pod
+ always running)
+ 3. Kubernetes sends SIGTERM to selected pod
+ 4. preStop hook runs:
+ - HS2: deregisters from ZK, drains open sessions, kills JVM
+ - HMS: kills JVM (stateless HTTP — no drain needed)
+ - LLAP: waits until all executors become idle, kills JVM
+ - TezAM: no drain (DAGAppMaster does not expose JMX metrics)
+ 5. terminationGracePeriodSeconds = gracePeriodSeconds (safety cap)
+ 6. Pod terminates immediately once drain completes (does NOT wait
+ the full grace period — it's only the upper safety bound)
+```
+
+> **Note:** Shell entrypoints (PID 1) in containers don't forward SIGTERM to
child
+> processes. The preStop hook explicitly sends SIGTERM to the Hive/Tez Java
process
+> after drain completes, ensuring prompt shutdown without waiting for the
grace period
+> to expire.
+
+### Scaling Timers
+
+The autoscaling system uses three independent timing controls:
+
+| Timer | Config Field | Default | Purpose |
+|-------|-------------|---------|---------|
+| **Metrics scrape interval** | `metricsScrapeIntervalSeconds` | `10` | How
often the operator scrapes JMX Exporter `/metrics` on each pod. This is the
**biggest bottleneck** for autoscaling reaction time. |
+| **Scale-up stabilization** | `scaleUpStabilizationSeconds` | `60` | Window:
picks the highest recommendation within this period before scaling up. Prevents
flapping when metrics oscillate. Set to `0` for LLAP and TezAM (reactive
dependents). |
+| **Scale-down stabilization** | `scaleDownStabilizationSeconds` | `300-900` |
Window: picks the most conservative (highest) recommendation within this period
before scaling down. Also acts as the cooldown between consecutive scale-downs
— no separate cooldown needed. |
+
+**How they interact:**
+- Load spike detected → operator scrapes metrics within
`metricsScrapeIntervalSeconds` → waits `scaleUpStabilizationSeconds` then
scales up
+- Load drops → operator waits `scaleDownStabilizationSeconds` (stabilization
window must confirm low demand consistently) then scales down
+
+**Tuning reaction time:** With defaults (`metricsScrapeIntervalSeconds: 10`,
`scaleUpStabilizationSeconds: 0` for LLAP/TezAM), scale-up latency is ~10-20s
(one scrape cycle). For HS2 with `scaleUpStabilizationSeconds: 60`, expect ~70s.
+
+### Per-Component Scaling Logic
+
+| Component | Scale-Up Formula | Scale-Down | JMX Metric |
+|-----------|-----------------|------------|------------|
+| **HiveServer2** | `max(ceil(sessions / threshold), cpu_desired)` | Sessions
drop to 0 AND CPU below threshold → scale to minReplicas | `hs2_open_sessions`,
`jvm_process_cpu_load` |
+| **Metastore** | `max(ceil(api_rate / threshold), cpu_desired)` | Rate drops
to 0 AND CPU below threshold → scale to minReplicas | `api_*_total`,
`jvm_process_cpu_load` |
+| **LLAP** | `ceil(avg(queued + configured - available) / scaleUpThreshold)` |
All executors idle + no HS2 sessions | `hadoop_llapdaemon_executor*` |
+| **Tez AM** | `max(sum(hs2_open_sessions), count(HS2_pods) *
sessions_per_queue)` | All HS2 sessions closed | `hs2_open_sessions` (from HS2
pods) |
+
+**TezAM Scaling Model:** TezAM uses demand-driven scaling with two formulas
(max wins):
+1. **Session demand** — `sum(hs2_open_sessions)`: scales to match the total
number of
+ concurrent sessions across all HS2 pods (each session needs its own
exclusive TezAM).
+2. **Pre-warm** — `count(HS2 pods with sessions) ×
hive.server2.tez.sessions.per.default.queue` (default 1):
+ ensures every active HS2 pod has enough TezAM sessions pre-claimed from
ZooKeeper.
+
+The operator takes the maximum across both formulas. This ensures TezAM
capacity
+is always sufficient for both current demand and eager session pre-warming.
+TezAM scaling is purely demand-driven from HS2 metrics.
+
+### Scale-to-Zero Architecture
+
+When `minReplicas: 0` is configured (LLAP, TezAM), the cluster scales those
+components down to zero pods when HS2 has no active sessions. HS2 itself always
+maintains at least 1 replica (`minReplicas >= 1`) so it is always available to
+accept connections.
+
+```
+ Scale-to-Zero (Idle Detection)
+
+ 1. HS2 reports hs2_open_sessions = 0 for scaleDownStabilization
+ → operator scales HS2 to minReplicas (>= 1)
+
+ 2. Operator sees hs2_open_sessions = 0 on next LLAP/TezAM eval
+ → activation gate fails
+ → scale LLAP and TezAM to 0 (if minReplicas=0)
+
+ 3. HMS stays at minReplicas=1 (always available)
+
+```
+
+```
+ Wake-from-Zero (LLAP/TezAM)
+
+ 1. Beeline connects to HS2 (always running, at least 1 pod)
+
+ 2. HS2 reports hs2_open_sessions > 0 via JMX Exporter
+
+ 3. Operator detects HS2 sessions on next scrape cycle:
+ - LLAP activation gate passes → scales up from 0
+ - TezAM activation gate passes → scales up from 0
+
+ 4. Query executes once LLAP/TezAM pods are ready
+
+```
+
+**Session protection:** The HS2 Service uses `sessionAffinity: ClientIP` to
ensure
+beeline clients always reach the same pod. The preStop hook deregisters the
pod from
+ZooKeeper (preventing new sessions) and waits for `hs2_open_sessions` to drain
to 0
+before terminating. The `gracePeriodSeconds` (default 3600s) is a safety cap —
the pod
+terminates immediately once sessions drain, not after the full grace period.
+
+**Component-specific behavior:**
+
+| Component | minReplicas | Scale-to-Zero Trigger | Wake Trigger |
+|-----------|-------------|----------------------|--------------|
+| **HS2** | 1 | N/A (always running) | N/A |
+| **HMS** | 1 | Never (always running) | N/A |
+| **LLAP** | 0 | No HS2 sessions (activation gate fails) | HS2 has open
sessions (next scrape) |
+| **TezAM** | 0 | No HS2 sessions (activation gate fails) | HS2 has open
sessions (next scrape) |
+
+### Auto-Suspend (Full Cluster Hibernation)
+
+Auto-suspend goes beyond scale-to-zero — it fully hibernates the **entire**
cluster
+(including HS2 and HMS) to 0 replicas after a configurable idle timeout. This
is
+useful for dev/test clusters that should not consume resources when nobody is
using
+them.
+
+**Prerequisites:** Auto-suspend requires autoscaling to be enabled on ALL
active
+components (HS2, LLAP if enabled, TezAM if enabled, and HMS if
`includeMetastore=true`).
+The operator will not auto-suspend unless it can confirm all components are at
their
+minimum state.
+
+**Idle criteria (all must hold simultaneously for `idleTimeoutMinutes`):**
+
+| Component | Idle Condition |
+|-----------|---------------|
+| **HS2** | At `minReplicas` with 0 open sessions |
+| **HMS** | At `minReplicas` (only checked if `includeMetastore=true`) |
+| **LLAP** | At `minReplicas` (default 0) |
+| **TezAM** | At `minReplicas` (default 0) |
+
+**Important:** HS2 can **only** scale to 0 replicas via auto-suspend. Normal
+autoscaling always maintains `minReplicas >= 1` for HS2. Auto-suspend is the
+only mechanism that overrides this to achieve full hibernation.
+
+```
+ Auto-Suspend Flow
+
+ 1. Autoscaling scales all components to their minReplicas
+ (HS2≥1, HMS≥1, LLAP/TezAM to configured min)
+
+ 2. Operator detects idle state:
+ - HS2 has 0 open sessions
+ - HMS at minReplicas (if includeMetastore=true)
+ - LLAP/TezAM at minReplicas
+
+ 3. Idle timer starts (status: clusterPhase=Idle, idleSince=<now>)
+
+ 4. After idleTimeoutMinutes (default 15):
+ - ALL components scaled to 0 (HMS excluded if includeMetastore=false)
+ - spec.suspend set to true (cluster stays suspended until user wakes it)
+ - Status: clusterPhase=Suspended, suspendedSince=<now>
+
+ 5. To wake: kubectl patch hivecluster hive --type=merge -p
'{"spec":{"suspend":false}}'
+ All components restored to minReplicas
+ (HS2/HMS ≥1, LLAP/TezAM ≥1 for immediate usability)
+
+```
+
+**Configuration:**
+
+```yaml
+cluster:
+ autoSuspend:
+ enabled: true
+ idleTimeoutMinutes: 15 # minutes idle before full hibernation
+ includeMetastore: true # set false to keep HMS running during suspend
+```
+
+**Manual Suspend/Wake Commands:**
+
+```bash
+# Suspend immediately (bypasses idle timer)
+kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":true}}'
+
+# Wake cluster (restores to minReplicas)
+kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":false}}'
+```
+
+Manual suspend works regardless of whether `autoSuspend.enabled` is true — it
+immediately scales all components to 0 without waiting for the idle timeout.
+When `includeMetastore: false`, HMS stays running even during manual suspend.
+
+**Observing cluster state:**
+
+```bash
+# Quick view — printer columns show phase and idle time
+kubectl get hivecluster
+```
+```
+NAME PHASE IDLE (MIN) AGE
+hive Idle 12 2h
+```
+
+```bash
+# After suspend triggers
+kubectl get hivecluster
+```
+```
+NAME PHASE IDLE (MIN) AGE
+hive Suspended 2h
+```
+
+```bash
+# Full status (kubectl get hivecluster hive -o yaml)
+```
+```yaml
+status:
+ clusterPhase: Suspended
+ idleSince: "2026-06-08T10:00:00Z"
+ idleForMinutes: 15
+ suspendedSince: "2026-06-08T10:15:00Z"
+ conditions:
+ - type: Suspended
+ status: "True"
+ reason: AutoSuspend # or ManualSuspend
+ message: "Cluster suspended after idle timeout"
+ lastTransitionTime: "2026-06-08T10:15:00Z"
+```
+
+When the cluster is running normally:
+```
+NAME PHASE IDLE (MIN) AGE
+hive Running 2h
+```
+
+**Full example (autoscaling + auto-suspend):**
+
+```yaml
+cluster:
+ autoSuspend:
+ enabled: true
+ idleTimeoutMinutes: 15
+ includeMetastore: false # keep HMS running during suspend
+
+ hiveServer2:
+ replicas: 10
+ autoscaling:
+ enabled: true
+ minReplicas: 1
+
+ metastore:
+ replicas: 6
+ autoscaling:
+ enabled: true
+ minReplicas: 1
+
+ llap:
+ replicas: 8
+ autoscaling:
+ enabled: true
+ minReplicas: 0 # scales to 0 via normal autoscaling when HS2 idle
+
+ tezAm:
+ replicas: 10
+ autoscaling:
+ enabled: true
+ minReplicas: 0 # scales to 0 via normal autoscaling when HS2 idle
+```
+
+With this configuration, the cluster lifecycle is:
+1. Under load → all components scaled up by autoscaler
+2. Load drops → autoscaler scales to minReplicas (HS2=1, HMS=1, LLAP=0,
TezAM=0)
+3. HS2 idle (0 sessions) for 15 minutes → auto-suspend kicks in → HS2, LLAP,
TezAM to 0 (HMS stays at minReplicas)
+4. `kubectl patch hivecluster hive --type=merge -p
'{"spec":{"suspend":false}}'` → wake → HS2=1, LLAP=1, TezAM=1
+5. User connects → autoscaler detects sessions → scales up as needed
+
+### CPU-Based Scaling (HS2 and HMS)
+
+In addition to the primary metrics (sessions for HS2, API request rate for
HMS),
+the operator supports a secondary **CPU-based scaling signal** for HiveServer2
and
+Metastore. The final desired replica count is:
+
+```
+final_desired = max(metric_desired, cpu_desired)
+```
+
+Either signal can trigger scale-up; neither can force scale-down below what the
+other recommends. CPU-based scaling uses the same stabilization windows as
metric-based
+scaling (no separate CPU stabilization).
+
+**How it works:**
+
+1. The operator scrapes `ProcessCpuLoad` from `java.lang:type=OperatingSystem`
via JMX
+ Exporter (exported as `jvm_process_cpu_load`, a 0.0–1.0 fraction)
+2. Averages across all pods, converts to percentage (0–100)
+3. If avg CPU >= `cpuScaleUpThreshold`: scales up proportionally
+ (`ceil(avgCpu * currentReplicas / cpuScaleUpThreshold)`)
+4. If avg CPU < `cpuScaleDownThreshold`: scales down
+ (`ceil(avgCpu * currentReplicas / cpuScaleUpThreshold)`, floored at
`minReplicas`)
+5. Between thresholds: holds current replica count
+
+**Configuration:**
+
+| Value | Default | Description |
+|-------|---------|-------------|
+| `cluster.<component>.autoscaling.cpuScaleUpThreshold` | `90` | CPU
percentage (0-100) that triggers scale-up. Set to `0` to disable CPU-based
scaling. |
+| `cluster.<component>.autoscaling.cpuScaleDownThreshold` | `30` | CPU
percentage (0-100) below which scale-down is considered. |
+
+**Example:**
+
+```yaml
+cluster:
+ hiveServer2:
+ replicas: 10
+ resources:
+ limitsCpu: "2" # Recommended: set CPU limits so ProcessCpuLoad is
relative to pod allocation
+ autoscaling:
+ enabled: true
+ cpuScaleUpThreshold: 90
+ cpuScaleDownThreshold: 30
+
+ metastore:
+ replicas: 6
+ resources:
+ limitsCpu: "2"
+ autoscaling:
+ enabled: true
+ cpuScaleUpThreshold: 90
+ cpuScaleDownThreshold: 30
+```
+
+**Important: CPU limits and metric accuracy**
+
+`ProcessCpuLoad` reports CPU usage as a fraction of **available processors**.
Without
+CPU limits, the JVM sees all node cores (e.g., 8 cores), so even heavy
single-pod
+load only shows ~12.5%. With `limitsCpu: "2"`, the JVM sees 2 processors and
the
+metric becomes "% of allocated CPU" — making thresholds meaningful.
+
+| Pod CPU Limit | JVM sees | 90% threshold means |
+|---------------|----------|---------------------|
+| None (no limit) | All node cores (e.g., 8) | Using 7.2 of 8 cores — very
hard to reach |
+| `2` | 2 cores | Using 1.8 of 2 allocated cores |
+| `4` | 4 cores | Using 3.6 of 4 allocated cores |
+
+**Recommendation:** Always set `resources.limitsCpu` when using CPU-based
autoscaling.
+
+**Status output:**
+
+The operator reports CPU metrics in the HiveCluster status:
+
+```yaml
+status:
+ hiveServer2:
+ autoscaling:
+ currentMetricValue: 5 # total sessions
+ scaleUpThreshold: 100
+ currentCpuPercent: 72.45 # avg ProcessCpuLoad * 100
+ cpuScaleUpThreshold: 90
+ cpuProposedReplicas: 2 # what CPU alone would recommend
+ proposedReplicas: 2
+ lastScaleTime: "2026-05-31T04:23:07Z"
+```
+
+**Applicability:** CPU-based scaling only applies to HS2 and HMS. LLAP and
TezAM
+do not use CPU as a scaling signal (LLAP scales on busy executor slots which
already
+correlates with CPU; TezAM is demand-based from HS2 session count).
+
+---
+
+### Enabling Autoscaling
+
+**CLI (with Ozone storage backend):**
+
+Each component has sensible per-component defaults (see [Configuration
Reference](#configuration-reference)).
+Only `enabled=true` is needed to turn on autoscaling:
+
+```bash
+helm install hive ./helm/hive-operator \
+ --set cluster.database.type=postgres \
+ --set
cluster.database.url="jdbc:postgresql://postgres-postgresql:5432/metastore" \
+ --set cluster.database.driver="org.postgresql.Driver" \
+ --set cluster.database.username=hive \
+ --set cluster.database.passwordSecretRef.name=hive-db-secret \
+ --set cluster.database.passwordSecretRef.key=password \
+ --set
cluster.database.driverJarUrl="https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar"
\
+ --set cluster.zookeeper.quorum="zookeeper:2181" \
+ --set cluster.storage.coreSiteOverrides."fs\.defaultFS"="s3a://hive" \
+ --set
cluster.storage.coreSiteOverrides."fs\.s3a\.endpoint"="http://ozone-s3g-rest:9878"
\
+ --set-string
cluster.storage.coreSiteOverrides."fs\.s3a\.path\.style\.access"=true \
+ --set 'cluster.storage.envVars[0].name=HADOOP_OPTIONAL_TOOLS' \
+ --set 'cluster.storage.envVars[0].value=hadoop-aws' \
+ --set 'cluster.storage.envVars[1].name=AWS_ACCESS_KEY_ID' \
+ --set 'cluster.storage.envVars[1].value=ozone' \
+ --set 'cluster.storage.envVars[2].name=AWS_SECRET_ACCESS_KEY' \
+ --set 'cluster.storage.envVars[2].value=ozone' \
+ --set cluster.hiveServer2.autoscaling.enabled=true \
+ --set cluster.metastore.autoscaling.enabled=true \
+ --set cluster.llap.autoscaling.enabled=true \
+ --set cluster.tezAm.autoscaling.enabled=true
+```
+
+**Values file (for customizing beyond defaults):**
+
+```yaml
+# values-autoscaling.yaml — only override what you need
+cluster:
+ database:
+ type: postgres
+ url: "jdbc:postgresql://postgres-postgresql:5432/metastore"
+ driver: "org.postgresql.Driver"
+ username: hive
+ passwordSecretRef:
+ name: hive-db-secret
+ key: password
+ driverJarUrl:
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar"
+
+ zookeeper:
+ quorum: "zookeeper:2181"
+
+ storage:
+ coreSiteOverrides:
+ fs.defaultFS: "s3a://hive"
+ fs.s3a.endpoint: "http://ozone-s3g-rest:9878"
+ fs.s3a.path.style.access: "true"
+ envVars:
+ - name: HADOOP_OPTIONAL_TOOLS
+ value: "hadoop-aws"
+ - name: AWS_ACCESS_KEY_ID
+ value: "ozone"
+ - name: AWS_SECRET_ACCESS_KEY
+ value: "ozone"
+
+ hiveServer2:
+ replicas: 10 # Acts as maxReplicas when autoscaling is enabled
+ autoscaling:
+ enabled: true
+ # minReplicas: 1 # default — always keep at least 1 HS2 running
+ # scaleUpThreshold: 80 # default — avg open sessions per pod triggering
scale-up
+ # scaleUpStabilizationSeconds: 60 # default — scale-up window
+ # scaleDownStabilizationSeconds: 600 # default — scale-down window (also
acts as cooldown)
+ # metricsScrapeIntervalSeconds: 10 # default — operator scrape interval
(lower = faster reaction)
+
+ metastore:
+ replicas: 6 # Acts as maxReplicas when autoscaling is enabled
+ autoscaling:
+ enabled: true
+ # minReplicas: 1 # default — always keep at least 1 metastore
running
+ # scaleUpThreshold: 75 # default — API request rate (req/s) triggering
scale-up
+ # scaleUpStabilizationSeconds: 60 # default — scale-up window
+ # scaleDownStabilizationSeconds: 300 # default — scale-down window (also
acts as cooldown)
+ # gracePeriodSeconds: 60 # default — fast drain (HMS is stateless)
+ # metricsScrapeIntervalSeconds: 10 # default — operator scrape interval
+
+ llap:
+ replicas: 8 # Acts as maxReplicas when autoscaling is enabled
+ autoscaling:
+ enabled: true
+ # minReplicas: 0 # default — scale to zero when no HS2 sessions
+ # scaleUpThreshold: 1 # default — total busy slots (queued+running)
triggering scale-up
+ # scaleUpStabilizationSeconds: 60 # default — scale-up window
+ # scaleDownStabilizationSeconds: 900 # default — scale-down window (long
— scaling down destroys cache)
+ # gracePeriodSeconds: 600 # default — 10 min drain for in-flight
fragments
+ # metricsScrapeIntervalSeconds: 10 # default — operator scrape interval
(lower = faster reaction)
+
+ tezAm:
+ replicas: 10 # Acts as maxReplicas when autoscaling is enabled
+ autoscaling:
+ enabled: true
+ # minReplicas: 0 # default — scale to zero when no HS2 sessions
+ # scaleUpThreshold: 1 # default — threshold for demand metric (1 =
match HS2 pod count)
+ # scaleUpStabilizationSeconds: 60 # default — HPA scale-up window
+ # scaleDownStabilizationSeconds: 300 # default — HPA scale-down window
+ # gracePeriodSeconds: 120 # default — 2 min drain for DAG completion
+ # metricsScrapeIntervalSeconds: 10 # default — operator scrape interval
(lower = faster reaction)
+```
+
+```bash
+helm install hive ./helm/hive-operator -f values-autoscaling.yaml
+```
+
+When autoscaling is enabled, the operator automatically:
+- Deploys the JMX Exporter javaagent (port 9404, `/metrics`)
+- Enables `hive.server2.metrics.enabled` / `metastore.metrics.enabled` (JMX
reporter)
+- Attaches JMX Exporter javaagent (port 9404, `/metrics`) to each pod
+- Creates PodDisruptionBudgets (minAvailable: 1)
+- Configures preStop lifecycle hooks for graceful drain
+- Sets `terminationGracePeriodSeconds` to the configured grace period
+- LLAP/TezAM use HS2 metrics as activation gate (only scale when HS2 has
sessions)
+
+**JMX Metrics Scraped by Operator (per component):**
+
+| Component | Key Metrics | Purpose |
+|-----------|---------|---------|
+| **HiveServer2** | `hs2_open_sessions`, `jvm_process_cpu_load` | Session
count for primary scaling + CPU for secondary scaling signal |
+| **Metastore** | `api_*_total`, `jvm_process_cpu_load` | API call counters
(operator computes request rate from deltas) + CPU for secondary scaling signal
|
+| **LLAP** | `hadoop_llapdaemon_executornumqueuedrequests`,
`hadoop_llapdaemon_executornumexecutorsconfigured`,
`hadoop_llapdaemon_executornumexecutorsavailable` | Total busy slots = queued +
configured - available |
+| **Tez AM** | N/A (scales on HS2 metrics) | TezAM scaling is demand-driven
from `hs2_open_sessions` — no TezAM-specific metrics needed |
+
+### Enabling Autoscaling — Example
+
+To enable autoscaling for HS2 and Metastore:
+
+```yaml
+cluster:
+ hiveServer2:
+ replicas: 4 # max replicas ceiling
+ autoscaling:
+ enabled: true
+ scaleUpThreshold: 1 # scale up when total sessions > 1
+ minReplicas: 1 # always keep at least 1 HS2 pod running
+
+ metastore:
+ replicas: 3 # max replicas ceiling
+ autoscaling:
+ enabled: true
+ minReplicas: 1 # always keep at least 1 running
+ scaleUpThreshold: 75 # API requests/sec threshold
+```
+
+> **Note:** LLAP scales on total busy slots (queued + running executors).
+> TezAM scales on demand — the larger of total open HS2 sessions and the number
+> of active HS2 pods × `hive.server2.tez.sessions.per.default.queue` (default 1).
+
+### Helm Values Reference (Autoscaling)
+
+| Value | Default | Description |
+|-------|---------|-------------|
+| `cluster.<component>.replicas` | `1-2` | Static replica count, or max
replicas ceiling when autoscaling is enabled |
+| `cluster.<component>.autoscaling.enabled` | `false` | Enable operator-driven
autoscaling |
+| `cluster.<component>.autoscaling.minReplicas` | `1` (HS2/HMS), `0`
(LLAP/TezAM) | Minimum replica count. Set to 0 for scale-to-zero (LLAP, TezAM
only; HS2 minimum is 1) |
+| `cluster.<component>.autoscaling.scaleUpThreshold` | varies | Metric
threshold triggering scale-up |
+| `cluster.<component>.autoscaling.scaleUpStabilizationSeconds` | `60` |
Stabilization window for scale-up (picks highest recommendation in window) |
+| `cluster.<component>.autoscaling.scaleDownStabilizationSeconds` | `300-900`
| Stabilization window for scale-down (picks most conservative recommendation
in window). Also acts as cooldown between consecutive scale-downs. |
+| `cluster.<component>.autoscaling.gracePeriodSeconds` | `3600` (HS2), `60`
(HMS), `600` (LLAP), `120` (TezAM) | Safety cap: max drain time before forced
termination. Pod exits immediately once drain completes. |
+| `cluster.<component>.autoscaling.metricsScrapeIntervalSeconds` | `10` | How
often the operator scrapes JMX metrics from pods. Lower = faster reaction. |
+| `cluster.<component>.autoscaling.cpuScaleUpThreshold` | `90` | CPU
percentage (0-100) triggering scale-up. Only HS2/HMS. Set to 0 to disable. |
+| `cluster.<component>.autoscaling.cpuScaleDownThreshold` | `30` | CPU
percentage (0-100) below which scale-down is considered. Only HS2/HMS. |
+
+---
+
## Connect to HiveServer2
+HiveServer2 runs in **HTTP transport mode** by default (recommended for
Kubernetes
+environments as it works well with load balancers, ingress controllers, and
proxies).
+
+### Standard Connection (minReplicas >= 1)
+
+When HS2 always has at least one pod running, connect directly to the service:
+
```bash
-kubectl exec -it deployment/hive-hiveserver2 -- beeline -u
"jdbc:hive2://hive-hiveserver2:10000/"
+kubectl exec -it deployment/hive-hiveserver2 -- beeline -u
"jdbc:hive2://hive-hiveserver2:10001/;transportMode=http;httpPath=cliservice"
```
Review Comment:
PR metadata says there is no user-facing change, but this change set
switches the documented/default HiveServer2 connection to HTTP transport on
port 10001 (and also configures transport-mode defaults in the operator). That
affects how users connect (JDBC URL/ports) and should be called out as
user-facing behavior in the PR description/release notes.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/BackgroundMetricsScraper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs periodic metrics scraping in the background so that the JOSDK reconcile
+ * thread is never blocked by HTTP calls to pod JMX exporters.
+ * <p>
+ * Each component gets its own scheduled task that writes results to a shared
+ * {@link MetricsCache}. The reconciler reads from that cache (non-blocking).
+ */
+public class BackgroundMetricsScraper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BackgroundMetricsScraper.class);
+
+ private final ScheduledExecutorService scheduler;
+ private final MetricsScraper scraper;
+ private final MetricsCache cache;
+ // Key: "namespace/clusterName/component" → active scrape task
+ private final ConcurrentHashMap<String, ScheduledFuture<?>> activeTasks =
+ new ConcurrentHashMap<>();
+ // Tracks registered intervals to detect spec changes
+ private final ConcurrentHashMap<String, Integer> registeredIntervals =
+ new ConcurrentHashMap<>();
+
+ public BackgroundMetricsScraper(MetricsScraper scraper, MetricsCache cache) {
+ this.scraper = scraper;
+ this.cache = cache;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "hive-metrics-scraper");
+ t.setDaemon(true);
+ return t;
+ });
Review Comment:
BackgroundMetricsScraper uses a single-thread ScheduledExecutorService. With
autoscaling enabled for multiple components, one slow scrape (up to the per-pod
timeout) can delay all other components and effectively violate the configured
metricsScrapeIntervalSeconds for them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]