kszucs commented on code in PR #7185:
URL: https://github.com/apache/opendal/pull/7185#discussion_r2836250374


##########
core/services/hf/src/core.rs:
##########
@@ -18,325 +18,606 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use bytes::Buf;
 use bytes::Bytes;
 use http::Request;
 use http::Response;
 use http::header;
-use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
 use serde::Deserialize;
 
-use super::backend::RepoType;
+#[cfg(feature = "xet")]
+use subxet::data::XetFileInfo;
+#[cfg(feature = "xet")]
+use subxet::data::streaming::XetClient;
+#[cfg(feature = "xet")]
+use subxet::utils::auth::TokenRefresher;
+
+use super::error::parse_error;
+use super::uri::HfRepo;
 use opendal_core::raw::*;
 use opendal_core::*;
 
-fn percent_encode_revision(revision: &str) -> String {
-    utf8_percent_encode(revision, NON_ALPHANUMERIC).to_string()
+/// API payload structures for preupload operations
+#[derive(serde::Serialize)]
+struct PreuploadFile {
+    path: String,
+    size: i64,
+    sample: String,
+}
+
+#[derive(serde::Serialize)]
+struct PreuploadRequest {
+    files: Vec<PreuploadFile>,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadFileResponse {
+    #[allow(dead_code)]
+    path: String,
+    #[serde(rename = "uploadMode")]
+    upload_mode: String,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct PreuploadResponse {
+    files: Vec<PreuploadFileResponse>,
+}
+
+/// API payload structures for commit operations
+#[derive(Debug, serde::Serialize)]
+pub(super) struct CommitFile {
+    pub path: String,
+    pub content: String,
+    pub encoding: String,
+}
+
+#[derive(Debug, serde::Serialize)]
+pub(super) struct LfsFile {
+    pub path: String,
+    pub oid: String,
+    pub algo: String,
+    pub size: u64,
+}
+
+#[derive(Clone, Debug, serde::Serialize)]
+pub(super) struct DeletedFile {
+    pub path: String,
+}
+
+/// Bucket batch operation payload structures
+#[cfg(feature = "xet")]
+#[derive(Debug, serde::Serialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub(super) enum BucketOperation {
+    #[serde(rename_all = "camelCase")]
+    AddFile { path: String, xet_hash: String },
+    #[serde(rename_all = "camelCase")]
+    #[allow(dead_code)]
+    DeleteFile { path: String },
+}
+
+#[derive(serde::Serialize)]
+pub(super) struct MixedCommitPayload {
+    pub summary: String,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub files: Vec<CommitFile>,
+    #[serde(rename = "lfsFiles", skip_serializing_if = "Vec::is_empty")]
+    pub lfs_files: Vec<LfsFile>,
+    #[serde(rename = "deletedFiles", skip_serializing_if = "Vec::is_empty")]
+    pub deleted_files: Vec<DeletedFile>,
+}
+
+// API response types
+
+#[derive(serde::Deserialize, Debug)]
+pub(super) struct CommitResponse {
+    #[serde(rename = "commitOid")]
+    pub commit_oid: Option<String>,
+    #[allow(dead_code)]
+    #[serde(rename = "commitUrl")]
+    pub commit_url: Option<String>,
 }
 
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct PathInfo {
+    #[serde(rename = "type")]
+    pub type_: String,
+    #[serde(default)]
+    pub oid: Option<String>,
+    pub size: u64,
+    #[serde(default)]
+    pub lfs: Option<LfsInfo>,
+    pub path: String,
+    #[serde(default)]
+    pub last_commit: Option<LastCommit>,
+}
+
+impl PathInfo {
+    pub fn entry_mode(&self) -> EntryMode {
+        match self.type_.as_str() {
+            "directory" => EntryMode::DIR,
+            "file" => EntryMode::FILE,
+            _ => EntryMode::Unknown,
+        }
+    }
+
+    pub fn metadata(&self) -> Result<Metadata> {
+        let mode = self.entry_mode();
+        let mut meta = Metadata::new(mode);
+
+        if let Some(commit_info) = self.last_commit.as_ref() {
+            meta.set_last_modified(commit_info.date.parse::<Timestamp>()?);
+        }
+
+        if mode == EntryMode::FILE {
+            meta.set_content_length(self.size);
+            // For buckets, oid may be None; for regular repos, prefer lfs.oid then oid
+            if let Some(lfs) = &self.lfs {
+                meta.set_etag(&lfs.oid);
+            } else if let Some(oid) = &self.oid {
+                meta.set_etag(oid);
+            }
+        }
+
+        Ok(meta)
+    }
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LfsInfo {
+    pub oid: String,
+}
+
+#[derive(Deserialize, Eq, PartialEq, Debug)]
+pub(super) struct LastCommit {
+    pub date: String,
+}
+
+#[cfg(feature = "xet")]
+#[derive(Clone, Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct XetToken {
+    pub access_token: String,
+    pub cas_url: String,
+    pub exp: u64,
+}
+
+#[cfg(feature = "xet")]
+pub(super) struct XetTokenRefresher {
+    core: HfCore,
+    token_type: &'static str,
+}
+
+#[cfg(feature = "xet")]
+impl XetTokenRefresher {
+    pub(super) fn new(core: &HfCore, token_type: &'static str) -> Self {
+        Self {
+            core: core.clone(),
+            token_type,
+        }
+    }
+}
+
+#[cfg(feature = "xet")]
+#[async_trait::async_trait]
+impl TokenRefresher for XetTokenRefresher {
+    async fn refresh(
+        &self,
+    ) -> std::result::Result<(String, u64), subxet::utils::errors::AuthError> {
+        let token = self
+            .core
+            .xet_token(self.token_type)
+            .await
+            .map_err(subxet::utils::errors::AuthError::token_refresh_failure)?;
+        Ok((token.access_token, token.exp))
+    }
+}
+
+// Core HuggingFace client that manages API interactions, authentication
+// and shared logic for reader/writer/lister.
+
+#[derive(Clone)]
 pub struct HfCore {
     pub info: Arc<AccessorInfo>,
 
-    pub repo_type: RepoType,
-    pub repo_id: String,
-    pub revision: String,
+    pub repo: HfRepo,
     pub root: String,
     pub token: Option<String>,
     pub endpoint: String,
+    pub max_retries: usize,
+
+    // Whether XET storage protocol is enabled for reads. When true
+    // and the `xet` feature is compiled in, reads will check for
+    // XET-backed files and use the XET protocol for downloading.
+    #[cfg(feature = "xet")]
+    pub xet_enabled: bool,
+
+    /// HTTP client with redirects disabled, used by XET probes to
+    /// inspect headers on 302 responses.
+    #[cfg(feature = "xet")]
+    pub no_redirect_client: HttpClient,
 }
 
 impl Debug for HfCore {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("HfCore")
-            .field("repo_type", &self.repo_type)
-            .field("repo_id", &self.repo_id)
-            .field("revision", &self.revision)
+        let mut s = f.debug_struct("HfCore");
+        s.field("repo", &self.repo)
             .field("root", &self.root)
-            .field("endpoint", &self.endpoint)
-            .finish_non_exhaustive()
+            .field("endpoint", &self.endpoint);
+        #[cfg(feature = "xet")]
+        s.field("xet_enabled", &self.xet_enabled);
+        s.finish_non_exhaustive()
     }
 }
 
 impl HfCore {
-    pub async fn hf_path_info(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = match self.repo_type {
-            RepoType::Model => format!(
-                "{}/api/models/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Dataset => format!(
-                "{}/api/datasets/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
-            RepoType::Space => format!(
-                "{}/api/spaces/{}/paths-info/{}",
-                &self.endpoint,
-                &self.repo_id,
-                percent_encode_revision(&self.revision)
-            ),
+    pub fn new(
+        info: Arc<AccessorInfo>,
+        repo: HfRepo,
+        root: String,
+        token: Option<String>,
+        endpoint: String,
+        max_retries: usize,
+        #[cfg(feature = "xet")] xet_enabled: bool,
+    ) -> Result<Self> {
+        // When xet is enabled at runtime, use dedicated reqwest clients instead
+        // of the global one. This avoids "dispatch task is gone" errors when
+        // multiple tokio runtimes exist (e.g. in tests) and ensures the
+        // no-redirect client shares the same runtime as the standard client.
+        // When xet is disabled, preserve whatever HTTP client is already set
+        // on `info` (important for mock-based unit tests).
+        #[cfg(feature = "xet")]
+        let no_redirect_client = if xet_enabled {

Review Comment:
   We need a client that does not follow redirects in order to retrieve the
`X-Xet-Hash` header. If I'm not mistaken, we need a separate HTTP client for
this; I'm not sure how else I could handle it.
   
   Since a new, revamped Xet client is in the works (it will be published, and
this PR should eventually depend on it), we may be able to delegate this task to
the Xet client's side. What do you think, @hoytak?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to