Xuanwo commented on code in PR #7185:
URL: https://github.com/apache/opendal/pull/7185#discussion_r2834266364


##########
core/services/hf/Cargo.toml:
##########
@@ -30,15 +30,35 @@ version = { workspace = true }
 [package.metadata.docs.rs]
 all-features = true
 
+[features]
+default = []
+xet = ["dep:reqwest", "dep:subxet", "dep:futures", "dep:async-trait"]
+
 [dependencies]
+backon = "1.6"
+base64 = { workspace = true }
 bytes = { workspace = true }
 http = { workspace = true }
 log = { workspace = true }
 opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
 percent-encoding = "2"
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
+# XET storage protocol support (optional)
+async-trait = { version = "0.1", optional = true }

Review Comment:
   I think we can avoid this.
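   
   For example, with native async-fn-in-trait (stable since Rust 1.75) an implementor needs no macro at all. A minimal sketch, where the trait and error type are stand-ins rather than the subxet API (and `Box<dyn TokenRefresher>` dispatch would still need extra work):
   
   ```rust
   // Stand-in types, not the subxet API: with native async-fn-in-trait,
   // implementors need no `#[async_trait]` macro or extra dependency.
   #[derive(Debug)]
   pub struct AuthError(pub String);
   
   pub trait TokenRefresher {
       async fn refresh(&self) -> Result<(String, u64), AuthError>;
   }
   
   struct StaticRefresher {
       token: String,
   }
   
   impl TokenRefresher for StaticRefresher {
       async fn refresh(&self) -> Result<(String, u64), AuthError> {
           // Hand back the stored token with a far-future expiry.
           Ok((self.token.clone(), u64::MAX))
       }
   }
   ```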



##########
core/services/hf/src/backend.rs:
##########
@@ -318,4 +415,168 @@ mod tests {
             .build()
             .expect("builder should accept space repo type");
     }
+
+    #[test]
+    fn test_both_schemes_are_supported() {
+        use opendal_core::OperatorRegistry;
+
+        let registry = OperatorRegistry::new();
+        super::super::register_hf_service(&registry);
+
+        // Test short scheme "hf"
+        let op = registry
+            .load("hf://user/repo")
+            .expect("short scheme should be registered and work");
+        assert_eq!(op.info().scheme(), "hf");
+
+        // Test long scheme "huggingface"
+        let op = registry
+            .load("huggingface://user/repo")
+            .expect("long scheme should be registered and work");
+        assert_eq!(op.info().scheme(), "hf");
+    }
+
+    /// Parquet magic bytes: "PAR1"
+    const PARQUET_MAGIC: &[u8] = b"PAR1";
+
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_read_parquet_http() {
+        let op = mbpp_operator();
+        let path = "full/train-00000-of-00001.parquet";
+
+        let meta = op.stat(path).await.expect("stat should succeed");
+        assert!(meta.content_length() > 0);
+
+        // Read the first 4 bytes to check parquet header magic
+        let header = op
+            .read_with(path)
+            .range(0..4)
+            .await
+            .expect("read header should succeed");
+        assert_eq!(&header.to_vec(), PARQUET_MAGIC);
+
+        // Read the last 4 bytes to check parquet footer magic
+        let size = meta.content_length();
+        let footer = op
+            .read_with(path)
+            .range(size - 4..size)
+            .await
+            .expect("read footer should succeed");
+        assert_eq!(&footer.to_vec(), PARQUET_MAGIC);
+    }
+
+    #[cfg(feature = "xet")]
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_read_parquet_xet() {
+        let op = mbpp_xet_operator();
+        let path = "full/train-00000-of-00001.parquet";
+
+        // Full read via XET and verify parquet magic at both ends
+        let data = op.read(path).await.expect("xet read should succeed");
+        let bytes = data.to_vec();
+        assert!(bytes.len() > 8);
+        assert_eq!(&bytes[..4], PARQUET_MAGIC);
+        assert_eq!(&bytes[bytes.len() - 4..], PARQUET_MAGIC);
+    }
+
+    /// List files in a known dataset directory.
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_list_directory() {
+        let op = mbpp_operator();
+        let entries = op.list("full/").await.expect("list should succeed");
+        assert!(!entries.is_empty(), "directory should contain files");
+        assert!(
+            entries.iter().any(|e| e.path().ends_with(".parquet")),
+            "should contain parquet files"
+        );
+    }
+
+    /// List files recursively from root.
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_list_recursive() {
+        let op = mbpp_operator();
+        let entries = op
+            .list_with("/")
+            .recursive(true)
+            .await
+            .expect("recursive list should succeed");
+        assert!(
+            entries.len() > 1,
+            "recursive listing should find multiple files"
+        );
+    }
+
+    /// Stat a known file and verify metadata fields.
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_stat_known_file() {
+        let op = mbpp_operator();
+        let meta = op
+            .stat("full/train-00000-of-00001.parquet")
+            .await
+            .expect("stat should succeed");
+        assert!(meta.content_length() > 0);
+        assert!(!meta.etag().unwrap_or_default().is_empty());
+    }
+
+    /// Stat a nonexistent path should return NotFound.
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_stat_nonexistent() {
+        let op = mbpp_operator();
+        let err = op
+            .stat("this/path/does/not/exist.txt")
+            .await
+            .expect_err("stat on nonexistent path should fail");
+        assert_eq!(err.kind(), ErrorKind::NotFound);
+    }
+
+    /// Read a nonexistent file should return NotFound.
+    #[tokio::test]
+    #[ignore = "requires network access"]
+    async fn test_read_nonexistent() {
+        let op = mbpp_operator();
+        let err = op
+            .read("this/path/does/not/exist.txt")
+            .await
+            .expect_err("read on nonexistent path should fail");
+        assert_eq!(err.kind(), ErrorKind::NotFound);
+    }
+
+    /// Read a middle range of a known file.
+    #[tokio::test]
+    #[ignore = "requires network access"]

Review Comment:
   I suggest adding a `tests.rs` file and guiding users to run tests in an environment such as `OPENDAL_SERVICE_TEST_HF`.
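   
   Something along these lines, reusing the existing `mbpp_operator()` helper; the env var name is just the suggestion above:
   
   ```rust
   /// Network-backed tests become opt-in: they only run when the test
   /// environment is explicitly configured, instead of `#[ignore]`.
   fn hf_test_enabled() -> bool {
       std::env::var("OPENDAL_SERVICE_TEST_HF").is_ok()
   }
   
   #[tokio::test]
   async fn test_stat_known_file() {
       if !hf_test_enabled() {
           // Skip silently when no test environment is configured.
           return;
       }
       let op = mbpp_operator();
       let meta = op
           .stat("full/train-00000-of-00001.parquet")
           .await
           .expect("stat should succeed");
       assert!(meta.content_length() > 0);
   }
   ```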



##########
core/services/hf/src/core.rs:
##########
@@ -18,325 +18,606 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use bytes::Buf;
 use bytes::Bytes;
 use http::Request;
 use http::Response;
 use http::header;
-use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
 use serde::Deserialize;
 
-use super::backend::RepoType;
+#[cfg(feature = "xet")]
+use subxet::data::XetFileInfo;
+#[cfg(feature = "xet")]
+use subxet::data::streaming::XetClient;
+#[cfg(feature = "xet")]
+use subxet::utils::auth::TokenRefresher;
+
+use super::error::parse_error;
+use super::uri::HfRepo;
 use opendal_core::raw::*;
 use opendal_core::*;
 
-fn percent_encode_revision(revision: &str) -> String {
-    utf8_percent_encode(revision, NON_ALPHANUMERIC).to_string()
+/// API payload structures for preupload operations
+#[derive(serde::Serialize)]
+struct PreuploadFile {
+    path: String,
+    size: i64,
+    sample: String,
+}
+
+#[derive(serde::Serialize)]
+struct PreuploadRequest {
+    files: Vec<PreuploadFile>,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadFileResponse {
+    #[allow(dead_code)]
+    path: String,
+    #[serde(rename = "uploadMode")]
+    upload_mode: String,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadResponse {
+    files: Vec<PreuploadFileResponse>,
+}
+
+/// API payload structures for commit operations
+#[derive(Debug, serde::Serialize)]
+pub(super) struct CommitFile {
+    pub path: String,
+    pub content: String,
+    pub encoding: String,
+}
+
+#[derive(Debug, serde::Serialize)]
+pub(super) struct LfsFile {
+    pub path: String,
+    pub oid: String,
+    pub algo: String,
+    pub size: u64,
+}
+
+#[derive(Clone, Debug, serde::Serialize)]
+pub(super) struct DeletedFile {
+    pub path: String,
+}
+
+/// Bucket batch operation payload structures
+#[cfg(feature = "xet")]
+#[derive(Debug, serde::Serialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub(super) enum BucketOperation {
+    #[serde(rename_all = "camelCase")]
+    AddFile { path: String, xet_hash: String },
+    #[serde(rename_all = "camelCase")]
+    #[allow(dead_code)]
+    DeleteFile { path: String },
+}
+
+#[derive(serde::Serialize)]
+pub(super) struct MixedCommitPayload {
+    pub summary: String,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub files: Vec<CommitFile>,
+    #[serde(rename = "lfsFiles", skip_serializing_if = "Vec::is_empty")]
+    pub lfs_files: Vec<LfsFile>,
+    #[serde(rename = "deletedFiles", skip_serializing_if = "Vec::is_empty")]
+    pub deleted_files: Vec<DeletedFile>,
+}
+
+// API response types
+
+#[derive(serde::Deserialize, Debug)]
+pub(super) struct CommitResponse {
+    #[serde(rename = "commitOid")]
+    pub commit_oid: Option<String>,
+    #[allow(dead_code)]
+    #[serde(rename = "commitUrl")]
+    pub commit_url: Option<String>,
 }
 
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct PathInfo {
+    #[serde(rename = "type")]
+    pub type_: String,
+    #[serde(default)]
+    pub oid: Option<String>,
+    pub size: u64,
+    #[serde(default)]
+    pub lfs: Option<LfsInfo>,
+    pub path: String,
+    #[serde(default)]
+    pub last_commit: Option<LastCommit>,
+}
+
+impl PathInfo {
+    pub fn entry_mode(&self) -> EntryMode {
+        match self.type_.as_str() {
+            "directory" => EntryMode::DIR,
+            "file" => EntryMode::FILE,
+            _ => EntryMode::Unknown,
+        }
+    }
+
+    pub fn metadata(&self) -> Result<Metadata> {
+        let mode = self.entry_mode();
+        let mut meta = Metadata::new(mode);
+
+        if let Some(commit_info) = self.last_commit.as_ref() {
+            meta.set_last_modified(commit_info.date.parse::<Timestamp>()?);
+        }
+
+        if mode == EntryMode::FILE {
+            meta.set_content_length(self.size);
+            // For buckets, oid may be None; for regular repos, prefer lfs.oid then oid
+            if let Some(lfs) = &self.lfs {
+                meta.set_etag(&lfs.oid);
+            } else if let Some(oid) = &self.oid {
+                meta.set_etag(oid);
+            }
+        }
+
+        Ok(meta)
+    }
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LfsInfo {
+    pub oid: String,
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LastCommit {
+    pub date: String,
+}
+
+#[cfg(feature = "xet")]
+#[derive(Clone, Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct XetToken {
+    pub access_token: String,
+    pub cas_url: String,
+    pub exp: u64,
+}
+
+#[cfg(feature = "xet")]
+pub(super) struct XetTokenRefresher {
+    core: HfCore,
+    token_type: &'static str,
+}
+
+#[cfg(feature = "xet")]
+impl XetTokenRefresher {
+    pub(super) fn new(core: &HfCore, token_type: &'static str) -> Self {
+        Self {
+            core: core.clone(),
+            token_type,
+        }
+    }
+}
+
+#[cfg(feature = "xet")]
+#[async_trait::async_trait]
+impl TokenRefresher for XetTokenRefresher {
+    async fn refresh(
+        &self,
+    ) -> std::result::Result<(String, u64), subxet::utils::errors::AuthError> {
+        let token = self
+            .core
+            .xet_token(self.token_type)
+            .await
+            .map_err(subxet::utils::errors::AuthError::token_refresh_failure)?;
+        Ok((token.access_token, token.exp))
+    }
+}
+
+// Core HuggingFace client that manages API interactions, authentication
+// and shared logic for reader/writer/lister.
+
+#[derive(Clone)]
 pub struct HfCore {
     pub info: Arc<AccessorInfo>,
 
-    pub repo_type: RepoType,
-    pub repo_id: String,
-    pub revision: String,
+    pub repo: HfRepo,
     pub root: String,
     pub token: Option<String>,
     pub endpoint: String,
+    pub max_retries: usize,
+
+    // Whether XET storage protocol is enabled for reads. When true
+    // and the `xet` feature is compiled in, reads will check for
+    // XET-backed files and use the XET protocol for downloading.
+    #[cfg(feature = "xet")]
+    pub xet_enabled: bool,
+
+    /// HTTP client with redirects disabled, used by XET probes to
+    /// inspect headers on 302 responses.
+    #[cfg(feature = "xet")]
+    pub no_redirect_client: HttpClient,
 }
 
 impl Debug for HfCore {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("HfCore")
-            .field("repo_type", &self.repo_type)
-            .field("repo_id", &self.repo_id)
-            .field("revision", &self.revision)
+        let mut s = f.debug_struct("HfCore");
+        s.field("repo", &self.repo)
             .field("root", &self.root)
-            .field("endpoint", &self.endpoint)
-            .finish_non_exhaustive()
+            .field("endpoint", &self.endpoint);
+        #[cfg(feature = "xet")]
+        s.field("xet_enabled", &self.xet_enabled);
+        s.finish_non_exhaustive()
     }
 }
 
 impl HfCore {
-    pub async fn hf_path_info(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = match self.repo_type {
-            RepoType::Model => format!(
-                "{}/api/models/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Dataset => format!(
-                "{}/api/datasets/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Space => format!(
-                "{}/api/spaces/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
+    pub fn new(
+        info: Arc<AccessorInfo>,
+        repo: HfRepo,
+        root: String,
+        token: Option<String>,
+        endpoint: String,
+        max_retries: usize,
+        #[cfg(feature = "xet")] xet_enabled: bool,
+    ) -> Result<Self> {
+        // When xet is enabled at runtime, use dedicated reqwest clients instead
+        // of the global one. This avoids "dispatch task is gone" errors when
+        // multiple tokio runtimes exist (e.g. in tests) and ensures the
+        // no-redirect client shares the same runtime as the standard client.
+        // When xet is disabled, preserve whatever HTTP client is already set
+        // on `info` (important for mock-based unit tests).
+        #[cfg(feature = "xet")]
+        let no_redirect_client = if xet_enabled {

Review Comment:
   Services shouldn't build their own HTTP clients. But we can figure this out later.



##########
core/services/hf/src/backend.rs:
##########
@@ -118,27 +120,40 @@ impl HfBuilder {
         }
         self
     }
+
+    /// Enable XET storage protocol for reads.
+    ///
+    /// When the `xet` feature is compiled in, reads will check for
+    /// XET-backed files and use the XET protocol for downloading.
+    /// Default is disabled.
+    pub fn enable_xet(mut self) -> Self {
+        self.config.xet = true;
+        self
+    }
+
+    /// Disable XET storage protocol for reads.
+    pub fn disable_xet(mut self) -> Self {

Review Comment:
   The same.



##########
core/services/hf/Cargo.toml:
##########
@@ -30,15 +30,35 @@ version = { workspace = true }
 [package.metadata.docs.rs]
 all-features = true
 
+[features]
+default = []
+xet = ["dep:reqwest", "dep:subxet", "dep:futures", "dep:async-trait"]

Review Comment:
   I think we don't need to make this a feature, since we only have one HF service and users do need to communicate with Xet.



##########
core/services/hf/src/backend.rs:
##########
@@ -118,27 +120,40 @@ impl HfBuilder {
         }
         self
     }
+
+    /// Enable XET storage protocol for reads.
+    ///
+    /// When the `xet` feature is compiled in, reads will check for
+    /// XET-backed files and use the XET protocol for downloading.
+    /// Default is disabled.
+    pub fn enable_xet(mut self) -> Self {
+        self.config.xet = true;
+        self
+    }
+
+    /// Disable XET storage protocol for reads.
+    pub fn disable_xet(mut self) -> Self {
+        self.config.xet = false;
+        self
+    }
+
+    /// Set the maximum number of retries for commit operations.
+    ///
+    /// Retries on commit conflicts (HTTP 412) and transient server
+    /// errors (HTTP 5xx). Default is 3.
+    pub fn max_retries(mut self, max_retries: usize) -> Self {

Review Comment:
   Services shouldn't handle retries. All underlying operations should just fail, and OpenDAL's RetryLayer will handle them.
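   
   A minimal sketch of the layered approach, assuming `opendal-core` re-exports `RetryLayer` the way the main `opendal` crate does:
   
   ```rust
   use opendal_core::Operator;
   use opendal_core::layers::RetryLayer;
   
   // The service returns plain errors; callers opt into retries by
   // composing the layer, so the retry policy lives in one place.
   fn with_retries(op: Operator) -> Operator {
       op.layer(RetryLayer::new().with_max_times(3))
   }
   ```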
   



##########
core/services/hf/src/backend.rs:
##########
@@ -118,27 +120,40 @@ impl HfBuilder {
         }
         self
     }
+
+    /// Enable XET storage protocol for reads.
+    ///
+    /// When the `xet` feature is compiled in, reads will check for
+    /// XET-backed files and use the XET protocol for downloading.
+    /// Default is disabled.
+    pub fn enable_xet(mut self) -> Self {
+        self.config.xet = true;
+        self
+    }
+
+    /// Disable XET storage protocol for reads.
+    pub fn disable_xet(mut self) -> Self {
+        self.config.xet = false;
+        self
+    }
+
+    /// Set the maximum number of retries for commit operations.
+    ///
+    /// Retries on commit conflicts (HTTP 412) and transient server
+    /// errors (HTTP 5xx). Default is 3.
+    pub fn max_retries(mut self, max_retries: usize) -> Self {
+        self.config.max_retries = Some(max_retries);
+        self
+    }
 }
 
 impl Builder for HfBuilder {
     type Config = HfConfig;
 
-    /// Build an HfBackend.
     fn build(self) -> Result<impl Access> {
         debug!("backend build started: {:?}", &self);
 
-        let repo_type = match self.config.repo_type.as_deref() {
-            Some("model") => Ok(RepoType::Model),
-            Some("dataset") | Some("datasets") => Ok(RepoType::Dataset),
-            Some("space") => Ok(RepoType::Space),
-            Some(repo_type) => Err(Error::new(
-                ErrorKind::ConfigInvalid,
-                format!("unknown repo_type: {repo_type}").as_str(),
-            )
-            .with_operation("Builder::build")
-            .with_context("service", HF_SCHEME)),
-            None => Ok(RepoType::Model),
-        }?;
+        let repo_type = self.config.repo_type;

Review Comment:
   Love this change.
   



##########
core/services/hf/src/backend.rs:
##########
@@ -118,27 +120,40 @@ impl HfBuilder {
         }
         self
     }
+
+    /// Enable XET storage protocol for reads.

Review Comment:
   Can we determine this at runtime rather than during configuration? I expect it should not be a configuration value.
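   
   A rough sketch of what runtime detection could look like, using the no-redirect client already introduced in this PR; the `x-xet-refresh-route` header name is an assumption about the Hub's redirect response:
   
   ```rust
   // Probe the resolve URL with redirects disabled; if the 302 carries
   // a XET routing header, treat the file as XET-backed at runtime.
   async fn is_xet_backed(core: &HfCore, url: &str) -> Result<bool> {
       let req = core
           .request(http::Method::GET, url, Operation::Read)
           .body(Buffer::new())
           .map_err(new_request_build_error)?;
       let resp = core.no_redirect_client.send(req).await?;
       Ok(resp.status().is_redirection()
           && resp.headers().contains_key("x-xet-refresh-route"))
   }
   ```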
   
   



##########
core/services/hf/src/core.rs:
##########
@@ -18,325 +18,606 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use bytes::Buf;
 use bytes::Bytes;
 use http::Request;
 use http::Response;
 use http::header;
-use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
 use serde::Deserialize;
 
-use super::backend::RepoType;
+#[cfg(feature = "xet")]
+use subxet::data::XetFileInfo;
+#[cfg(feature = "xet")]
+use subxet::data::streaming::XetClient;
+#[cfg(feature = "xet")]
+use subxet::utils::auth::TokenRefresher;
+
+use super::error::parse_error;
+use super::uri::HfRepo;
 use opendal_core::raw::*;
 use opendal_core::*;
 
-fn percent_encode_revision(revision: &str) -> String {
-    utf8_percent_encode(revision, NON_ALPHANUMERIC).to_string()
+/// API payload structures for preupload operations
+#[derive(serde::Serialize)]
+struct PreuploadFile {
+    path: String,
+    size: i64,
+    sample: String,
+}
+
+#[derive(serde::Serialize)]
+struct PreuploadRequest {
+    files: Vec<PreuploadFile>,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadFileResponse {
+    #[allow(dead_code)]
+    path: String,
+    #[serde(rename = "uploadMode")]
+    upload_mode: String,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadResponse {
+    files: Vec<PreuploadFileResponse>,
+}
+
+/// API payload structures for commit operations
+#[derive(Debug, serde::Serialize)]
+pub(super) struct CommitFile {
+    pub path: String,
+    pub content: String,
+    pub encoding: String,
+}
+
+#[derive(Debug, serde::Serialize)]
+pub(super) struct LfsFile {
+    pub path: String,
+    pub oid: String,
+    pub algo: String,
+    pub size: u64,
+}
+
+#[derive(Clone, Debug, serde::Serialize)]
+pub(super) struct DeletedFile {
+    pub path: String,
+}
+
+/// Bucket batch operation payload structures
+#[cfg(feature = "xet")]
+#[derive(Debug, serde::Serialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub(super) enum BucketOperation {
+    #[serde(rename_all = "camelCase")]
+    AddFile { path: String, xet_hash: String },
+    #[serde(rename_all = "camelCase")]
+    #[allow(dead_code)]
+    DeleteFile { path: String },
+}
+
+#[derive(serde::Serialize)]
+pub(super) struct MixedCommitPayload {
+    pub summary: String,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub files: Vec<CommitFile>,
+    #[serde(rename = "lfsFiles", skip_serializing_if = "Vec::is_empty")]
+    pub lfs_files: Vec<LfsFile>,
+    #[serde(rename = "deletedFiles", skip_serializing_if = "Vec::is_empty")]
+    pub deleted_files: Vec<DeletedFile>,
+}
+
+// API response types
+
+#[derive(serde::Deserialize, Debug)]
+pub(super) struct CommitResponse {
+    #[serde(rename = "commitOid")]
+    pub commit_oid: Option<String>,
+    #[allow(dead_code)]
+    #[serde(rename = "commitUrl")]
+    pub commit_url: Option<String>,
 }
 
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct PathInfo {
+    #[serde(rename = "type")]
+    pub type_: String,
+    #[serde(default)]
+    pub oid: Option<String>,
+    pub size: u64,
+    #[serde(default)]
+    pub lfs: Option<LfsInfo>,
+    pub path: String,
+    #[serde(default)]
+    pub last_commit: Option<LastCommit>,
+}
+
+impl PathInfo {
+    pub fn entry_mode(&self) -> EntryMode {
+        match self.type_.as_str() {
+            "directory" => EntryMode::DIR,
+            "file" => EntryMode::FILE,
+            _ => EntryMode::Unknown,
+        }
+    }
+
+    pub fn metadata(&self) -> Result<Metadata> {
+        let mode = self.entry_mode();
+        let mut meta = Metadata::new(mode);
+
+        if let Some(commit_info) = self.last_commit.as_ref() {
+            meta.set_last_modified(commit_info.date.parse::<Timestamp>()?);
+        }
+
+        if mode == EntryMode::FILE {
+            meta.set_content_length(self.size);
+            // For buckets, oid may be None; for regular repos, prefer lfs.oid then oid
+            if let Some(lfs) = &self.lfs {
+                meta.set_etag(&lfs.oid);
+            } else if let Some(oid) = &self.oid {
+                meta.set_etag(oid);
+            }
+        }
+
+        Ok(meta)
+    }
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LfsInfo {
+    pub oid: String,
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LastCommit {
+    pub date: String,
+}
+
+#[cfg(feature = "xet")]
+#[derive(Clone, Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct XetToken {
+    pub access_token: String,
+    pub cas_url: String,
+    pub exp: u64,
+}
+
+#[cfg(feature = "xet")]
+pub(super) struct XetTokenRefresher {
+    core: HfCore,
+    token_type: &'static str,
+}
+
+#[cfg(feature = "xet")]
+impl XetTokenRefresher {
+    pub(super) fn new(core: &HfCore, token_type: &'static str) -> Self {
+        Self {
+            core: core.clone(),
+            token_type,
+        }
+    }
+}
+
+#[cfg(feature = "xet")]
+#[async_trait::async_trait]
+impl TokenRefresher for XetTokenRefresher {
+    async fn refresh(
+        &self,
+    ) -> std::result::Result<(String, u64), subxet::utils::errors::AuthError> {
+        let token = self
+            .core
+            .xet_token(self.token_type)
+            .await
+            .map_err(subxet::utils::errors::AuthError::token_refresh_failure)?;
+        Ok((token.access_token, token.exp))
+    }
+}
+
+// Core HuggingFace client that manages API interactions, authentication
+// and shared logic for reader/writer/lister.
+
+#[derive(Clone)]
 pub struct HfCore {
     pub info: Arc<AccessorInfo>,
 
-    pub repo_type: RepoType,
-    pub repo_id: String,
-    pub revision: String,
+    pub repo: HfRepo,
     pub root: String,
     pub token: Option<String>,
     pub endpoint: String,
+    pub max_retries: usize,
+
+    // Whether XET storage protocol is enabled for reads. When true
+    // and the `xet` feature is compiled in, reads will check for
+    // XET-backed files and use the XET protocol for downloading.
+    #[cfg(feature = "xet")]
+    pub xet_enabled: bool,
+
+    /// HTTP client with redirects disabled, used by XET probes to
+    /// inspect headers on 302 responses.
+    #[cfg(feature = "xet")]
+    pub no_redirect_client: HttpClient,
 }
 
 impl Debug for HfCore {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("HfCore")
-            .field("repo_type", &self.repo_type)
-            .field("repo_id", &self.repo_id)
-            .field("revision", &self.revision)
+        let mut s = f.debug_struct("HfCore");
+        s.field("repo", &self.repo)
             .field("root", &self.root)
-            .field("endpoint", &self.endpoint)
-            .finish_non_exhaustive()
+            .field("endpoint", &self.endpoint);
+        #[cfg(feature = "xet")]
+        s.field("xet_enabled", &self.xet_enabled);
+        s.finish_non_exhaustive()
     }
 }
 
 impl HfCore {
-    pub async fn hf_path_info(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = match self.repo_type {
-            RepoType::Model => format!(
-                "{}/api/models/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Dataset => format!(
-                "{}/api/datasets/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Space => format!(
-                "{}/api/spaces/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
+    pub fn new(
+        info: Arc<AccessorInfo>,
+        repo: HfRepo,
+        root: String,
+        token: Option<String>,
+        endpoint: String,
+        max_retries: usize,
+        #[cfg(feature = "xet")] xet_enabled: bool,
+    ) -> Result<Self> {
+        // When xet is enabled at runtime, use dedicated reqwest clients instead
+        // of the global one. This avoids "dispatch task is gone" errors when
+        // multiple tokio runtimes exist (e.g. in tests) and ensures the
+        // no-redirect client shares the same runtime as the standard client.
+        // When xet is disabled, preserve whatever HTTP client is already set
+        // on `info` (important for mock-based unit tests).
+        #[cfg(feature = "xet")]
+        let no_redirect_client = if xet_enabled {
+            let standard = HttpClient::with(build_reqwest(reqwest::redirect::Policy::default())?);
+            let no_redirect = HttpClient::with(build_reqwest(reqwest::redirect::Policy::none())?);
+            info.update_http_client(|_| standard);
+            no_redirect
+        } else {
+            info.http_client()
         };
 
-        let mut req = Request::post(&url);
-        // Inject operation to the request.
-        req = req.extension(Operation::Stat);
+        Ok(Self {
+            info,
+            repo,
+            root,
+            token,
+            endpoint,
+            max_retries,
+            #[cfg(feature = "xet")]
+            xet_enabled,
+            #[cfg(feature = "xet")]
+            no_redirect_client,
+        })
+    }
+
+    /// Build an authenticated HTTP request.
+    pub(super) fn request(
+        &self,
+        method: http::Method,
+        url: &str,
+        op: Operation,
+    ) -> http::request::Builder {
+        let mut req = Request::builder().method(method).uri(url).extension(op);
         if let Some(token) = &self.token {
-            let auth_header_content = format_authorization_by_bearer(token)?;
-            req = req.header(header::AUTHORIZATION, auth_header_content);
+            if let Ok(auth) = format_authorization_by_bearer(token) {
+                req = req.header(header::AUTHORIZATION, auth);
+            }
         }
+        req
+    }
 
-        req = req.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded");
-
-        let req_body = format!("paths={}&expand=True", percent_encode_path(&p));
+    pub(super) fn uri(&self, path: &str) -> super::uri::HfUri {
+        self.repo.uri(&self.root, path)
+    }
 
-        let req = req
-            .body(Buffer::from(Bytes::from(req_body)))
-            .map_err(new_request_build_error)?;
+    /// Send a request with retries, returning the successful response.
+    ///
+    /// Retries on commit conflicts (HTTP 412) and transient server errors

Review Comment:
   The same. We just need to return a correct error, and RetryLayer will handle everything.
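   
   For instance, a sketch of classifying the response instead of retrying in place (the kind choices are illustrative):
   
   ```rust
   use opendal_core::{Error, ErrorKind};
   
   // Map commit failures to OpenDAL errors; transient cases are marked
   // temporary so an installed RetryLayer knows it may retry them.
   fn classify_commit_error(status: http::StatusCode, msg: String) -> Error {
       if status == http::StatusCode::PRECONDITION_FAILED {
           // Commit conflict (412): retryable after refetching the head.
           Error::new(ErrorKind::ConditionNotMatch, msg).set_temporary()
       } else if status.is_server_error() {
           Error::new(ErrorKind::Unexpected, msg).set_temporary()
       } else {
           Error::new(ErrorKind::Unexpected, msg)
       }
   }
   ```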



##########
core/services/hf/src/core.rs:
##########
@@ -546,235 +732,19 @@ mod tests {
 
         Ok(())
     }
+}
 
-    #[tokio::test]
-    async fn test_hf_list_url_space() -> Result<()> {
-        let (core, mock_client) = create_test_core(
-            RepoType::Space,
-            "org/space",
-            "main",
-            "https://huggingface.co";,
-        );
-
-        core.hf_list("static", false, None).await?;
-
-        let url = mock_client.get_captured_url();
-        assert_eq!(
-            url,
-            "https://huggingface.co/api/spaces/org/space/tree/main/static?expand=True"
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_hf_resolve_url_space() -> Result<()> {
-        let (core, mock_client) = create_test_core(
-            RepoType::Space,
-            "user/space",
-            "main",
-            "https://huggingface.co";,
-        );
-
-        let args = OpRead::default();
-        core.hf_resolve("README.md", BytesRange::default(), &args)
-            .await?;
-
-        let url = mock_client.get_captured_url();
-        assert_eq!(
-            url,
-            "https://huggingface.co/spaces/user/space/resolve/main/README.md";
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_hf_resolve_with_range() -> Result<()> {
-        let (core, mock_client) = create_test_core(
-            RepoType::Model,
-            "user/model",
-            "main",
-            "https://huggingface.co";,
-        );
-
-        let args = OpRead::default();
-        let range = BytesRange::new(0, Some(1024));
-        core.hf_resolve("large_file.bin", range, &args).await?;
-
-        let url = mock_client.get_captured_url();
-        let headers = mock_client.get_captured_headers();
-        assert_eq!(
-            url,
-            "https://huggingface.co/user/model/resolve/main/large_file.bin";
-        );
-        assert_eq!(headers.get(http::header::RANGE).unwrap(), "bytes=0-1023");
-
-        Ok(())
-    }
-
-    #[test]
-    fn parse_list_response_test() -> Result<()> {
-        let resp = Bytes::from(
-            r#"
-            [
-                {
-                    "type": "file",
-                    "oid": "45fa7c3d85ee7dd4139adbc056da25ae136a65f2",
-                    "size": 69512435,
-                    "lfs": {
-                        "oid": 
"b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c",
-                        "size": 69512435,
-                        "pointerSize": 133
-                    },
-                    "path": "maelstrom/lib/maelstrom.jar"
-                },
-                {
-                    "type": "directory",
-                    "oid": 
"b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c",
-                    "size": 69512435,
-                    "path": "maelstrom/lib/plugins"
-                }
-            ]
-            "#,
-        );
-
-        let decoded_response =
-            serde_json::from_slice::<Vec<HfStatus>>(&resp).map_err(new_json_deserialize_error)?;
-
-        assert_eq!(decoded_response.len(), 2);
-
-        let file_entry = HfStatus {
-            type_: "file".to_string(),
-            oid: "45fa7c3d85ee7dd4139adbc056da25ae136a65f2".to_string(),
-            size: 69512435,
-            lfs: Some(HfLfs {
-                oid: "b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c".to_string(),
-                size: 69512435,
-                pointer_size: 133,
-            }),
-            path: "maelstrom/lib/maelstrom.jar".to_string(),
-            last_commit: None,
-            security: None,
-        };
-
-        assert_eq!(decoded_response[0], file_entry);
-
-        let dir_entry = HfStatus {
-            type_: "directory".to_string(),
-            oid: "b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c".to_string(),
-            size: 69512435,
-            lfs: None,
-            path: "maelstrom/lib/plugins".to_string(),
-            last_commit: None,
-            security: None,
-        };
-
-        assert_eq!(decoded_response[1], dir_entry);
-
-        Ok(())
-    }
-
-    #[test]
-    fn parse_files_info_test() -> Result<()> {
-        let resp = Bytes::from(
-            r#"
-            [
-                {
-                    "type": "file",
-                    "oid": "45fa7c3d85ee7dd4139adbc056da25ae136a65f2",
-                    "size": 69512435,
-                    "lfs": {
-                        "oid": 
"b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c",
-                        "size": 69512435,
-                        "pointerSize": 133
-                    },
-                    "path": "maelstrom/lib/maelstrom.jar",
-                    "lastCommit": {
-                        "id": "bc1ef030bf3743290d5e190695ab94582e51ae2f",
-                        "title": "Upload 141 files",
-                        "date": "2023-11-17T23:50:28.000Z"
-                    },
-                    "security": {
-                        "blobId": "45fa7c3d85ee7dd4139adbc056da25ae136a65f2",
-                        "name": "maelstrom/lib/maelstrom.jar",
-                        "safe": true,
-                        "avScan": {
-                            "virusFound": false,
-                            "virusNames": null
-                        },
-                        "pickleImportScan": {
-                            "highestSafetyLevel": "innocuous",
-                            "imports": [
-                                {"module": "torch", "name": "FloatStorage", 
"safety": "innocuous"},
-                                {"module": "collections", "name": 
"OrderedDict", "safety": "innocuous"},
-                                {"module": "torch", "name": "LongStorage", 
"safety": "innocuous"},
-                                {"module": "torch._utils", "name": 
"_rebuild_tensor_v2", "safety": "innocuous"}
-                            ]
-                        }
-                    }
-                }
-            ]
-            "#,
-        );
-
-        let decoded_response =
-            serde_json::from_slice::<Vec<HfStatus>>(&resp).map_err(new_json_deserialize_error)?;
-
-        assert_eq!(decoded_response.len(), 1);
-
-        let file_info = HfStatus {
-            type_: "file".to_string(),
-            oid: "45fa7c3d85ee7dd4139adbc056da25ae136a65f2".to_string(),
-            size: 69512435,
-            lfs: Some(HfLfs {
-                oid: "b43f4c2ea569da1d66ca74e26ca8ea4430dfc29195e97144b2d0b4f3f6cafa1c".to_string(),
-                size: 69512435,
-                pointer_size: 133,
-            }),
-            path: "maelstrom/lib/maelstrom.jar".to_string(),
-            last_commit: Some(HfLastCommit {
-                id: "bc1ef030bf3743290d5e190695ab94582e51ae2f".to_string(),
-                title: "Upload 141 files".to_string(),
-                date: "2023-11-17T23:50:28.000Z".to_string(),
-            }),
-            security: Some(HfSecurity {
-                blob_id: "45fa7c3d85ee7dd4139adbc056da25ae136a65f2".to_string(),
-                safe: true,
-                av_scan: Some(HfAvScan {
-                    virus_found: false,
-                    virus_names: None,
-                }),
-                pickle_import_scan: Some(HfPickleImportScan {
-                    highest_safety_level: "innocuous".to_string(),
-                    imports: vec![
-                        HfImport {
-                            module: "torch".to_string(),
-                            name: "FloatStorage".to_string(),
-                            safety: "innocuous".to_string(),
-                        },
-                        HfImport {
-                            module: "collections".to_string(),
-                            name: "OrderedDict".to_string(),
-                            safety: "innocuous".to_string(),
-                        },
-                        HfImport {
-                            module: "torch".to_string(),
-                            name: "LongStorage".to_string(),
-                            safety: "innocuous".to_string(),
-                        },
-                        HfImport {
-                            module: "torch._utils".to_string(),
-                            name: "_rebuild_tensor_v2".to_string(),
-                            safety: "innocuous".to_string(),
-                        },
-                    ],
-                }),
-            }),
-        };
-
-        assert_eq!(decoded_response[0], file_info);
+#[cfg(feature = "xet")]
+pub(super) fn map_xet_error(err: impl std::error::Error + Send + Sync + 'static) -> Error {

Review Comment:
   We should not do this. Whenever possible, please translate underlying errors into meaningful OpenDAL errors.
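   
   A rough sketch of the direction; the string matching is only illustrative, and subxet's real error variants should be matched structurally instead:
   
   ```rust
   use opendal_core::{Error, ErrorKind};
   
   // Preserve the underlying meaning: pick an ErrorKind first, and keep
   // the source error attached for debugging.
   fn translate_xet_error(
       err: impl std::error::Error + Send + Sync + 'static,
   ) -> Error {
       let msg = err.to_string();
       let kind = if msg.contains("not found") {
           ErrorKind::NotFound
       } else if msg.contains("unauthorized") || msg.contains("forbidden") {
           ErrorKind::PermissionDenied
       } else {
           ErrorKind::Unexpected
       };
       Error::new(kind, "xet operation failed").set_source(err)
   }
   ```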



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

