Repository: arrow
Updated Branches:
  refs/heads/master 3a84653a3 -> e1d574c7c
ARROW-1301: [C++/Python] More complete filesystem API for HDFS This also includes a fair bit of API normalization and cleaning. Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #926 from wesm/ARROW-1301 and squashes the following commits: bcc9310 [Wes McKinney] Add missing API 8bf51f5 [Wes McKinney] Add more filesystem methods, tests for HDFS 98847b5 [Wes McKinney] Some HDFS refactoring. Implement chmod, chown. Normalize Filesystem->FileSystem Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e1d574c7 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e1d574c7 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e1d574c7 Branch: refs/heads/master Commit: e1d574c7cf2207ddde64273cd34ba8352bc003bc Parents: 3a84653 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Tue Aug 1 15:51:18 2017 +0200 Committer: Uwe L. Korn <uw...@xhochy.com> Committed: Tue Aug 1 15:51:18 2017 +0200 ---------------------------------------------------------------------- cpp/src/arrow/io/hdfs-internal.cc | 16 +- cpp/src/arrow/io/hdfs.cc | 108 +++++-- cpp/src/arrow/io/hdfs.h | 48 ++- cpp/src/arrow/io/interfaces.h | 25 +- cpp/src/arrow/io/io-hdfs-test.cc | 59 ++-- python/doc/source/api.rst | 10 +- python/doc/source/conf.py | 3 +- python/doc/source/filesystems.rst | 43 ++- python/doc/source/memory.rst | 7 - python/pyarrow/__init__.py | 6 +- python/pyarrow/filesystem.py | 105 +++++-- python/pyarrow/hdfs.py | 34 ++- python/pyarrow/includes/libarrow.pxd | 15 +- python/pyarrow/io-hdfs.pxi | 468 ++++++++++++++++++++++++++++++ python/pyarrow/io.pxi | 298 ------------------- python/pyarrow/lib.pyx | 1 + python/pyarrow/parquet.py | 14 +- python/pyarrow/tests/test_hdfs.py | 97 ++++++- python/pyarrow/tests/test_parquet.py | 4 +- 19 files changed, 915 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/cpp/src/arrow/io/hdfs-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs-internal.cc b/cpp/src/arrow/io/hdfs-internal.cc index 8f42b1c..fd7417b 100644 --- a/cpp/src/arrow/io/hdfs-internal.cc +++ b/cpp/src/arrow/io/hdfs-internal.cc @@ -462,21 +462,11 @@ tOffset LibHdfsShim::GetUsed(hdfsFS fs) { return this->hdfsGetUsed(fs); } int LibHdfsShim::Chown(hdfsFS fs, const char* path, const char* owner, const char* group) { - GET_SYMBOL(this, hdfsChown); - if (this->hdfsChown) { - return this->hdfsChown(fs, path, owner, group); - } else { - return 0; - } + return this->hdfsChown(fs, path, owner, group); } int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT - GET_SYMBOL(this, hdfsChmod); - if (this->hdfsChmod) { - return this->hdfsChmod(fs, path, mode); - } else { - return 0; - } + return this->hdfsChmod(fs, path, mode); } int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { @@ -504,6 +494,8 @@ Status LibHdfsShim::GetRequiredSymbols() { GET_SYMBOL_REQUIRED(this, hdfsGetUsed); GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo); GET_SYMBOL_REQUIRED(this, hdfsListDirectory); + GET_SYMBOL_REQUIRED(this, hdfsChown); + GET_SYMBOL_REQUIRED(this, hdfsChmod); // File methods GET_SYMBOL_REQUIRED(this, hdfsCloseFile); http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 
500f42c..254e483 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -308,9 +308,9 @@ static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { } // Private implementation -class HdfsClient::HdfsClientImpl { +class HadoopFileSystem::HadoopFileSystemImpl { public: - HdfsClientImpl() {} + HadoopFileSystemImpl() {} Status Connect(const HdfsConnectionConfig* config) { if (config->driver == HdfsDriver::LIBHDFS3) { @@ -396,6 +396,24 @@ class HdfsClient::HdfsClientImpl { return Status::OK(); } + Status Stat(const std::string& path, FileStatistics* stat) { + HdfsPathInfo info; + RETURN_NOT_OK(GetPathInfo(path, &info)); + + stat->size = info.size; + stat->kind = info.kind; + return Status::OK(); + } + + Status GetChildren(const std::string& path, std::vector<std::string>* listing) { + std::vector<HdfsPathInfo> detailed_listing; + RETURN_NOT_OK(ListDirectory(path, &detailed_listing)); + for (const auto& info : detailed_listing) { + listing->push_back(info.name); + } + return Status::OK(); + } + Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) { int num_entries = 0; hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries); @@ -476,6 +494,18 @@ class HdfsClient::HdfsClientImpl { return Status::OK(); } + Status Chmod(const std::string& path, int mode) { + int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode)); // NOLINT + CHECK_FAILURE(ret, "Chmod"); + return Status::OK(); + } + + Status Chown(const std::string& path, const char* owner, const char* group) { + int ret = driver_->Chown(fs_, path.c_str(), owner, group); + CHECK_FAILURE(ret, "Chown"); + return Status::OK(); + } + private: LibHdfsShim* driver_; @@ -490,68 +520,92 @@ class HdfsClient::HdfsClientImpl { // ---------------------------------------------------------------------- // Public API for HDFSClient -HdfsClient::HdfsClient() { impl_.reset(new HdfsClientImpl()); } +HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); } -HdfsClient::~HdfsClient() {} +HadoopFileSystem::~HadoopFileSystem() {} -Status HdfsClient::Connect(const HdfsConnectionConfig* config, - std::shared_ptr<HdfsClient>* fs) { +Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config, + std::shared_ptr<HadoopFileSystem>* fs) { // ctor is private, make_shared will not work - *fs = std::shared_ptr<HdfsClient>(new HdfsClient()); + *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem()); RETURN_NOT_OK((*fs)->impl_->Connect(config)); return Status::OK(); } -Status HdfsClient::MakeDirectory(const std::string& path) { +Status HadoopFileSystem::MakeDirectory(const std::string& path) { return impl_->MakeDirectory(path); } -Status HdfsClient::Delete(const std::string& path, bool recursive) { +Status HadoopFileSystem::Delete(const std::string& path, bool recursive) { return impl_->Delete(path, recursive); } -Status HdfsClient::Disconnect() { return impl_->Disconnect(); } +Status HadoopFileSystem::DeleteDirectory(const std::string& path) { + return Delete(path, true); +} + +Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); } -bool HdfsClient::Exists(const std::string& path) { return impl_->Exists(path); } +bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); } -Status HdfsClient::GetPathInfo(const std::string& path, HdfsPathInfo* info) { +Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) { return impl_->GetPathInfo(path, info); } -Status 
HdfsClient::GetCapacity(int64_t* nbytes) { return impl_->GetCapacity(nbytes); } +Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) { + return impl_->Stat(path, stat); +} + +Status HadoopFileSystem::GetCapacity(int64_t* nbytes) { + return impl_->GetCapacity(nbytes); +} + +Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } -Status HdfsClient::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } +Status HadoopFileSystem::GetChildren(const std::string& path, + std::vector<std::string>* listing) { + return impl_->GetChildren(path, listing); +} -Status HdfsClient::ListDirectory(const std::string& path, - std::vector<HdfsPathInfo>* listing) { +Status HadoopFileSystem::ListDirectory(const std::string& path, + std::vector<HdfsPathInfo>* listing) { return impl_->ListDirectory(path, listing); } -Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr<HdfsReadableFile>* file) { +Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr<HdfsReadableFile>* file) { return impl_->OpenReadable(path, buffer_size, file); } -Status HdfsClient::OpenReadable(const std::string& path, - std::shared_ptr<HdfsReadableFile>* file) { +Status HadoopFileSystem::OpenReadable(const std::string& path, + std::shared_ptr<HdfsReadableFile>* file) { return OpenReadable(path, kDefaultHdfsBufferSize, file); } -Status HdfsClient::OpenWriteable(const std::string& path, bool append, - int32_t buffer_size, int16_t replication, - int64_t default_block_size, - std::shared_ptr<HdfsOutputStream>* file) { +Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append, + int32_t buffer_size, int16_t replication, + int64_t default_block_size, + std::shared_ptr<HdfsOutputStream>* file) { return impl_->OpenWriteable(path, append, buffer_size, replication, default_block_size, file); } -Status HdfsClient::OpenWriteable(const std::string& path, bool append, - std::shared_ptr<HdfsOutputStream>* file) { +Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append, + std::shared_ptr<HdfsOutputStream>* file) { return OpenWriteable(path, append, 0, 0, 0, file); } -Status HdfsClient::Rename(const std::string& src, const std::string& dst) { +Status HadoopFileSystem::Chmod(const std::string& path, int mode) { + return impl_->Chmod(path, mode); +} + +Status HadoopFileSystem::Chown(const std::string& path, const char* owner, + const char* group) { + return impl_->Chown(path, owner, group); +} + +Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) { return impl_->Rename(src, dst); } http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/cpp/src/arrow/io/hdfs.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 63c3ae0..1507ca9 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -34,7 +34,7 @@ class Status; namespace io { -class HdfsClient; +class HadoopFileSystem; class HdfsReadableFile; class HdfsOutputStream; @@ -66,9 +66,9 @@ struct HdfsConnectionConfig { HdfsDriver driver; }; -class ARROW_EXPORT HdfsClient : public FileSystemClient { +class ARROW_EXPORT HadoopFileSystem : public FileSystem { public: - ~HdfsClient(); + ~HadoopFileSystem(); // Connect to an HDFS cluster given a configuration // @@ -76,13 +76,13 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @param fs (out): the created client // @returns Status static 
Status Connect(const HdfsConnectionConfig* config, - std::shared_ptr<HdfsClient>* fs); + std::shared_ptr<HadoopFileSystem>* fs); // Create directory and all parents // // @param path (in): absolute HDFS path // @returns Status - Status MakeDirectory(const std::string& path); + Status MakeDirectory(const std::string& path) override; // Delete file or directory // @param path: absolute path to data @@ -90,6 +90,8 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @returns error status on failure Status Delete(const std::string& path, bool recursive = false); + Status DeleteDirectory(const std::string& path) override; + // Disconnect from cluster // // @returns Status @@ -112,18 +114,29 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @returns Status Status GetUsed(int64_t* nbytes); + Status GetChildren(const std::string& path, std::vector<std::string>* listing) override; + Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing); - // @param path file path to change - // @param owner pass nullptr for no change - // @param group pass nullptr for no change + /// Change + /// + /// @param path file path to change + /// @param owner pass nullptr for no change + /// @param group pass nullptr for no change Status Chown(const std::string& path, const char* owner, const char* group); + /// Change path permissions + /// + /// \param path Absolute path in file system + /// \param mode Mode bitset + /// \return Status Status Chmod(const std::string& path, int mode); // Move file or directory from source path to destination path within the // current filesystem - Status Rename(const std::string& src, const std::string& dst); + Status Rename(const std::string& src, const std::string& dst) override; + + Status Stat(const std::string& path, FileStatistics* stat) override; // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory @@ -152,13 +165,18 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { friend class HdfsReadableFile; friend class HdfsOutputStream; - class ARROW_NO_EXPORT HdfsClientImpl; - std::unique_ptr<HdfsClientImpl> impl_; + class ARROW_NO_EXPORT HadoopFileSystemImpl; + std::unique_ptr<HadoopFileSystemImpl> impl_; - HdfsClient(); - DISALLOW_COPY_AND_ASSIGN(HdfsClient); + HadoopFileSystem(); + DISALLOW_COPY_AND_ASSIGN(HadoopFileSystem); }; +// 0.6.0 +#ifndef ARROW_NO_DEPRECATED_API +using HdfsClient = HadoopFileSystem; +#endif + class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { public: ~HdfsReadableFile(); @@ -191,7 +209,7 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { class ARROW_NO_EXPORT HdfsReadableFileImpl; std::unique_ptr<HdfsReadableFileImpl> impl_; - friend class HdfsClient::HdfsClientImpl; + friend class HadoopFileSystem::HadoopFileSystemImpl; DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); }; @@ -216,7 +234,7 @@ class ARROW_EXPORT HdfsOutputStream : public OutputStream { class ARROW_NO_EXPORT HdfsOutputStreamImpl; std::unique_ptr<HdfsOutputStreamImpl> impl_; - friend class HdfsClient::HdfsClientImpl; + friend class HadoopFileSystem::HadoopFileSystemImpl; HdfsOutputStream(); http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/cpp/src/arrow/io/interfaces.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index e71a5c9..4bb7ebe 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -22,6 +22,7 @@ #include <memory> #include <mutex> #include <string> +#include <vector> 
#include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -42,9 +43,29 @@ struct ObjectType { enum type { FILE, DIRECTORY }; }; -class ARROW_EXPORT FileSystemClient { +struct ARROW_EXPORT FileStatistics { + /// Size of file, -1 if finding length is unsupported + int64_t size; + ObjectType::type kind; + + FileStatistics() {} + FileStatistics(int64_t size, ObjectType::type kind) : size(size), kind(kind) {} +}; + +class ARROW_EXPORT FileSystem { public: - virtual ~FileSystemClient() {} + virtual ~FileSystem() {} + + virtual Status MakeDirectory(const std::string& path) = 0; + + virtual Status DeleteDirectory(const std::string& path) = 0; + + virtual Status GetChildren(const std::string& path, + std::vector<std::string>* listing) = 0; + + virtual Status Rename(const std::string& src, const std::string& dst) = 0; + + virtual Status Stat(const std::string& path, FileStatistics* stat) = 0; }; class ARROW_EXPORT FileInterface { http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index c584cf5..92f4291 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -48,7 +48,7 @@ struct PivotalDriver { }; template <typename DRIVER> -class TestHdfsClient : public ::testing::Test { +class TestHadoopFileSystem : public ::testing::Test { public: Status MakeScratchDir() { if (client_->Exists(scratch_dir_)) { @@ -124,7 +124,7 @@ class TestHdfsClient : public ::testing::Test { conf_.port = port == nullptr ? 20500 : atoi(port); conf_.driver = DRIVER::type; - ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); + ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_)); } void TearDown() { @@ -141,11 +141,11 @@ class TestHdfsClient : public ::testing::Test { // Resources shared amongst unit tests std::string scratch_dir_; - std::shared_ptr<HdfsClient> client_; + std::shared_ptr<HadoopFileSystem> client_; }; template <> -std::string TestHdfsClient<PivotalDriver>::HdfsAbsPath(const std::string& relpath) { +std::string TestHadoopFileSystem<PivotalDriver>::HdfsAbsPath(const std::string& relpath) { std::stringstream ss; ss << relpath; return ss.str(); @@ -161,17 +161,17 @@ HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes; -TYPED_TEST_CASE(TestHdfsClient, DriverTypes); +TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); -TYPED_TEST(TestHdfsClient, ConnectsAgain) { +TYPED_TEST(TestHadoopFileSystem, ConnectsAgain) { SKIP_IF_NO_DRIVER(); - std::shared_ptr<HdfsClient> client; - ASSERT_OK(HdfsClient::Connect(&this->conf_, &client)); + std::shared_ptr<HadoopFileSystem> client; + ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client)); ASSERT_OK(client->Disconnect()); } -TYPED_TEST(TestHdfsClient, MakeDirectory) { +TYPED_TEST(TestHadoopFileSystem, MakeDirectory) { SKIP_IF_NO_DRIVER(); std::string path = this->ScratchPath("create-directory"); @@ -190,7 +190,7 @@ TYPED_TEST(TestHdfsClient, MakeDirectory) { ASSERT_RAISES(IOError, this->client_->ListDirectory(path, &listing)); } -TYPED_TEST(TestHdfsClient, GetCapacityUsed) { +TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) { SKIP_IF_NO_DRIVER(); // Who knows what is actually in your DFS cluster, but expect it to have @@ -203,7 +203,7 @@ TYPED_TEST(TestHdfsClient, GetCapacityUsed) { ASSERT_LT(0, nbytes); } -TYPED_TEST(TestHdfsClient, 
GetPathInfo) { +TYPED_TEST(TestHadoopFileSystem, GetPathInfo) { SKIP_IF_NO_DRIVER(); HdfsPathInfo info; @@ -233,7 +233,7 @@ TYPED_TEST(TestHdfsClient, GetPathInfo) { ASSERT_EQ(size, info.size); } -TYPED_TEST(TestHdfsClient, AppendToFile) { +TYPED_TEST(TestHadoopFileSystem, AppendToFile) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); @@ -252,7 +252,7 @@ TYPED_TEST(TestHdfsClient, AppendToFile) { ASSERT_EQ(size * 2, info.size); } -TYPED_TEST(TestHdfsClient, ListDirectory) { +TYPED_TEST(TestHadoopFileSystem, ListDirectory) { SKIP_IF_NO_DRIVER(); const int size = 100; @@ -292,7 +292,7 @@ TYPED_TEST(TestHdfsClient, ListDirectory) { } } -TYPED_TEST(TestHdfsClient, ReadableMethods) { +TYPED_TEST(TestHadoopFileSystem, ReadableMethods) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); @@ -339,7 +339,7 @@ TYPED_TEST(TestHdfsClient, ReadableMethods) { ASSERT_EQ(60, position); } -TYPED_TEST(TestHdfsClient, LargeFile) { +TYPED_TEST(TestHadoopFileSystem, LargeFile) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); @@ -374,7 +374,7 @@ TYPED_TEST(TestHdfsClient, LargeFile) { ASSERT_EQ(size, bytes_read); } -TYPED_TEST(TestHdfsClient, RenameFile) { +TYPED_TEST(TestHadoopFileSystem, RenameFile) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); @@ -391,7 +391,32 @@ TYPED_TEST(TestHdfsClient, RenameFile) { ASSERT_TRUE(this->client_->Exists(dst_path)); } -TYPED_TEST(TestHdfsClient, ThreadSafety) { +TYPED_TEST(TestHadoopFileSystem, ChmodChown) { + SKIP_IF_NO_DRIVER(); + ASSERT_OK(this->MakeScratchDir()); + + auto path = this->ScratchPath("path-to-chmod"); + + int16_t mode = 0755; + const int size = 100; + + std::vector<uint8_t> data = RandomData(size); + ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); + + HdfsPathInfo info; + ASSERT_OK(this->client_->Chmod(path, mode)); + ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_EQ(mode, info.permissions); + + std::string owner = "hadoop"; + std::string group = "hadoop"; + ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str())); + ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_EQ("hadoop", info.owner); + ASSERT_EQ("hadoop", info.group); +} + +TYPED_TEST(TestHadoopFileSystem, ThreadSafety) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/doc/source/api.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index 6f26076..fd1cb72 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -164,15 +164,17 @@ Input / Output and Shared Memory create_memory_map PythonFile -Filesystems ------------ +File Systems +------------ .. autosummary:: :toctree: generated/ hdfs.connect - HadoopFilesystem - LocalFilesystem + LocalFileSystem + +.. class:: HadoopFileSystem + :noindex: .. _api.ipc: http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/doc/source/conf.py ---------------------------------------------------------------------- diff --git a/python/doc/source/conf.py b/python/doc/source/conf.py index d71b668..25e6d5e 100644 --- a/python/doc/source/conf.py +++ b/python/doc/source/conf.py @@ -62,7 +62,8 @@ extensions = [ ] # Show members for classes in .. 
autosummary -autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance', 'inherited-members'] +autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance', + 'inherited-members'] # numpydoc configuration napoleon_use_rtype = False http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/doc/source/filesystems.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/filesystems.rst b/python/doc/source/filesystems.rst index 78f6f2a..c0530f9 100644 --- a/python/doc/source/filesystems.rst +++ b/python/doc/source/filesystems.rst @@ -15,8 +15,8 @@ .. specific language governing permissions and limitations .. under the License. -Filesystem Interfaces -===================== +File System Interfaces +====================== In this section, we discuss filesystem-like interfaces in PyArrow. @@ -31,10 +31,11 @@ System. You connect like so: .. code-block:: python import pyarrow as pa - hdfs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path) - type(hdfs) + fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path) + with fs.open(path, 'rb') as f: + # Do something with f -By default, ``pyarrow.hdfs.HadoopFilesystem`` uses libhdfs, a JNI-based +By default, ``pyarrow.hdfs.HadoopFileSystem`` uses libhdfs, a JNI-based interface to the Java Hadoop client. This library is loaded **at runtime** (rather than at link / library load time, since the library may not be in your LD_LIBRARY_PATH), and relies on some environment variables. @@ -57,5 +58,33 @@ You can also use libhdfs3, a thirdparty C++ library for HDFS from Pivotal Labs: .. code-block:: python - hdfs3 = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path, - driver='libhdfs3') + fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path, + driver='libhdfs3') + +HDFS API +~~~~~~~~ + +.. currentmodule:: pyarrow + +.. autosummary:: + :toctree: generated/ + + hdfs.connect + HadoopFileSystem.cat + HadoopFileSystem.chmod + HadoopFileSystem.chown + HadoopFileSystem.delete + HadoopFileSystem.df + HadoopFileSystem.disk_usage + HadoopFileSystem.download + HadoopFileSystem.exists + HadoopFileSystem.get_capacity + HadoopFileSystem.get_space_used + HadoopFileSystem.info + HadoopFileSystem.ls + HadoopFileSystem.mkdir + HadoopFileSystem.open + HadoopFileSystem.rename + HadoopFileSystem.rm + HadoopFileSystem.upload + HdfsFile http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/doc/source/memory.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/memory.rst b/python/doc/source/memory.rst index ccc6298..f189199 100644 --- a/python/doc/source/memory.rst +++ b/python/doc/source/memory.rst @@ -226,10 +226,3 @@ file interfaces that can read and write to Arrow Buffers. reader.read(7) These have similar semantics to Python's built-in ``io.BytesIO``. - -Hadoop Filesystem ------------------ - -:class:`~pyarrow.HdfsFile` is an implementation of :class:`~pyarrow.NativeFile` -that can read and write to the Hadoop filesytem. Read more in the -:ref:`Filesystems Section <hdfs>`. 
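The HDFS methods listed in the filesystems.rst autosummary above correspond to the Cython implementation added in io-hdfs.pxi further down in this diff. As a minimal usage sketch of the expanded API (host, port, user, and paths below are placeholders, and a reachable cluster is assumed):

    import pyarrow as pa

    # Connect with the default libhdfs (JNI) driver; driver='libhdfs3' selects the Pivotal client
    fs = pa.hdfs.connect('namenode.example.com', 8020, user='hadoop')

    fs.mkdir('/tmp/arrow-demo')
    with fs.open('/tmp/arrow-demo/data.bin', 'wb') as f:
        f.write(b'foobarbaz')

    fs.cat('/tmp/arrow-demo/data.bin')   # returns b'foobarbaz'
    fs.ls('/tmp/arrow-demo')             # child paths as strings
    fs.info('/tmp/arrow-demo/data.bin')  # dict with owner, group, size, permissions, ...

    # New in this change: permission/ownership management and space reporting
    fs.chmod('/tmp/arrow-demo/data.bin', 0o755)
    fs.chown('/tmp/arrow-demo/data.bin', owner='hadoop', group='hadoop')
    fs.df()                              # capacity minus space used
    fs.disk_usage('/tmp/arrow-demo')     # total bytes under a path

    fs.rm('/tmp/arrow-demo', recursive=True)

The rename/mv pair and the download/upload helpers shown in the autosummary follow the same pattern as the calls above.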
http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 42e5803..8d4a214 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -88,9 +88,9 @@ from pyarrow.lib import (ArrowException, ArrowTypeError) -from pyarrow.filesystem import Filesystem, LocalFilesystem +from pyarrow.filesystem import FileSystem, LocalFileSystem -from pyarrow.hdfs import HadoopFilesystem +from pyarrow.hdfs import HadoopFileSystem import pyarrow.hdfs as hdfs from pyarrow.ipc import (Message, MessageReader, @@ -103,7 +103,7 @@ from pyarrow.ipc import (Message, MessageReader, open_file, serialize_pandas, deserialize_pandas) -localfs = LocalFilesystem.get_instance() +localfs = LocalFileSystem.get_instance() # ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/filesystem.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py index 4b8ca32..8d2d8fc 100644 --- a/python/pyarrow/filesystem.py +++ b/python/pyarrow/filesystem.py @@ -22,10 +22,21 @@ import posixpath from pyarrow.util import implements -class Filesystem(object): +class FileSystem(object): """ Abstract filesystem interface """ + def cat(self, path): + """ + Return contents of file as a bytes object + + Returns + ------- + contents : bytes + """ + with self.open(path, 'rb') as f: + return f.read() + def ls(self, path): """ Return list of file paths @@ -44,12 +55,68 @@ class Filesystem(object): """ raise NotImplementedError + def disk_usage(self, path): + """ + Compute bytes used by all contents under indicated path in file tree + + Parameters + ---------- + path : string + Can be a file path or directory + + Returns + ------- + usage : int + """ + path_info = self.stat(path) + if path_info['kind'] == 'file': + return path_info['size'] + + total = 0 + for root, directories, files in self.walk(path): + for child_path in files: + abspath = self._path_join(root, child_path) + total += self.stat(abspath)['size'] + + return total + + def _path_join(self, *args): + return self.pathsep.join(args) + + def stat(self, path): + """ + + Returns + ------- + stat : dict + """ + raise NotImplementedError('FileSystem.stat') + def rm(self, path, recursive=False): """ - Alias for Filesystem.delete + Alias for FileSystem.delete """ return self.delete(path, recursive=recursive) + def mv(self, path, new_path): + """ + Alias for FileSystem.rename + """ + return self.rename(path, new_path) + + def rename(self, path, new_path): + """ + Rename file, like UNIX mv command + + Parameters + ---------- + path : string + Path to alter + new_path : string + Path to move to + """ + raise NotImplementedError('FileSystem.rename') + def mkdir(self, path, create_parents=True): raise NotImplementedError @@ -113,40 +180,40 @@ class Filesystem(object): return '/' -class LocalFilesystem(Filesystem): +class LocalFileSystem(FileSystem): _instance = None @classmethod def get_instance(cls): if cls._instance is None: - cls._instance = LocalFilesystem() + cls._instance = LocalFileSystem() return cls._instance - @implements(Filesystem.ls) + @implements(FileSystem.ls) def ls(self, path): return sorted(pjoin(path, x) for x in os.listdir(path)) - @implements(Filesystem.mkdir) + @implements(FileSystem.mkdir) def mkdir(self, path, 
create_parents=True): if create_parents: os.makedirs(path) else: os.mkdir(path) - @implements(Filesystem.isdir) + @implements(FileSystem.isdir) def isdir(self, path): return os.path.isdir(path) - @implements(Filesystem.isfile) + @implements(FileSystem.isfile) def isfile(self, path): return os.path.isfile(path) - @implements(Filesystem.exists) + @implements(FileSystem.exists) def exists(self, path): return os.path.exists(path) - @implements(Filesystem.open) + @implements(FileSystem.open) def open(self, path, mode='rb'): """ Open file for reading or writing @@ -164,7 +231,7 @@ class LocalFilesystem(Filesystem): return os.walk(top_dir) -class DaskFilesystem(Filesystem): +class DaskFileSystem(FileSystem): """ Wraps s3fs Dask filesystem implementation like s3fs, gcsfs, etc. """ @@ -172,23 +239,23 @@ class DaskFilesystem(Filesystem): def __init__(self, fs): self.fs = fs - @implements(Filesystem.isdir) + @implements(FileSystem.isdir) def isdir(self, path): raise NotImplementedError("Unsupported file system API") - @implements(Filesystem.isfile) + @implements(FileSystem.isfile) def isfile(self, path): raise NotImplementedError("Unsupported file system API") - @implements(Filesystem.delete) + @implements(FileSystem.delete) def delete(self, path, recursive=False): return self.fs.rm(path, recursive=recursive) - @implements(Filesystem.mkdir) + @implements(FileSystem.mkdir) def mkdir(self, path): return self.fs.mkdir(path) - @implements(Filesystem.open) + @implements(FileSystem.open) def open(self, path, mode='rb'): """ Open file for reading or writing @@ -205,9 +272,9 @@ class DaskFilesystem(Filesystem): return self.fs.walk(top_path) -class S3FSWrapper(DaskFilesystem): +class S3FSWrapper(DaskFileSystem): - @implements(Filesystem.isdir) + @implements(FileSystem.isdir) def isdir(self, path): try: contents = self.fs.ls(path) @@ -218,7 +285,7 @@ class S3FSWrapper(DaskFilesystem): except OSError: return False - @implements(Filesystem.isfile) + @implements(FileSystem.isfile) def isfile(self, path): try: contents = self.fs.ls(path) http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py index 3240f99..855cc1e 100644 --- a/python/pyarrow/hdfs.py +++ b/python/pyarrow/hdfs.py @@ -18,13 +18,13 @@ import posixpath from pyarrow.util import implements -from pyarrow.filesystem import Filesystem +from pyarrow.filesystem import FileSystem import pyarrow.lib as lib -class HadoopFilesystem(lib._HdfsClient, Filesystem): +class HadoopFileSystem(lib.HadoopFileSystem, FileSystem): """ - Filesystem interface for HDFS cluster. See pyarrow.hdfs.connect for full + FileSystem interface for HDFS cluster. 
See pyarrow.hdfs.connect for full connection details """ @@ -32,21 +32,25 @@ class HadoopFilesystem(lib._HdfsClient, Filesystem): driver='libhdfs'): self._connect(host, port, user, kerb_ticket, driver) - @implements(Filesystem.isdir) + @implements(FileSystem.isdir) def isdir(self, path): - return lib._HdfsClient.isdir(self, path) + return super(HadoopFileSystem, self).isdir(path) - @implements(Filesystem.isfile) + @implements(FileSystem.isfile) def isfile(self, path): - return lib._HdfsClient.isfile(self, path) + return super(HadoopFileSystem, self).isfile(path) - @implements(Filesystem.delete) + @implements(FileSystem.delete) def delete(self, path, recursive=False): - return lib._HdfsClient.delete(self, path, recursive) + return super(HadoopFileSystem, self).delete(path, recursive) - @implements(Filesystem.mkdir) + @implements(FileSystem.mkdir) def mkdir(self, path, create_parents=True): - return lib._HdfsClient.mkdir(self, path) + return super(HadoopFileSystem, self).mkdir(path) + + @implements(FileSystem.rename) + def rename(self, path, new_path): + return super(HadoopFileSystem, self).rename(path, new_path) def ls(self, path, detail=False): """ @@ -62,7 +66,7 @@ class HadoopFilesystem(lib._HdfsClient, Filesystem): ------- result : list of dicts (detail=True) or strings (detail=False) """ - return lib._HdfsClient.ls(self, path, detail) + return super(HadoopFileSystem, self).ls(path, detail) def walk(self, top_path): """ @@ -82,7 +86,7 @@ class HadoopFilesystem(lib._HdfsClient, Filesystem): directories, files = _libhdfs_walk_files_dirs(top_path, contents) yield top_path, directories, files for dirname in directories: - for tup in self.walk(dirname): + for tup in self.walk(self._path_join(top_path, dirname)): yield tup @@ -126,8 +130,8 @@ def connect(host="default", port=0, user=None, kerb_ticket=None, Returns ------- - filesystem : HadoopFilesystem + filesystem : HadoopFileSystem """ - fs = HadoopFilesystem(host=host, port=port, user=user, + fs = HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, driver=driver) return fs http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index db6770f..8d7e279 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -413,6 +413,10 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: ObjectType_FILE" arrow::io::ObjectType::FILE" ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY" + cdef cppclass FileStatistics: + int64_t size + ObjectType kind + cdef cppclass FileInterface: CStatus Close() CStatus Tell(int64_t* position) @@ -450,6 +454,9 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: WriteableFile): pass + cdef cppclass FileSystem: + CStatus Stat(const c_string& path, FileStatistics* stat) + cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil: @@ -517,10 +524,10 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: cdef cppclass HdfsOutputStream(OutputStream): pass - cdef cppclass CHdfsClient" arrow::io::HdfsClient": + cdef cppclass CHadoopFileSystem" arrow::io::HadoopFileSystem"(FileSystem): @staticmethod CStatus Connect(const HdfsConnectionConfig* config, - shared_ptr[CHdfsClient]* client) + shared_ptr[CHadoopFileSystem]* client) CStatus MakeDirectory(const c_string& path) @@ -530,6 +537,10 @@ cdef extern from 
"arrow/io/hdfs.h" namespace "arrow::io" nogil: c_bool Exists(const c_string& path) + CStatus Chmod(const c_string& path, int mode) + CStatus Chown(const c_string& path, const char* owner, + const char* group) + CStatus GetCapacity(int64_t* nbytes) CStatus GetUsed(int64_t* nbytes) http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/io-hdfs.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/io-hdfs.pxi b/python/pyarrow/io-hdfs.pxi new file mode 100644 index 0000000..8ac4e8c --- /dev/null +++ b/python/pyarrow/io-hdfs.pxi @@ -0,0 +1,468 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ---------------------------------------------------------------------- +# HDFS IO implementation + +_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)') + +try: + # Python 3 + from queue import Queue, Empty as QueueEmpty, Full as QueueFull +except ImportError: + from Queue import Queue, Empty as QueueEmpty, Full as QueueFull + + +def have_libhdfs(): + try: + check_status(HaveLibHdfs()) + return True + except: + return False + + +def have_libhdfs3(): + try: + check_status(HaveLibHdfs3()) + return True + except: + return False + + +def strip_hdfs_abspath(path): + m = _HDFS_PATH_RE.match(path) + if m: + return m.group(3) + else: + return path + + +cdef class HadoopFileSystem: + cdef: + shared_ptr[CHadoopFileSystem] client + + cdef readonly: + bint is_open + + def __cinit__(self): + pass + + def _connect(self, host, port, user, kerb_ticket, driver): + cdef HdfsConnectionConfig conf + + if host is not None: + conf.host = tobytes(host) + conf.port = port + if user is not None: + conf.user = tobytes(user) + if kerb_ticket is not None: + conf.kerb_ticket = tobytes(kerb_ticket) + + if driver == 'libhdfs': + check_status(HaveLibHdfs()) + conf.driver = HdfsDriver_LIBHDFS + else: + check_status(HaveLibHdfs3()) + conf.driver = HdfsDriver_LIBHDFS3 + + with nogil: + check_status(CHadoopFileSystem.Connect(&conf, &self.client)) + self.is_open = True + + @classmethod + def connect(cls, *args, **kwargs): + return cls(*args, **kwargs) + + def __dealloc__(self): + if self.is_open: + self.close() + + def close(self): + """ + Disconnect from the HDFS cluster + """ + self._ensure_client() + with nogil: + check_status(self.client.get().Disconnect()) + self.is_open = False + + cdef _ensure_client(self): + if self.client.get() == NULL: + raise IOError('HDFS client improperly initialized') + elif not self.is_open: + raise IOError('HDFS client is closed') + + def exists(self, path): + """ + Returns True if the path is known to the cluster, False if it does not + (or there is an RPC error) + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + cdef c_bool result + with nogil: + result = 
self.client.get().Exists(c_path) + return result + + def isdir(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_DIRECTORY + + def isfile(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_FILE + + def get_capacity(self): + """ + Get reported total capacity of file system + + Returns + ------- + capacity : int + """ + cdef int64_t capacity = 0 + with nogil: + check_status(self.client.get().GetCapacity(&capacity)) + return capacity + + def get_space_used(self): + """ + Get space used on file system + + Returns + ------- + space_used : int + """ + cdef int64_t space_used = 0 + with nogil: + check_status(self.client.get().GetUsed(&space_used)) + return space_used + + def df(self): + """ + Return free space on disk, like the UNIX df command + + Returns + ------- + space : int + """ + return self.get_capacity() - self.get_space_used() + + def rename(self, path, new_path): + cdef c_string c_path = tobytes(path) + cdef c_string c_new_path = tobytes(new_path) + with nogil: + check_status(self.client.get().Rename(c_path, c_new_path)) + + def info(self, path): + """ + Return detailed HDFS information for path + + Parameters + ---------- + path : string + Path to file or directory + + Returns + ------- + path_info : dict + """ + cdef HdfsPathInfo info + self._path_info(path, &info) + return { + 'path': frombytes(info.name), + 'owner': frombytes(info.owner), + 'group': frombytes(info.group), + 'size': info.size, + 'block_size': info.block_size, + 'last_modified': info.last_modified_time, + 'last_accessed': info.last_access_time, + 'replication': info.replication, + 'permissions': info.permissions, + 'kind': ('directory' if info.kind == ObjectType_DIRECTORY + else 'file') + } + + def stat(self, path): + """ + Return basic file system statistics about path + + Parameters + ---------- + path : string + Path to file or directory + + Returns + ------- + stat : dict + """ + cdef FileStatistics info + cdef c_string c_path = tobytes(path) + with nogil: + check_status(self.client.get() + .Stat(c_path, &info)) + return { + 'size': info.size, + 'kind': ('directory' if info.kind == ObjectType_DIRECTORY + else 'file') + } + + cdef _path_info(self, path, HdfsPathInfo* info): + cdef c_string c_path = tobytes(path) + + with nogil: + check_status(self.client.get() + .GetPathInfo(c_path, info)) + + + def ls(self, path, bint full_info): + cdef: + c_string c_path = tobytes(path) + vector[HdfsPathInfo] listing + list results = [] + int i + + self._ensure_client() + + with nogil: + check_status(self.client.get() + .ListDirectory(c_path, &listing)) + + cdef const HdfsPathInfo* info + for i in range(<int> listing.size()): + info = &listing[i] + + # Try to trim off the hdfs://HOST:PORT piece + name = strip_hdfs_abspath(frombytes(info.name)) + + if full_info: + kind = ('file' if info.kind == ObjectType_FILE + else 'directory') + + results.append({ + 'kind': kind, + 'name': name, + 'owner': frombytes(info.owner), + 'group': frombytes(info.group), + 'list_modified_time': info.last_modified_time, + 'list_access_time': info.last_access_time, + 'size': info.size, + 'replication': info.replication, + 'block_size': info.block_size, + 'permissions': info.permissions + }) + else: + results.append(name) + + return results + + def chmod(self, path, mode): + """ + Change file permissions + + Parameters + ---------- + path : string + absolute path to file or directory + mode : int + POSIX-like bitmask + """ + self._ensure_client() + cdef 
c_string c_path = tobytes(path) + cdef int c_mode = mode + with nogil: + check_status(self.client.get() + .Chmod(c_path, c_mode)) + + def chown(self, path, owner=None, group=None): + """ + Change file permissions + + Parameters + ---------- + path : string + absolute path to file or directory + owner : string, default None + New owner, None for no change + group : string, default None + New group, None for no change + """ + cdef: + c_string c_path + c_string c_owner + c_string c_group + const char* c_owner_ptr = NULL + const char* c_group_ptr = NULL + + self._ensure_client() + + c_path = tobytes(path) + if owner is not None: + c_owner = tobytes(owner) + c_owner_ptr = c_owner.c_str() + + if group is not None: + c_group = tobytes(group) + c_group_ptr = c_group.c_str() + + with nogil: + check_status(self.client.get() + .Chown(c_path, c_owner_ptr, c_group_ptr)) + + def mkdir(self, path): + """ + Create indicated directory and any necessary parent directories + """ + self._ensure_client() + cdef c_string c_path = tobytes(path) + with nogil: + check_status(self.client.get() + .MakeDirectory(c_path)) + + def delete(self, path, bint recursive=False): + """ + Delete the indicated file or directory + + Parameters + ---------- + path : string + recursive : boolean, default False + If True, also delete child paths for directories + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + with nogil: + check_status(self.client.get() + .Delete(c_path, recursive == 1)) + + def open(self, path, mode='rb', buffer_size=None, replication=None, + default_block_size=None): + """ + Open HDFS file for reading or writing + + Parameters + ---------- + mode : string + Must be one of 'rb', 'wb', 'ab' + + Returns + ------- + handle : HdfsFile + """ + self._ensure_client() + + cdef HdfsFile out = HdfsFile() + + if mode not in ('rb', 'wb', 'ab'): + raise Exception("Mode must be 'rb' (read), " + "'wb' (write, new file), or 'ab' (append)") + + cdef c_string c_path = tobytes(path) + cdef c_bool append = False + + # 0 in libhdfs means "use the default" + cdef int32_t c_buffer_size = buffer_size or 0 + cdef int16_t c_replication = replication or 0 + cdef int64_t c_default_block_size = default_block_size or 0 + + cdef shared_ptr[HdfsOutputStream] wr_handle + cdef shared_ptr[HdfsReadableFile] rd_handle + + if mode in ('wb', 'ab'): + if mode == 'ab': + append = True + + with nogil: + check_status( + self.client.get() + .OpenWriteable(c_path, append, c_buffer_size, + c_replication, c_default_block_size, + &wr_handle)) + + out.wr_file = <shared_ptr[OutputStream]> wr_handle + + out.is_readable = False + out.is_writeable = 1 + else: + with nogil: + check_status(self.client.get() + .OpenReadable(c_path, &rd_handle)) + + out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle + out.is_readable = True + out.is_writeable = 0 + + if c_buffer_size == 0: + c_buffer_size = 2 ** 16 + + out.mode = mode + out.buffer_size = c_buffer_size + out.parent = _HdfsFileNanny(self, out) + out.is_open = True + out.own_file = True + + return out + + def download(self, path, stream, buffer_size=None): + with self.open(path, 'rb') as f: + f.download(stream, buffer_size=buffer_size) + + def upload(self, path, stream, buffer_size=None): + """ + Upload file-like object to HDFS path + """ + with self.open(path, 'wb') as f: + f.upload(stream, buffer_size=buffer_size) + + +# ARROW-404: Helper class to ensure that files are closed before the +# client. 
During deallocation of the extension class, the attributes are +# decref'd which can cause the client to get closed first if the file has the +# last remaining reference +cdef class _HdfsFileNanny: + cdef: + object client + object file_handle_ref + + def __cinit__(self, client, file_handle): + import weakref + self.client = client + self.file_handle_ref = weakref.ref(file_handle) + + def __dealloc__(self): + fh = self.file_handle_ref() + if fh: + fh.close() + # avoid cyclic GC + self.file_handle_ref = None + self.client = None + + +cdef class HdfsFile(NativeFile): + cdef readonly: + int32_t buffer_size + object mode + object parent + + cdef object __weakref__ + + def __dealloc__(self): + self.parent = None http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/io.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 01c987d..211c2a3 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -733,301 +733,3 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer): else: raise TypeError('Unable to read from object of type: {0}' .format(type(source))) - -# ---------------------------------------------------------------------- -# HDFS IO implementation - -_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)') - -try: - # Python 3 - from queue import Queue, Empty as QueueEmpty, Full as QueueFull -except ImportError: - from Queue import Queue, Empty as QueueEmpty, Full as QueueFull - - -def have_libhdfs(): - try: - check_status(HaveLibHdfs()) - return True - except: - return False - - -def have_libhdfs3(): - try: - check_status(HaveLibHdfs3()) - return True - except: - return False - - -def strip_hdfs_abspath(path): - m = _HDFS_PATH_RE.match(path) - if m: - return m.group(3) - else: - return path - - -cdef class _HdfsClient: - cdef: - shared_ptr[CHdfsClient] client - - cdef readonly: - bint is_open - - def __cinit__(self): - pass - - def _connect(self, host, port, user, kerb_ticket, driver): - cdef HdfsConnectionConfig conf - - if host is not None: - conf.host = tobytes(host) - conf.port = port - if user is not None: - conf.user = tobytes(user) - if kerb_ticket is not None: - conf.kerb_ticket = tobytes(kerb_ticket) - - if driver == 'libhdfs': - check_status(HaveLibHdfs()) - conf.driver = HdfsDriver_LIBHDFS - else: - check_status(HaveLibHdfs3()) - conf.driver = HdfsDriver_LIBHDFS3 - - with nogil: - check_status(CHdfsClient.Connect(&conf, &self.client)) - self.is_open = True - - @classmethod - def connect(cls, *args, **kwargs): - return cls(*args, **kwargs) - - def __dealloc__(self): - if self.is_open: - self.close() - - def close(self): - """ - Disconnect from the HDFS cluster - """ - self._ensure_client() - with nogil: - check_status(self.client.get().Disconnect()) - self.is_open = False - - cdef _ensure_client(self): - if self.client.get() == NULL: - raise IOError('HDFS client improperly initialized') - elif not self.is_open: - raise IOError('HDFS client is closed') - - def exists(self, path): - """ - Returns True if the path is known to the cluster, False if it does not - (or there is an RPC error) - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - cdef c_bool result - with nogil: - result = self.client.get().Exists(c_path) - return result - - def isdir(self, path): - cdef HdfsPathInfo info - self._path_info(path, &info) - return info.kind == ObjectType_DIRECTORY - - def isfile(self, path): - cdef HdfsPathInfo info - self._path_info(path, &info) - return 
info.kind == ObjectType_FILE - - cdef _path_info(self, path, HdfsPathInfo* info): - cdef c_string c_path = tobytes(path) - - with nogil: - check_status(self.client.get() - .GetPathInfo(c_path, info)) - - - def ls(self, path, bint full_info): - cdef: - c_string c_path = tobytes(path) - vector[HdfsPathInfo] listing - list results = [] - int i - - self._ensure_client() - - with nogil: - check_status(self.client.get() - .ListDirectory(c_path, &listing)) - - cdef const HdfsPathInfo* info - for i in range(<int> listing.size()): - info = &listing[i] - - # Try to trim off the hdfs://HOST:PORT piece - name = strip_hdfs_abspath(frombytes(info.name)) - - if full_info: - kind = ('file' if info.kind == ObjectType_FILE - else 'directory') - - results.append({ - 'kind': kind, - 'name': name, - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'list_modified_time': info.last_modified_time, - 'list_access_time': info.last_access_time, - 'size': info.size, - 'replication': info.replication, - 'block_size': info.block_size, - 'permissions': info.permissions - }) - else: - results.append(name) - - return results - - def mkdir(self, path): - """ - Create indicated directory and any necessary parent directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .MakeDirectory(c_path)) - - def delete(self, path, bint recursive=False): - """ - Delete the indicated file or directory - - Parameters - ---------- - path : string - recursive : boolean, default False - If True, also delete child paths for directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Delete(c_path, recursive == 1)) - - def open(self, path, mode='rb', buffer_size=None, replication=None, - default_block_size=None): - """ - Parameters - ---------- - mode : string, 'rb', 'wb', 'ab' - """ - self._ensure_client() - - cdef HdfsFile out = HdfsFile() - - if mode not in ('rb', 'wb', 'ab'): - raise Exception("Mode must be 'rb' (read), " - "'wb' (write, new file), or 'ab' (append)") - - cdef c_string c_path = tobytes(path) - cdef c_bool append = False - - # 0 in libhdfs means "use the default" - cdef int32_t c_buffer_size = buffer_size or 0 - cdef int16_t c_replication = replication or 0 - cdef int64_t c_default_block_size = default_block_size or 0 - - cdef shared_ptr[HdfsOutputStream] wr_handle - cdef shared_ptr[HdfsReadableFile] rd_handle - - if mode in ('wb', 'ab'): - if mode == 'ab': - append = True - - with nogil: - check_status( - self.client.get() - .OpenWriteable(c_path, append, c_buffer_size, - c_replication, c_default_block_size, - &wr_handle)) - - out.wr_file = <shared_ptr[OutputStream]> wr_handle - - out.is_readable = False - out.is_writeable = 1 - else: - with nogil: - check_status(self.client.get() - .OpenReadable(c_path, &rd_handle)) - - out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle - out.is_readable = True - out.is_writeable = 0 - - if c_buffer_size == 0: - c_buffer_size = 2 ** 16 - - out.mode = mode - out.buffer_size = c_buffer_size - out.parent = _HdfsFileNanny(self, out) - out.is_open = True - out.own_file = True - - return out - - def download(self, path, stream, buffer_size=None): - with self.open(path, 'rb') as f: - f.download(stream, buffer_size=buffer_size) - - def upload(self, path, stream, buffer_size=None): - """ - Upload file-like object to HDFS path - """ - with self.open(path, 'wb') as f: - f.upload(stream, buffer_size=buffer_size) - - -# ARROW-404: 
Helper class to ensure that files are closed before the -# client. During deallocation of the extension class, the attributes are -# decref'd which can cause the client to get closed first if the file has the -# last remaining reference -cdef class _HdfsFileNanny: - cdef: - object client - object file_handle_ref - - def __cinit__(self, client, file_handle): - import weakref - self.client = client - self.file_handle_ref = weakref.ref(file_handle) - - def __dealloc__(self): - fh = self.file_handle_ref() - if fh: - fh.close() - # avoid cyclic GC - self.file_handle_ref = None - self.client = None - - -cdef class HdfsFile(NativeFile): - cdef readonly: - int32_t buffer_size - object mode - object parent - - cdef object __weakref__ - - def __dealloc__(self): - self.parent = None http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/lib.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 5990308..4df2fcd 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -114,6 +114,7 @@ include "table.pxi" # File IO include "io.pxi" +include "io-hdfs.pxi" # IPC / Messaging include "ipc.pxi" http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index c870412..6d39a23 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -22,7 +22,7 @@ import six import numpy as np -from pyarrow.filesystem import Filesystem, LocalFilesystem +from pyarrow.filesystem import FileSystem, LocalFileSystem from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa RowGroupMetaData, ParquetSchema, ParquetWriter) @@ -403,7 +403,7 @@ class ParquetManifest(object): """ def __init__(self, dirpath, filesystem=None, pathsep='/', partition_scheme='hive'): - self.filesystem = filesystem or LocalFilesystem.get_instance() + self.filesystem = filesystem or LocalFileSystem.get_instance() self.pathsep = pathsep self.dirpath = dirpath self.partition_scheme = partition_scheme @@ -506,7 +506,7 @@ class ParquetDataset(object): ---------- path_or_paths : str or List[str] A directory name, single file name, or list of file names - filesystem : Filesystem, default None + filesystem : FileSystem, default None If nothing passed, paths assumed to be found in the local on-disk filesystem metadata : pyarrow.parquet.FileMetaData @@ -522,7 +522,7 @@ class ParquetDataset(object): def __init__(self, path_or_paths, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True): if filesystem is None: - self.fs = LocalFilesystem.get_instance() + self.fs = LocalFileSystem.get_instance() else: self.fs = _ensure_filesystem(filesystem) @@ -631,7 +631,7 @@ class ParquetDataset(object): return keyvalues.get(b'pandas', None) def _get_open_file_func(self): - if self.fs is None or isinstance(self.fs, LocalFilesystem): + if self.fs is None or isinstance(self.fs, LocalFileSystem): def open_file(path, meta=None): return ParquetFile(path, metadata=meta, common_metadata=self.common_metadata) @@ -644,7 +644,7 @@ class ParquetDataset(object): def _ensure_filesystem(fs): - if not isinstance(fs, Filesystem): + if not isinstance(fs, FileSystem): if type(fs).__name__ == 'S3FileSystem': from pyarrow.filesystem import S3FSWrapper return S3FSWrapper(fs) @@ -716,7 +716,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None, Content of 
the file as a table (of columns) """ if is_string(source): - fs = LocalFilesystem.get_instance() + fs = LocalFileSystem.get_instance() if fs.isdir(source): return fs.read_parquet(source, columns=columns, metadata=metadata) http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/tests/test_hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py index 1026408..79638f2 100644 --- a/python/pyarrow/tests/test_hdfs.py +++ b/python/pyarrow/tests/test_hdfs.py @@ -72,7 +72,26 @@ class HdfsTestCases(object): cls.hdfs.delete(cls.tmp_path, recursive=True) cls.hdfs.close() - def test_hdfs_close(self): + def test_cat(self): + path = pjoin(self.tmp_path, 'cat-test') + + data = b'foobarbaz' + with self.hdfs.open(path, 'wb') as f: + f.write(data) + + contents = self.hdfs.cat(path) + assert contents == data + + def test_capacity_space(self): + capacity = self.hdfs.get_capacity() + space_used = self.hdfs.get_space_used() + disk_free = self.hdfs.df() + + assert capacity > 0 + assert capacity > space_used + assert disk_free == (capacity - space_used) + + def test_close(self): client = hdfs_test_client() assert client.is_open client.close() @@ -81,7 +100,7 @@ class HdfsTestCases(object): with pytest.raises(Exception): client.ls('/') - def test_hdfs_mkdir(self): + def test_mkdir(self): path = pjoin(self.tmp_path, 'test-dir/test-dir') parent_path = pjoin(self.tmp_path, 'test-dir') @@ -91,7 +110,64 @@ class HdfsTestCases(object): self.hdfs.delete(parent_path, recursive=True) assert not self.hdfs.exists(path) - def test_hdfs_ls(self): + def test_mv_rename(self): + path = pjoin(self.tmp_path, 'mv-test') + new_path = pjoin(self.tmp_path, 'mv-new-test') + + data = b'foobarbaz' + with self.hdfs.open(path, 'wb') as f: + f.write(data) + + assert self.hdfs.exists(path) + self.hdfs.mv(path, new_path) + assert not self.hdfs.exists(path) + assert self.hdfs.exists(new_path) + + assert self.hdfs.cat(new_path) == data + + self.hdfs.rename(new_path, path) + assert self.hdfs.cat(path) == data + + def test_info(self): + path = pjoin(self.tmp_path, 'info-base') + file_path = pjoin(path, 'ex') + self.hdfs.mkdir(path) + + data = b'foobarbaz' + with self.hdfs.open(file_path, 'wb') as f: + f.write(data) + + path_info = self.hdfs.info(path) + file_path_info = self.hdfs.info(file_path) + + assert path_info['kind'] == 'directory' + + assert file_path_info['kind'] == 'file' + assert file_path_info['size'] == len(data) + + def test_disk_usage(self): + path = pjoin(self.tmp_path, 'disk-usage-base') + p1 = pjoin(path, 'p1') + p2 = pjoin(path, 'p2') + + subdir = pjoin(path, 'subdir') + p3 = pjoin(subdir, 'p3') + + if self.hdfs.exists(path): + self.hdfs.delete(path, True) + + self.hdfs.mkdir(path) + self.hdfs.mkdir(subdir) + + data = b'foobarbaz' + + for file_path in [p1, p2, p3]: + with self.hdfs.open(file_path, 'wb') as f: + f.write(data) + + assert self.hdfs.disk_usage(path) == len(data) * 3 + + def test_ls(self): base_path = pjoin(self.tmp_path, 'ls-test') self.hdfs.mkdir(base_path) @@ -106,7 +182,12 @@ class HdfsTestCases(object): contents = sorted(self.hdfs.ls(base_path, False)) assert contents == [dir_path, f1_path] - def test_hdfs_download_upload(self): + def test_chmod_chown(self): + path = pjoin(self.tmp_path, 'chmod-test') + with self.hdfs.open(path, 'wb') as f: + f.write(b'a' * 10) + + def test_download_upload(self): base_path = pjoin(self.tmp_path, 'upload-test') data = b'foobarbaz' @@ -120,7 +201,7 @@ class 
HdfsTestCases(object): out_buf.seek(0) assert out_buf.getvalue() == data - def test_hdfs_file_context_manager(self): + def test_file_context_manager(self): path = pjoin(self.tmp_path, 'ctx-manager') data = b'foo' @@ -132,7 +213,7 @@ class HdfsTestCases(object): result = f.read(10) assert result == data - def test_hdfs_read_whole_file(self): + def test_read_whole_file(self): path = pjoin(self.tmp_path, 'read-whole-file') data = b'foo' * 1000 @@ -145,7 +226,7 @@ class HdfsTestCases(object): assert result == data @test_parquet.parquet - def test_hdfs_read_multiple_parquet_files(self): + def test_read_multiple_parquet_files(self): import pyarrow.parquet as pq nfiles = 10 @@ -191,7 +272,7 @@ class TestLibHdfs(HdfsTestCases, unittest.TestCase): if not pa.have_libhdfs(): pytest.fail('No libhdfs available on system') - def test_hdfs_orphaned_file(self): + def test_orphaned_file(self): hdfs = hdfs_test_client() file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname', 'foobarbaz') http://git-wip-us.apache.org/repos/asf/arrow/blob/e1d574c7/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 06265ca..ab3b26c 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -23,7 +23,7 @@ import json import pytest from pyarrow.compat import guid, u -from pyarrow.filesystem import LocalFilesystem +from pyarrow.filesystem import LocalFileSystem import pyarrow as pa from .pandas_examples import dataframe_with_arrays, dataframe_with_lists @@ -700,7 +700,7 @@ def test_partition_set_dictionary_type(): @parquet def test_read_partitioned_directory(tmpdir): - fs = LocalFilesystem.get_instance() + fs = LocalFileSystem.get_instance() base_path = str(tmpdir) _partition_test_for_filesystem(fs, base_path)
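Because ParquetDataset (and read_table) accept any pyarrow.filesystem.FileSystem, the renamed classes compose directly with the HDFS client from this change. A sketch under the assumption of a reachable cluster, an existing Parquet directory, and the usual ParquetDataset.read() entry point (host and paths are hypothetical):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Pass the connected HDFS client as the dataset's filesystem
    fs = pa.hdfs.connect('namenode.example.com', 8020, user='hadoop')
    dataset = pq.ParquetDataset('/data/events', filesystem=fs)
    table = dataset.read()

    # Omitting filesystem falls back to LocalFileSystem.get_instance(),
    # which is also what the pa.localfs singleton now points to.
    local_table = pq.ParquetDataset('/tmp/events-local').read()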