parthchandra commented on code in PR #1494:
URL: https://github.com/apache/datafusion-comet/pull/1494#discussion_r1992095249


##########
native/core/src/parquet/parquet_support.rs:
##########
@@ -199,38 +202,147 @@ fn cast_struct_to_struct(
     }
 }
 
-// Default object store which is local filesystem
+// Mirrors object_store::parse::parse_url for the hdfs object store
+#[cfg(feature = "hdfs")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        Some(object_store) => {
+            let path = object_store.get_path(url.as_str());
+            Ok((Box::new(object_store), path))
+        }
+        _ => {
+            return Err(object_store::Error::Generic {
+                store: "HadoopFileSystem",
+                source: Box::new(datafusion_comet_objectstore_hdfs::object_store::hdfs::HdfsErr::Generic(
+                    "Could not create hdfs object store".to_string(),
+                )),
+            });
+        }
+    }
+}
+
 #[cfg(not(feature = "hdfs"))]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    let object_store = object_store::local::LocalFileSystem::new();
-    let url = ObjectStoreUrl::parse("file://")?;
-    session_context
-        .runtime_env()
-        .register_object_store(url.as_ref(), Arc::new(object_store));
-    Ok(url)
+fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    Err(object_store::Error::Generic {
+        store: "HadoopFileSystem",
+        source: "Hdfs support is not enabled in this build".into(),
+    })
 }
 
-// HDFS object store
-#[cfg(feature = "hdfs")]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    // TODO: read the namenode configuration from file schema or from spark.defaultFS
-    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
-    if let Some(object_store) =
-        datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
-    {
-        session_context
-            .runtime_env()
-            .register_object_store(url.as_ref(), Arc::new(object_store));
+/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+pub(crate) fn prepare_object_store(
+    runtime_env: Arc<RuntimeEnv>,
+    url: String,
+) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+    let mut url = Url::parse(url.as_str())
+        .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let mut scheme = url.scheme();
+    if scheme == "s3a" {
+        scheme = "s3";
+        url.set_scheme("s3").map_err(|_| {
+            ExecutionError::GeneralError("Could not convert scheme from s3a to 
s3".to_string())
+        })?;
+    }
+    let url_key = format!(
+        "{}://{}",
+        scheme,
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    );
+
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
+        parse_hdfs_url(&url)
+    } else {
+        parse_url(&url)
+    }
+    .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+    let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
+    runtime_env.register_object_store(&url, Arc::from(object_store));
+    Ok((object_store_url, object_store_path))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use object_store::path::Path;
+    use std::sync::Arc;
+    use url::Url;
+
+    #[test]
+    #[cfg(not(feature = "hdfs"))]
+    fn test_prepare_object_store() {
+        let local_file_system_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
+        let s3_url = "s3a://test_bucket/comet/spark-warehouse/part-00000.snappy.parquet";
+        let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet";
+
+        let all_urls = [local_file_system_url, s3_url, hdfs_url];
+        let expected: Vec<Result<(ObjectStoreUrl, Path), ExecutionError>> = vec![
+            Ok((
+                ObjectStoreUrl::parse("file://").unwrap(),
+                Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+            )),
+            Ok((
+                ObjectStoreUrl::parse("s3://test_bucket").unwrap(),
+                Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+            )),
+            Err(ExecutionError::GeneralError(
+                "Generic HadoopFileSystem error: Hdfs support is not enabled 
in this build"
+                    .parse()
+                    .unwrap(),
+            )),
+        ];
 
-        return Ok(url);
+        for (i, url_str) in all_urls.iter().enumerate() {
+            let url = &Url::parse(url_str).unwrap();
+            let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
+
+            let expected = expected.get(i).unwrap();
+            match expected {
+                Ok((o, p)) => {
+                    let (r_o, r_p) = res.unwrap();
+                    assert_eq!(r_o, *o);
+                    assert_eq!(r_p, *p);
+                }
+                Err(e) => {
+                    assert!(res.is_err());
+                    let Err(res_e) = res else {
+                        panic!("test failed")
+                    };
+                    assert_eq!(e.to_string(), res_e.to_string())
+                }
+            }
+        }
     }
 
-    Err(ExecutionError::GeneralError(format!(
-        "HDFS object store cannot be created for {}",
-        url
-    )))
+    #[test]
+    #[cfg(feature = "hdfs")]
+    fn test_prepare_object_store() {
+        let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet";

Review Comment:
   Done: https://github.com/apache/datafusion-comet/issues/1515
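   As a side note, a rough sketch (not taken from this PR) of how `prepare_object_store` might be invoked from elsewhere in the crate; the bucket and file names below are made-up example values:

   ```rust
   use std::sync::Arc;

   use datafusion_execution::runtime_env::RuntimeEnv;

   use crate::execution::operators::ExecutionError;
   use crate::parquet::parquet_support::prepare_object_store;

   // Hypothetical caller: register the object store for a scan URL up front,
   // then use the returned (ObjectStoreUrl, Path) pair to build the reader.
   fn resolve_scan_target() -> Result<(), ExecutionError> {
       let runtime_env = Arc::new(RuntimeEnv::default());
       // s3a URLs are rewritten to s3 inside prepare_object_store.
       let url = "s3a://example-bucket/warehouse/part-00000.snappy.parquet".to_string();
       let (store_url, path) = prepare_object_store(runtime_env, url)?;
       println!("registered {} for object {}", store_url.as_str(), path);
       Ok(())
   }
   ```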



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

