This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a03ce564f1 fix: object store http header last modified (#4834)
a03ce564f1 is described below
commit a03ce564f1c95e10c78e6a065996cb036ca13cef
Author: Cory Grinstead <[email protected]>
AuthorDate: Tue Sep 19 08:59:21 2023 -0500
fix: object store http header last modified (#4834)
* fix: object store http header last modified
* refactor: make headermeta configurable on required fields
* Update object_store/src/client/header.rs
Co-authored-by: Raphael Taylor-Davies <[email protected]>
* Update object_store/src/client/header.rs
Co-authored-by: Raphael Taylor-Davies <[email protected]>
* Update object_store/src/client/header.rs
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
object_store/src/client/get.rs | 12 ++++----
object_store/src/client/header.rs | 62 ++++++++++++++++++++++++++++++---------
object_store/src/http/mod.rs | 9 ++++--
3 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 6b2d60ae56..8b84a079c7 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -49,8 +49,8 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
- let meta =
- header_meta(location, response.headers()).map_err(|e|
Error::Generic {
+ let meta = header_meta(location, response.headers(),
Default::default())
+ .map_err(|e| Error::Generic {
store: T::STORE,
source: Box::new(e),
})?;
@@ -73,9 +73,11 @@ impl<T: GetClient> GetClientExt for T {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
- header_meta(location, response.headers()).map_err(|e| Error::Generic {
- store: T::STORE,
- source: Box::new(e),
+ header_meta(location, response.headers(),
Default::default()).map_err(|e| {
+ Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ }
})
}
}
diff --git a/object_store/src/client/header.rs
b/object_store/src/client/header.rs
index cc4f16eaa5..b55494cdb8 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -19,11 +19,33 @@
use crate::path::Path;
use crate::ObjectMeta;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, TimeZone, Utc};
use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};
+#[derive(Debug)]
+/// Configuration for header extraction
+pub struct HeaderConfig {
+ /// Whether to require an ETag header when extracting [`ObjectMeta`] from
headers.
+ ///
+ /// Defaults to `true`
+ pub etag_required: bool,
+ /// Whether to require a Last-Modified header when extracting
[`ObjectMeta`] from headers.
+ ///
+ /// Defaults to `true`
+ pub last_modified_required: bool,
+}
+
+impl Default for HeaderConfig {
+ fn default() -> Self {
+ Self {
+ etag_required: true,
+ last_modified_required: true,
+ }
+ }
+}
+
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("ETag Header missing from response"))]
@@ -52,32 +74,44 @@ pub enum Error {
}
/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
-pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta,
Error> {
- let last_modified = headers
- .get(LAST_MODIFIED)
- .context(MissingLastModifiedSnafu)?;
+pub fn header_meta(
+ location: &Path,
+ headers: &HeaderMap,
+ cfg: HeaderConfig,
+) -> Result<ObjectMeta, Error> {
+ let last_modified = match headers.get(LAST_MODIFIED) {
+ Some(last_modified) => {
+ let last_modified =
last_modified.to_str().context(BadHeaderSnafu)?;
+ DateTime::parse_from_rfc2822(last_modified)
+ .context(InvalidLastModifiedSnafu { last_modified })?
+ .with_timezone(&Utc)
+ }
+ None if cfg.last_modified_required => return
Err(Error::MissingLastModified),
+ None => Utc.timestamp_nanos(0),
+ };
+
+ let e_tag = match headers.get(ETAG) {
+ Some(e_tag) => {
+ let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
+ Some(e_tag.to_string())
+ }
+ None if cfg.etag_required => return Err(Error::MissingEtag),
+ None => None,
+ };
let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;
- let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
- let last_modified = DateTime::parse_from_rfc2822(last_modified)
- .context(InvalidLastModifiedSnafu { last_modified })?
- .with_timezone(&Utc);
-
let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;
- let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
- let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
-
Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
- e_tag: Some(e_tag.to_string()),
+ e_tag,
})
}
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index e8e7b459e1..6143819756 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
-use crate::client::header::header_meta;
+use crate::client::header::{header_meta, HeaderConfig};
use crate::http::client::Client;
use crate::path::Path;
use crate::{
@@ -117,7 +117,12 @@ impl ObjectStore for HttpStore {
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let range = options.range.clone();
let response = self.client.get(location, options).await?;
- let meta = header_meta(location,
response.headers()).context(MetadataSnafu)?;
+ let cfg = HeaderConfig {
+ last_modified_required: false,
+ etag_required: false,
+ };
+ let meta =
+ header_meta(location, response.headers(),
cfg).context(MetadataSnafu)?;
let stream = response
.bytes_stream()