This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 455cd4013 [CELEBORN-1111] Supporting connection to HDFS with Kerberos
authentication enabled
455cd4013 is described below
commit 455cd401376373e406ee0dc8c046ca966421af73
Author: joey.ljy <[email protected]>
AuthorDate: Sat Nov 4 17:21:41 2023 +0800
[CELEBORN-1111] Supporting connection to HDFS with Kerberos authentication
enabled
### What changes were proposed in this pull request?
Add Kerberos support for the HDFS storage type.
The following five parameters need to be configured:
| key | value |
| :--: | :--: |
| celeborn.storage.hdfs.kerberos.enabled | true |
| celeborn.storage.hdfs.kerberos.principal | user@REALM |
| celeborn.storage.hdfs.kerberos.keytab | /path/test.keytab |
| celeborn.hadoop.hadoop.security.authentication | kerberos |
| celeborn.hadoop.dfs.namenode.kerberos.principal | hdfs/_HOST@REALM |
### Why are the changes needed?
Connecting to HDFS with Kerberos enabled requires support for keytab login.
### Does this PR introduce _any_ user-facing change?
Yes. This adds three configurations:
celeborn.storage.hdfs.kerberos.enabled
celeborn.storage.hdfs.kerberos.principal
celeborn.storage.hdfs.kerberos.keytab
### How was this patch tested?
Tested in a Kerberos-enabled HDFS cluster.
Closes #2072 from liujiayi771/hdfs-kerberos.
Authored-by: joey.ljy <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 31 ++++++++++++++++++++++
.../celeborn/common/util/CelebornHadoopUtils.scala | 28 +++++++++++++++++--
docs/configuration/master.md | 3 +++
docs/configuration/worker.md | 3 +++
4 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e024f6f1e..6923fea3f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1065,6 +1065,13 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
def clientFlinkDataCompressionEnabled: Boolean =
get(CLIENT_DATA_COMPRESSION_ENABLED)
def clientShuffleMapPartitionSplitEnabled =
get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED)
+
+ // //////////////////////////////////////////////////////
+ // kerberos //
+ // //////////////////////////////////////////////////////
+ def hdfsStorageKerberosEnabled = get(HDFS_STORAGE_TYPE_KERBEROS_ENABLED)
+ def hdfsStorageKerberosPrincipal = get(HDFS_STORAGE_KERBEROS_PRINCIPAL)
+ def hdfsStorageKerberosKeytab = get(HDFS_STORAGE_KERBEROS_KEYTAB)
}
object CelebornConf extends Logging {
@@ -4009,4 +4016,28 @@ object CelebornConf extends Logging {
.version("0.3.2")
.intConf
.createWithDefault(64)
+
+ val HDFS_STORAGE_TYPE_KERBEROS_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.storage.hdfs.kerberos.enabled")
+ .categories("master", "worker")
+ .version("0.3.2")
+ .doc("Whether to enable kerberos authentication for HDFS storage
connection.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HDFS_STORAGE_KERBEROS_PRINCIPAL: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.hdfs.kerberos.principal")
+ .categories("master", "worker")
+ .version("0.3.2")
+ .doc("Kerberos principal for HDFS storage connection.")
+ .stringConf
+ .createOptional
+
+ val HDFS_STORAGE_KERBEROS_KEYTAB: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.hdfs.kerberos.keytab")
+ .categories("master", "worker")
+ .version("0.3.2")
+ .doc("Kerberos keytab file path for HDFS storage connection.")
+ .stringConf
+ .createOptional
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 166c2a234..1135d5bcb 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -17,13 +17,15 @@
package org.apache.celeborn.common.util
-import java.io.IOException
+import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
object CelebornHadoopUtils extends Logging {
@@ -56,7 +58,9 @@ object CelebornHadoopUtils extends Logging {
}
def getHadoopFS(conf: CelebornConf): FileSystem = {
- new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf))
+ val hadoopConf = newConfiguration(conf)
+ initKerberos(conf, hadoopConf)
+ new Path(conf.hdfsDir).getFileSystem(hadoopConf)
}
def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive:
Boolean): Unit = {
@@ -71,4 +75,24 @@ object CelebornHadoopUtils extends Logging {
logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to:
", e)
}
}
+
+ def initKerberos(conf: CelebornConf, hadoopConf: Configuration): Unit = {
+ // If we are accessing HDFS and it has Kerberos enabled, we have to login
+ // from a keytab file so that we can access HDFS beyond the kerberos
ticket expiration.
+ UserGroupInformation.setConfiguration(hadoopConf)
+ if (conf.hdfsStorageKerberosEnabled) {
+ val principal = conf.hdfsStorageKerberosPrincipal
+ .getOrElse(throw new NoSuchElementException(
+ CelebornConf.HDFS_STORAGE_KERBEROS_PRINCIPAL.key))
+ val keytab = conf.hdfsStorageKerberosKeytab
+ .getOrElse(throw new
NoSuchElementException(CelebornConf.HDFS_STORAGE_KERBEROS_KEYTAB.key))
+ if (!new File(keytab).exists()) {
+ throw new CelebornException(s"Keytab file: ${keytab} does not exist")
+ } else {
+ logInfo("Attempting to login to Kerberos " +
+ s"using principal: ${principal} and keytab: ${keytab}")
+ UserGroupInformation.loginUserFromKeytab(principal, keytab)
+ }
+ }
+ }
}
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index b83b4b20c..291e56435 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -40,4 +40,7 @@ license: |
| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker
unavailable info would be cleared when the retention period is expired | 0.3.1
|
| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
+| celeborn.storage.hdfs.kerberos.enabled | false | Whether to enable kerberos
authentication for HDFS storage connection. | 0.3.2 |
+| celeborn.storage.hdfs.kerberos.keytab | <undefined> | Kerberos keytab
file path for HDFS storage connection. | 0.3.2 |
+| celeborn.storage.hdfs.kerberos.principal | <undefined> | Kerberos
principal for HDFS storage connection. | 0.3.2 |
<!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a356844cb..965bf320c 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,9 @@ license: |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
+| celeborn.storage.hdfs.kerberos.enabled | false | Whether to enable kerberos
authentication for HDFS storage connection. | 0.3.2 |
+| celeborn.storage.hdfs.kerberos.keytab | <undefined> | Kerberos keytab
file path for HDFS storage connection. | 0.3.2 |
+| celeborn.storage.hdfs.kerberos.principal | <undefined> | Kerberos
principal for HDFS storage connection. | 0.3.2 |
| celeborn.worker.activeConnection.max | <undefined> | If the number of
active connections on a worker exceeds this configuration value, the worker
will be marked as high-load in the heartbeat report, and the master will not
include that node in the response of RequestSlots. | 0.3.1 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for
read buffer per mount point. | 0.3.0 |
| celeborn.worker.clean.threads | 64 | Thread number of worker to clean up
expired shuffle keys. | 0.3.2 |