This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 03bbc0da83 Remove unused file_type.rs (#7478)
03bbc0da83 is described below
commit 03bbc0da8377d1a74cd851f09f9634f81fbcaee1
Author: Kousuke Saruta <[email protected]>
AuthorDate: Wed Sep 6 02:55:35 2023 +0900
Remove unused file_type.rs (#7478)
---
datafusion/common/src/file_type.rs | 410 -------------------------------------
1 file changed, 410 deletions(-)
diff --git a/datafusion/common/src/file_type.rs b/datafusion/common/src/file_type.rs
deleted file mode 100644
index f8d4fc0a31..0000000000
--- a/datafusion/common/src/file_type.rs
+++ /dev/null
@@ -1,410 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! File type abstraction
-
-use crate::error::{DataFusionError, Result};
-#[cfg(feature = "compression")]
-use async_compression::tokio::bufread::{
- BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
- GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
- XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
- ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
-};
-
-use crate::parsers::CompressionTypeVariant;
-#[cfg(feature = "compression")]
-use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
-use bytes::Bytes;
-#[cfg(feature = "compression")]
-use bzip2::read::MultiBzDecoder;
-#[cfg(feature = "compression")]
-use flate2::read::MultiGzDecoder;
-
-use core::fmt;
-use futures::stream::BoxStream;
-use futures::StreamExt;
-#[cfg(feature = "compression")]
-use futures::TryStreamExt;
-use std::fmt::Display;
-use std::str::FromStr;
-use tokio::io::AsyncWrite;
-#[cfg(feature = "compression")]
-use tokio_util::io::{ReaderStream, StreamReader};
-#[cfg(feature = "compression")]
-use xz2::read::XzDecoder;
-#[cfg(feature = "compression")]
-use zstd::Decoder as ZstdDecoder;
-use CompressionTypeVariant::*;
-
-/// The default file extension of arrow files
-pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
-/// The default file extension of avro files
-pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
-/// The default file extension of csv files
-pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
-/// The default file extension of json files
-pub const DEFAULT_JSON_EXTENSION: &str = ".json";
-/// The default file extension of parquet files
-pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
-
-/// Define each `FileType`/`FileCompressionType`'s extension
-pub trait GetExt {
- /// File extension getter
- fn get_ext(&self) -> String;
-}
-
-/// Readable file compression type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct FileCompressionType {
- variant: CompressionTypeVariant,
-}
-
-impl GetExt for FileCompressionType {
- fn get_ext(&self) -> String {
- match self.variant {
- GZIP => ".gz".to_owned(),
- BZIP2 => ".bz2".to_owned(),
- XZ => ".xz".to_owned(),
- ZSTD => ".zst".to_owned(),
- UNCOMPRESSED => "".to_owned(),
- }
- }
-}
-
-impl From<CompressionTypeVariant> for FileCompressionType {
- fn from(t: CompressionTypeVariant) -> Self {
- Self { variant: t }
- }
-}
-
-impl FromStr for FileCompressionType {
- type Err = DataFusionError;
-
- fn from_str(s: &str) -> Result<Self> {
- let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
- DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}"))
- })?;
- Ok(Self { variant })
- }
-}
-
-/// `FileCompressionType` implementation
-impl FileCompressionType {
- /// Gzip-ed file
- pub const GZIP: Self = Self { variant: GZIP };
-
- /// Bzip2-ed file
- pub const BZIP2: Self = Self { variant: BZIP2 };
-
- /// Xz-ed file (liblzma)
- pub const XZ: Self = Self { variant: XZ };
-
- /// Zstd-ed file
- pub const ZSTD: Self = Self { variant: ZSTD };
-
- /// Uncompressed file
- pub const UNCOMPRESSED: Self = Self {
- variant: UNCOMPRESSED,
- };
-
- /// The file is compressed or not
- pub const fn is_compressed(&self) -> bool {
- self.variant.is_compressed()
- }
-
- /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
- pub fn convert_to_compress_stream(
- &self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return crate::error::_not_impl_err!("Compression feature is not enabled")
- }
- UNCOMPRESSED => s.boxed(),
- })
- }
-
- /// Wrap the given `AsyncWrite` so that it performs compressed writes
- /// according to this `FileCompressionType`.
- pub fn convert_async_writer(
- &self,
- w: Box<dyn AsyncWrite + Send + Unpin>,
- ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => Box::new(GzipEncoder::new(w)),
- #[cfg(feature = "compression")]
- BZIP2 => Box::new(BzEncoder::new(w)),
- #[cfg(feature = "compression")]
- XZ => Box::new(XzEncoder::new(w)),
- #[cfg(feature = "compression")]
- ZSTD => Box::new(ZstdEncoder::new(w)),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return crate::error::_not_impl_err!("Compression feature is not enabled")
- }
- UNCOMPRESSED => w,
- })
- }
-
- /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
- pub fn convert_stream(
- &self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(feature = "compression")]
- ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return crate::error::_not_impl_err!("Compression feature is not enabled")
- }
- UNCOMPRESSED => s.boxed(),
- })
- }
-
- /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`.
- pub fn convert_read<T: std::io::Read + Send + 'static>(
- &self,
- r: T,
- ) -> Result<Box<dyn std::io::Read + Send>> {
- Ok(match self.variant {
- #[cfg(feature = "compression")]
- GZIP => Box::new(MultiGzDecoder::new(r)),
- #[cfg(feature = "compression")]
- BZIP2 => Box::new(MultiBzDecoder::new(r)),
- #[cfg(feature = "compression")]
- XZ => Box::new(XzDecoder::new_multi_decoder(r)),
- #[cfg(feature = "compression")]
- ZSTD => match ZstdDecoder::new(r) {
- Ok(decoder) => Box::new(decoder),
- Err(e) => return Err(DataFusionError::External(Box::new(e))),
- },
- #[cfg(not(feature = "compression"))]
- GZIP | BZIP2 | XZ | ZSTD => {
- return crate::error::_not_impl_err!("Compression feature is not enabled")
- }
- UNCOMPRESSED => Box::new(r),
- })
- }
-}
-
-/// Readable file type
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum FileType {
- /// Apache Arrow file
- ARROW,
- /// Apache Avro file
- AVRO,
- /// Apache Parquet file
- PARQUET,
- /// CSV file
- CSV,
- /// JSON file
- JSON,
-}
-
-impl GetExt for FileType {
- fn get_ext(&self) -> String {
- match self {
- FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
- FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
- FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
- FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
- FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
- }
- }
-}
-
-impl Display for FileType {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let out = match self {
- FileType::CSV => "csv",
- FileType::JSON => "json",
- FileType::PARQUET => "parquet",
- FileType::AVRO => "avro",
- FileType::ARROW => "arrow",
- };
- write!(f, "{}", out)
- }
-}
-
-impl FromStr for FileType {
- type Err = DataFusionError;
-
- fn from_str(s: &str) -> Result<Self> {
- let s = s.to_uppercase();
- match s.as_str() {
- "ARROW" => Ok(FileType::ARROW),
- "AVRO" => Ok(FileType::AVRO),
- "PARQUET" => Ok(FileType::PARQUET),
- "CSV" => Ok(FileType::CSV),
- "JSON" | "NDJSON" => Ok(FileType::JSON),
- _ => crate::error::_not_impl_err!("Unknown FileType: {s}"),
- }
- }
-}
-
-impl FileType {
- /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
- pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
- let ext = self.get_ext();
-
- match self {
- FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
- FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
- UNCOMPRESSED => Ok(ext),
- _ => crate::error::_internal_err!(
- "FileCompressionType can be specified for CSV/JSON FileType."
- ),
- },
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::error::DataFusionError;
- use crate::file_type::{FileCompressionType, FileType};
- use std::str::FromStr;
-
- #[test]
- fn get_ext_with_compression() {
- for (file_type, compression, extension) in [
- (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"),
- (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"),
- (FileType::CSV, FileCompressionType::XZ, ".csv.xz"),
- (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"),
- (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"),
- (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"),
- (FileType::JSON, FileCompressionType::GZIP, ".json.gz"),
- (FileType::JSON, FileCompressionType::XZ, ".json.xz"),
- (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"),
- (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"),
- ] {
- assert_eq!(
- file_type.get_ext_with_compression(compression).unwrap(),
- extension
- );
- }
-
- // Cannot specify compression for these file types
- for (file_type, extension) in
- [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")]
- {
- assert_eq!(
- file_type
- .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
- .unwrap(),
- extension
- );
- for compression in [
- FileCompressionType::GZIP,
- FileCompressionType::XZ,
- FileCompressionType::BZIP2,
- FileCompressionType::ZSTD,
- ] {
- assert!(matches!(
- file_type.get_ext_with_compression(compression),
- Err(DataFusionError::Internal(_))
- ));
- }
- }
- }
-
- #[test]
- fn from_str() {
- for (ext, file_type) in [
- ("csv", FileType::CSV),
- ("CSV", FileType::CSV),
- ("json", FileType::JSON),
- ("JSON", FileType::JSON),
- ("avro", FileType::AVRO),
- ("AVRO", FileType::AVRO),
- ("parquet", FileType::PARQUET),
- ("PARQUET", FileType::PARQUET),
- ] {
- assert_eq!(FileType::from_str(ext).unwrap(), file_type);
- }
-
- assert!(matches!(
- FileType::from_str("Unknown"),
- Err(DataFusionError::NotImplemented(_))
- ));
-
- for (ext, compression_type) in [
- ("gz", FileCompressionType::GZIP),
- ("GZ", FileCompressionType::GZIP),
- ("gzip", FileCompressionType::GZIP),
- ("GZIP", FileCompressionType::GZIP),
- ("xz", FileCompressionType::XZ),
- ("XZ", FileCompressionType::XZ),
- ("bz2", FileCompressionType::BZIP2),
- ("BZ2", FileCompressionType::BZIP2),
- ("bzip2", FileCompressionType::BZIP2),
- ("BZIP2", FileCompressionType::BZIP2),
- ("zst", FileCompressionType::ZSTD),
- ("ZST", FileCompressionType::ZSTD),
- ("zstd", FileCompressionType::ZSTD),
- ("ZSTD", FileCompressionType::ZSTD),
- ("", FileCompressionType::UNCOMPRESSED),
- ] {
- assert_eq!(
- FileCompressionType::from_str(ext).unwrap(),
- compression_type
- );
- }
-
- assert!(matches!(
- FileCompressionType::from_str("Unknown"),
- Err(DataFusionError::NotImplemented(_))
- ));
- }
-}