This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git
The following commit(s) were added to refs/heads/main by this push:
new 0fb9a4a4 BIGTOP-4487: Disable Hadoop short-circuit reads when the glibc version is lower than 2.34 (#257)
0fb9a4a4 is described below
commit 0fb9a4a439e775aed7bf19975d656daad9c77c67
Author: Dansanyo <[email protected]>
AuthorDate: Sun Aug 31 21:51:47 2025 +0800
BIGTOP-4487: Disable Hadoop short-circuit reads when the glibc version is lower than 2.34 (#257)
---
.../stack/bigtop/v3_3_0/hadoop/HadoopParams.java | 267 +++++++++++++++++++++
1 file changed, 267 insertions(+)
diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java
index 356c6e03..ced14bad 100644
--- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java
+++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java
@@ -18,11 +18,13 @@
*/
package org.apache.bigtop.manager.stack.bigtop.v3_3_0.hadoop;
+import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.grpc.payload.ComponentCommandPayload;
import org.apache.bigtop.manager.stack.bigtop.param.BigtopParams;
import org.apache.bigtop.manager.stack.core.annotations.GlobalParams;
import org.apache.bigtop.manager.stack.core.spi.param.Params;
import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
+import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;
import org.apache.commons.lang3.StringUtils;
@@ -125,6 +127,9 @@ public class HadoopParams extends BigtopParams {
((String)
hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0",
namenodeList.get(0)));
}
+ // Configure native library dependent settings
+ configureNativeLibraryDependentSettings(hdfsSite);
+
dfsDataDir = (String) hdfsSite.get("dfs.datanode.data.dir");
dfsNameNodeDir = (String) hdfsSite.get("dfs.namenode.name.dir");
nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(","))
@@ -193,4 +198,266 @@ public class HadoopParams extends BigtopParams {
public String getServiceName() {
    // Fixed identifier of the service these parameters describe.
    final String serviceName = "hadoop";
    return serviceName;
}
+
+ /**
+ * Configure native library dependent settings for HDFS.
+ * This method intelligently detects libhadoop native library availability
+ * and automatically configures short-circuit reads and UNIX domain socket
settings.
+ * <p>
+ * Short-circuit read optimization explanation:
+ * - When client and DataNode are on the same node, network layer can be
bypassed
+ * to read local data blocks directly
+ * - Requires glibc version >= 2.34 to ensure native library compatibility
+ * - Uses UNIX domain sockets for inter-process communication to improve
performance
+ *
+ * @param hdfsSite The HDFS site configuration map to be modified
+ */
+ private void configureNativeLibraryDependentSettings(Map<String, Object>
hdfsSite) {
+ try {
+ // Detect system glibc version to determine native library support
+ boolean enableShortCircuit = isGlibcVersionCompatible();
+ String domainSocketPath = null;
+
+ if (enableShortCircuit) {
+ log.info("Detected glibc version >= 2.34, enabling
short-circuit read optimization");
+
+ // Get recommended domain socket path and append port
placeholder
+ domainSocketPath = findOptimalDomainSocketPath();
+ if (domainSocketPath != null) {
+ // _PORT placeholder will be replaced with actual port
number by DataNode at runtime
+ domainSocketPath = domainSocketPath + "/dn._PORT";
+ log.info("Enabling short-circuit reads with domain socket
path: {}", domainSocketPath);
+ }
+ } else {
+ log.info("glibc version < 2.34 or detection failed, disabling
short-circuit reads for compatibility");
+ }
+
+ // Apply short-circuit read configuration
+ applyShortCircuitConfiguration(hdfsSite, enableShortCircuit,
domainSocketPath);
+
+ } catch (Exception e) {
+ log.error("Error occurred during glibc version detection,
disabling short-circuit reads for safety", e);
+ applyShortCircuitConfiguration(hdfsSite, false, null);
+ }
+ }
+
+ /**
+ * Check if glibc version is >= 2.34 to determine native library support.
+ * <p>
+ * Detection logic:
+ * 1. First attempt to use 'ldd --version' command to get glibc version
+ * 2. If failed, try 'getconf GNU_LIBC_VERSION' as fallback method
+ * 3. Parse version number and compare with minimum required version (2.34)
+ *
+ * @return true if glibc version >= 2.34, false otherwise
+ */
+ private boolean isGlibcVersionCompatible() {
+ try {
+ // Method 1: Use ldd command to detect glibc version
+ ShellResult result = LinuxOSUtils.execCmd("ldd --version");
+ if (result.getExitCode() == 0) {
+ String output = result.getOutput();
+ String[] lines = output.split("\n");
+ for (String line : lines) {
+ // Look for lines containing glibc version information
+ if (line.contains("GNU libc") || line.contains("GLIBC")) {
+ String version = extractGlibcVersionFromLine(line);
+ if (version != null) {
+ boolean supported = compareVersionStrings(version,
"2.34") >= 0;
+ log.info("Detected glibc version via ldd: {},
supported: {}", version, supported);
+ return supported;
+ }
+ }
+ }
+ } else {
+ log.info("ldd --version command failed with exit code: {}",
result.getExitCode());
+ }
+
+ // Method 2: Try getconf as fallback detection method
+ return detectGlibcVersionViaGetconf();
+
+ } catch (Exception e) {
+ log.info("Exception during glibc version detection: {}",
e.getMessage());
+ return detectGlibcVersionViaGetconf();
+ }
+ }
+
+ /**
+ * Alternative method using getconf command to detect glibc version.
+ *
+ * @return true if detected version >= 2.34, false otherwise
+ */
+ private boolean detectGlibcVersionViaGetconf() {
+ try {
+ ShellResult result = LinuxOSUtils.execCmd("getconf
GNU_LIBC_VERSION");
+ if (result.getExitCode() == 0) {
+ String output = result.getOutput().trim();
+ if (output.startsWith("glibc ")) {
+ String version = output.substring(6).trim();
+ boolean supported = compareVersionStrings(version, "2.34")
>= 0;
+ log.info("Detected glibc version via getconf: {},
supported: {}", version, supported);
+ return supported;
+ }
+ }
+ } catch (Exception e) {
+ log.info("getconf method detection failed: {}", e.getMessage());
+ }
+
+ // Default to false for safety
+ log.warn("Could not determine glibc version, defaulting to disable
short-circuit reads");
+ return false;
+ }
+
+ /**
+ * Extract glibc version number from ldd output line.
+ * <p>
+ * Supported format examples:
+ * - "ldd (GNU libc) 2.35"
+ * - "ldd (Ubuntu GLIBC 2.35-0ubuntu3.1) 2.35"
+ * - "ldd (GNU libc) 2.34"
+ *
+ * @param line Single line of text from ldd command output
+ * @return Extracted version string like "2.35", or null if extraction
failed
+ */
+ private String extractGlibcVersionFromLine(String line) {
+ // Split line by whitespace and look for version pattern
+ String[] parts = line.split("\\s+");
+ for (String part : parts) {
+ // Match version pattern like "2.35"
+ if (part.matches("\\d+\\.\\d+.*")) {
+ // Extract major.minor version numbers
+ String cleanVersion = part.replaceAll("[^\\d.]", "");
+ // Ensure only major and minor versions are kept
+ String[] versionParts = cleanVersion.split("\\.");
+ if (versionParts.length >= 2) {
+ return versionParts[0] + "." + versionParts[1];
+ }
+ return cleanVersion;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Compare two version strings (major.minor format).
+ *
+ * @param v1 First version string
+ * @param v2 Second version string
+ * @return negative if v1 < v2, zero if equal, positive if v1 > v2
+ */
+ private int compareVersionStrings(String v1, String v2) {
+ String[] parts1 = v1.split("\\.");
+ String[] parts2 = v2.split("\\.");
+
+ int major1 = Integer.parseInt(parts1[0]);
+ int minor1 = parts1.length > 1 ? Integer.parseInt(parts1[1]) : 0;
+
+ int major2 = Integer.parseInt(parts2[0]);
+ int minor2 = parts2.length > 1 ? Integer.parseInt(parts2[1]) : 0;
+
+ // Compare major version first
+ if (major1 != major2) {
+ return major1 - major2;
+ }
+ // Compare minor version when major versions are equal
+ return minor1 - minor2;
+ }
+
+ /**
+ * Find the optimal domain socket path.
+ * <p>
+ * Path selection strategy:
+ * 1. Check candidate paths in priority order for existence and writability
+ * 2. If none are available, attempt to create default directory
+ * 3. Finally fall back to /tmp directory
+ *
+ * @return Recommended domain socket base path
+ */
+ private String findOptimalDomainSocketPath() {
+ // Candidate paths in priority order
+ String[] candidatePaths = {"/var/run/hadoop", "/tmp/hadoop", "/tmp"};
+
+ // Check availability of existing paths
+ for (String path : candidatePaths) {
+ java.io.File dir = new java.io.File(path);
+ if (dir.exists() && dir.canWrite()) {
+ log.info("Selected domain socket path: {}", path);
+ return path;
+ }
+ }
+
+ // Try to create default hadoop directory
+ java.io.File defaultDir = new java.io.File("/tmp/hadoop");
+ if (!defaultDir.exists()) {
+ try {
+ if (defaultDir.mkdirs()) {
+ log.info("Created and using domain socket path:
/tmp/hadoop");
+ return "/tmp/hadoop";
+ }
+ } catch (Exception e) {
+ log.warn("Cannot create directory /tmp/hadoop, using /tmp as
fallback", e);
+ }
+ }
+
+ log.info("Using fallback domain socket path: /tmp");
+ return "/tmp";
+ }
+
+ /**
+ * Apply short-circuit read settings in HDFS site configuration.
+ * <p>
+ * Configuration properties explanation:
+ * - dfs.client.read.shortcircuit: Whether to enable short-circuit reads
+ * - dfs.domain.socket.path: UNIX domain socket path
+ * - dfs.client.read.shortcircuit.streams.cache.size: Short-circuit read
stream cache size
+ *
+ * @param hdfsSite HDFS site configuration map
+ * @param enableShortCircuit Whether to enable short-circuit reads
+ * @param domainSocketPath Domain socket path (null to disable domain
socket)
+ */
+ private void applyShortCircuitConfiguration(
+ Map<String, Object> hdfsSite, boolean enableShortCircuit, String
domainSocketPath) {
+
+ // Configure short-circuit read main switch
+ hdfsSite.put("dfs.client.read.shortcircuit",
String.valueOf(enableShortCircuit));
+
+ if (enableShortCircuit && domainSocketPath != null) {
+ // Enable UNIX domain socket for high-performance short-circuit
reads
+ hdfsSite.put("dfs.domain.socket.path", domainSocketPath);
+ log.info("Short-circuit reads enabled with domain socket path:
{}", domainSocketPath);
+ } else {
+ // Remove domain socket path configuration to prevent DataNode
startup failures
+ // This avoids startup errors due to libhadoop loading issues
+ hdfsSite.remove("dfs.domain.socket.path");
+ if (enableShortCircuit) {
+ log.info("Short-circuit reads enabled (fallback mode, without
domain socket)");
+ } else {
+ log.info("Short-circuit reads disabled");
+ }
+ }
+
+ // Configure stream cache based on short-circuit read status
+ configureShortCircuitStreamCache(hdfsSite, enableShortCircuit);
+ }
+
+ /**
+ * Configure short-circuit read stream cache settings.
+ *
+ * @param hdfsSite HDFS site configuration map
+ * @param enableShortCircuit Whether short-circuit reads are enabled
+ */
+ private void configureShortCircuitStreamCache(Map<String, Object>
hdfsSite, boolean enableShortCircuit) {
+ if (enableShortCircuit) {
+ // Optimize cache size when short-circuit reads are enabled for
better performance
+ Object currentCacheSize =
hdfsSite.get("dfs.client.read.shortcircuit.streams.cache.size");
+ if (currentCacheSize == null ||
"0".equals(currentCacheSize.toString())) {
+
hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size", "4096");
+ log.info("Configured short-circuit read stream cache size to
4096");
+ }
+ } else {
+ // Set cache to 0 when short-circuit reads are disabled to save
memory
+ hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size",
"0");
+ log.info("Short-circuit read stream cache disabled");
+ }
+ }
}