hanahmily commented on code in PR #341: URL: https://github.com/apache/skywalking-banyandb/pull/341#discussion_r1365030620
########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { Review Comment: I believe we should remove the `Dir` as `os` package typically interacts with the directory directly instead of using an `os.File`. Having a separate Dir doesn't add any value other than requiring us to be mindful of closing it to prevent resource leaks. Thus, we should consider moving the directory's operations back into the `FileSystem`. ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) + // Read the entire file using streaming read. + StreamReadFile(buffer []byte) (*Iter, error) // Get the file written data's size and return an error if the file does not exist. The unit of file size is Byte. - GetFileSize() (int, error) - // Set directory permission. - SetFilePermission(permission Mode) error + GetFileSize() (int64, error) + // Close File. + CloseFile() error } // FileSystem operation interface. type FileSystem interface { - // Create the directory by specified name and mode. - CreateDirectory(name string, permission Mode) error - // Open the directory by specified name. - OpenDirectory(name string) (*Dir, error) - // Create the file by specified name and mode. - CreateFile(name string, permission Mode) error - // Open the file by specified name. - OpenFile(name string) (*File, error) + // Create and open the directory by specified name and mode. + CreateDirectory(name string, permission Mode) (Dir, error) Review Comment: Merging Create and Open operations into one is not a proper solution. When I create a directory, the subsequent step is to write some files into it. The Dir is not necessary because `CreateFile` doesn't require it. However, I still need to close the Dir. Thus, CreateDirectory doesn't return the Dir, and OpenDirectory is still required. ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) + // Read the entire file using streaming read. + StreamReadFile(buffer []byte) (*Iter, error) Review Comment: ```suggestion StreamRead(buffer []byte) (*Iter, error) ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) Review Comment: ```suggestion Readv(offset int64, iov *[][]byte) (int, error) ``` ########## pkg/fs/local_file_system.go: ########## @@ -0,0 +1,408 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// LocalFileSystem implements the File System interface. +type LocalFileSystem struct { + logger *logger.Logger +} + +// LocalDirectory implements the Dir interface. +type LocalDirectory struct { + dir *os.File +} + +// LocalFile implements the File interface. +type LocalFile struct { + file *os.File +} + +// NewLocalFileSystem is used to create the Local File system. +func NewLocalFileSystem() FileSystem { + return &LocalFileSystem{ + logger: logger.GetLogger(moduleName), + } +} + +// CreateDirectory is used to create and open the directory by specified name and mode. +func (fs *LocalFileSystem) CreateDirectory(name string, permission Mode) (Dir, error) { + var err error + err = os.MkdirAll(name, os.FileMode(permission)) + if err != nil { + if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, permission: %d, error message: %s", name, permission, err), + } + } + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create directory return error, directory name: %s, error message: %s", name, err), + } + } + + dir, err := os.Open(name) + if err != nil { + return nil, &FileSystemError{ + Code: openError, + Message: fmt.Sprintf("Open directory return error, directory name: %s, error message: %s", name, err), + } + } + + return &LocalDirectory{ + dir: dir, + }, nil +} + +// CreateFile is used to create and open the file by specified name and mode. +func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return nil, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + + return &LocalFile{ + file: file, + }, nil +} + +// FlushWriteFile is Flush mode, which flushes all data to one file. +func (fs *LocalFileSystem) FlushWriteFile(buffer []byte, name string, permission Mode) (int, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return 0, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return 0, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return 0, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + defer file.Close() + + size, err := file.Write(buffer) + if err != nil { + return size, &FileSystemError{ + Code: flushError, + Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err), + } + } + Review Comment: On Linux and BSD-like systems, use unix.Fadvise to avoid caching data in the page cache. BanyanDB has its own cache, so don't rely on the file system's cache. ########## pkg/fs/local_file_system_test.go: ########## @@ -0,0 +1,173 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bytes" + "errors" + "io" + "os" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" +) + +var _ = ginkgo.Describe("Loacl File System", func() { + const ( + data string = "BanyanDB" + dirName string = "dir" + fileName string = "dir/file" + flushFileName string = "dir/flushFile" + ) + + var fs FileSystem + + ginkgo.Context("Local File", func() { + ginkgo.BeforeEach(func() { + fs = NewLocalFileSystem() Review Comment: To prevent cluttering in the current directory, kindly execute the test within the `tmp` folder. ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) Review Comment: You can't expose `os` package through the return value ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) + // Read the entire file using streaming read. + StreamReadFile(buffer []byte) (*Iter, error) // Get the file written data's size and return an error if the file does not exist. The unit of file size is Byte. - GetFileSize() (int, error) - // Set directory permission. - SetFilePermission(permission Mode) error + GetFileSize() (int64, error) Review Comment: ```suggestion Size() (int64, error) ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error Review Comment: Let's move it back to FileSystem for the same reason as Dir. ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) Review Comment: ```suggestion Write(buffer []byte) (int, error) ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) Review Comment: ```suggestion Read(offset int64, buffer []byte) (int, error) ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) + // Read the entire file using streaming read. + StreamReadFile(buffer []byte) (*Iter, error) // Get the file written data's size and return an error if the file does not exist. The unit of file size is Byte. - GetFileSize() (int, error) - // Set directory permission. - SetFilePermission(permission Mode) error + GetFileSize() (int64, error) + // Close File. + CloseFile() error Review Comment: ```suggestion Close() error ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) Review Comment: ```suggestion Writev(iov *[][]byte) (int, error) ``` ########## pkg/fs/file_system.go: ########## @@ -19,59 +19,58 @@ package fs import ( - "container/list" + "bufio" + "os" ) +const moduleName string = "filesystem" + // Mode contains permission of file and directory. -type Mode struct{} +type Mode uint64 // Iter is used for streaming read. -type Iter struct{} +type Iter struct { + fileName string + reader *bufio.Reader + buffer []byte +} // Dir operation interface. type Dir interface { // Delete the directory. DeleteDirectory() error - // Rename the directory. - RenameDirectory(newName string) error // Get all lists of files or children's directories in the directory. - ReadDirectory() (list.List, error) - // Set directory permission. - SetDirectoryPermission(permission Mode) error + ReadDirectory() ([]os.DirEntry, error) + // Close directory. + CloseDirectory() error } // File operation interface. type File interface { // Append mode, which adds new data to the end of a file. - AppendWriteFile(buffer []byte) error + AppendWriteFile(buffer []byte) (int, error) // Vector Append mode, which supports appending consecutive buffers to the end of the file. - AppendWritevFile(iov *[][]byte) error + AppendWritevFile(iov *[][]byte) (int, error) // Delete the file. DeleteFile() error // Reading a specified location of file. - ReadFile(offset int, buffer []byte) error + ReadFile(offset int64, buffer []byte) (int, error) // Reading contiguous regions of a file and dispersing them into discontinuous buffers. - ReadvFile(iov *[][]byte) error - // Read the entire file using streaming read - StreamReadFile(offset int, buffer []byte) (*Iter, error) - // Rename the file. - RenameFile(newName string) error + ReadvFile(offset int64, iov *[][]byte) (int, error) + // Read the entire file using streaming read. + StreamReadFile(buffer []byte) (*Iter, error) // Get the file written data's size and return an error if the file does not exist. The unit of file size is Byte. - GetFileSize() (int, error) - // Set directory permission. - SetFilePermission(permission Mode) error + GetFileSize() (int64, error) + // Close File. + CloseFile() error } // FileSystem operation interface. type FileSystem interface { - // Create the directory by specified name and mode. - CreateDirectory(name string, permission Mode) error - // Open the directory by specified name. - OpenDirectory(name string) (*Dir, error) - // Create the file by specified name and mode. - CreateFile(name string, permission Mode) error - // Open the file by specified name. - OpenFile(name string) (*File, error) + // Create and open the directory by specified name and mode. + CreateDirectory(name string, permission Mode) (Dir, error) + // Create and open the file by specified name and mode. + CreateFile(name string, permission Mode) (File, error) // Flush mode, which flushes all data to one file. - FlushWriteFile(buffer []byte, permission Mode) (*File, error) + FlushWriteFile(buffer []byte, name string, permission Mode) (int, error) Review Comment: ```suggestion Write(buffer []byte, name string, permission Mode) (int, error) ``` ########## pkg/fs/local_file_system.go: ########## @@ -0,0 +1,408 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// LocalFileSystem implements the File System interface. +type LocalFileSystem struct { + logger *logger.Logger +} + +// LocalDirectory implements the Dir interface. +type LocalDirectory struct { + dir *os.File +} + +// LocalFile implements the File interface. +type LocalFile struct { + file *os.File +} + +// NewLocalFileSystem is used to create the Local File system. +func NewLocalFileSystem() FileSystem { + return &LocalFileSystem{ + logger: logger.GetLogger(moduleName), + } +} + +// CreateDirectory is used to create and open the directory by specified name and mode. +func (fs *LocalFileSystem) CreateDirectory(name string, permission Mode) (Dir, error) { + var err error + err = os.MkdirAll(name, os.FileMode(permission)) + if err != nil { + if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, permission: %d, error message: %s", name, permission, err), + } + } + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create directory return error, directory name: %s, error message: %s", name, err), + } + } + + dir, err := os.Open(name) + if err != nil { + return nil, &FileSystemError{ + Code: openError, + Message: fmt.Sprintf("Open directory return error, directory name: %s, error message: %s", name, err), + } + } + + return &LocalDirectory{ + dir: dir, + }, nil +} + +// CreateFile is used to create and open the file by specified name and mode. +func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return nil, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + + return &LocalFile{ + file: file, + }, nil +} + +// FlushWriteFile is Flush mode, which flushes all data to one file. +func (fs *LocalFileSystem) FlushWriteFile(buffer []byte, name string, permission Mode) (int, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return 0, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return 0, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return 0, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + defer file.Close() + + size, err := file.Write(buffer) + if err != nil { + return size, &FileSystemError{ + Code: flushError, + Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err), + } + } + + return size, nil +} + +// DeleteDirectory is used for deleting the directory. +func (dir *LocalDirectory) DeleteDirectory() error { + err := os.RemoveAll(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return nil +} + +// ReadDirectory is used to get all lists of files or children's directories in the directory. +func (dir *LocalDirectory) ReadDirectory() ([]os.DirEntry, error) { + dirs, err := os.ReadDir(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return nil, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Read directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return dirs, nil +} + +// CloseDirectory is used to close directory. +func (dir *LocalDirectory) CloseDirectory() error { + err := dir.dir.Close() + if err != nil { + return &FileSystemError{ + Code: closeError, + Message: fmt.Sprintf("Close directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + return nil +} + +// AppendWriteFile is append mode, which adds new data to the end of a file. +func (file *LocalFile) AppendWriteFile(buffer []byte) (int, error) { + size, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return size, nil +} + +// AppendWritevFile is vector Append mode, which supports appending consecutive buffers to the end of the file. +// TODO: Optimizing under Linux. +func (file *LocalFile) AppendWritevFile(iov *[][]byte) (int, error) { + var size int + for _, buffer := range *iov { + wsize, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + size += wsize + } + return size, nil +} + +// DeleteFile is used to delete the file. +func (file *LocalFile) DeleteFile() error { + err := os.Remove(file.file.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return nil +} + +// ReadFile is used to read a specified location of file. +func (file *LocalFile) ReadFile(offset int64, buffer []byte) (int, error) { + rsize, err := file.file.ReadAt(buffer, offset) + if err != nil { + if err == io.EOF { + return rsize, err + } else if os.IsNotExist(err) { + return rsize, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return rsize, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return rsize, &FileSystemError{ + Code: readError, + Message: fmt.Sprintf("Read file error, file name: %s, read file size: %d, error message: %s", file.file.Name(), rsize, err), + } + } + } + + return rsize, nil +} + +// ReadvFile is used to read contiguous regions of a file and disperse them into discontinuous buffers. +// TODO: Optimizing under Linux. +func (file *LocalFile) ReadvFile(offset int64, iov *[][]byte) (int, error) { + var size int + for _, buffer := range *iov { + rsize, err := file.file.ReadAt(buffer, offset) + if err != nil { + if err == io.EOF { + return size, err + } else if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return size, &FileSystemError{ + Code: readError, + Message: fmt.Sprintf("Read file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + size += rsize + offset += int64(rsize) + } + + return size, nil +} + +// StreamReadFile is used to read the entire file using streaming read. +func (file *LocalFile) StreamReadFile(buffer []byte) (*Iter, error) { + reader := bufio.NewReader(file.file) + return &Iter{reader: reader, buffer: buffer, fileName: file.file.Name()}, nil +} + +// GetFileSize is used to get the file written data's size and return an error if the file does not exist. The unit of file size is Byte. +func (file *LocalFile) GetFileSize() (int64, error) { + fileInfo, err := os.Stat(file.file.Name()) + if err != nil { + if os.IsNotExist(err) { + return -1, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return -1, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return -1, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Get file size error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return fileInfo.Size(), nil +} + +// CloseFile is used to close File. +func (file *LocalFile) CloseFile() error { + err := file.file.Close() + if err != nil { + return &FileSystemError{ + Code: closeError, + Message: fmt.Sprintf("Close File error, directory name: %s, error message: %s", file.file.Name(), err), + } + } + return nil +} + +// Next is used to obtain the next data of stream. +func (iter *Iter) Next() (int, error) { + rsize, err := iter.reader.Read(iter.buffer) + if err != nil { + if err == io.EOF { + return rsize, err + } else if os.IsNotExist(err) { + return rsize, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", iter.fileName, err), + } + } else if os.IsPermission(err) { + return rsize, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", iter.fileName, err), + } + } else { + return rsize, &FileSystemError{ + Code: readError, + Message: fmt.Sprintf("Read file error, file name: %s, read file size: %d, error message: %s", iter.fileName, rsize, err), + } + } Review Comment: Duplicated code. ########## pkg/fs/local_file_system_test.go: ########## @@ -0,0 +1,173 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bytes" + "errors" + "io" + "os" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" +) + +var _ = ginkgo.Describe("Loacl File System", func() { + const ( + data string = "BanyanDB" + dirName string = "dir" + fileName string = "dir/file" + flushFileName string = "dir/flushFile" + ) + + var fs FileSystem + + ginkgo.Context("Local File", func() { + ginkgo.BeforeEach(func() { + fs = NewLocalFileSystem() + }) + + ginkgo.AfterEach(func() { + err := os.RemoveAll(dirName) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + }) + + ginkgo.It("File Operation", func() { Review Comment: It will be better to create the case for each operation ########## pkg/fs/local_file_system.go: ########## @@ -0,0 +1,408 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// LocalFileSystem implements the File System interface. +type LocalFileSystem struct { + logger *logger.Logger +} + +// LocalDirectory implements the Dir interface. +type LocalDirectory struct { + dir *os.File +} + +// LocalFile implements the File interface. +type LocalFile struct { + file *os.File +} + +// NewLocalFileSystem is used to create the Local File system. +func NewLocalFileSystem() FileSystem { + return &LocalFileSystem{ + logger: logger.GetLogger(moduleName), + } +} + +// CreateDirectory is used to create and open the directory by specified name and mode. +func (fs *LocalFileSystem) CreateDirectory(name string, permission Mode) (Dir, error) { + var err error + err = os.MkdirAll(name, os.FileMode(permission)) + if err != nil { + if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, permission: %d, error message: %s", name, permission, err), + } + } + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create directory return error, directory name: %s, error message: %s", name, err), + } + } + + dir, err := os.Open(name) + if err != nil { + return nil, &FileSystemError{ + Code: openError, + Message: fmt.Sprintf("Open directory return error, directory name: %s, error message: %s", name, err), + } + } + + return &LocalDirectory{ + dir: dir, + }, nil +} + +// CreateFile is used to create and open the file by specified name and mode. +func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return nil, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + + return &LocalFile{ + file: file, + }, nil +} + +// FlushWriteFile is Flush mode, which flushes all data to one file. +func (fs *LocalFileSystem) FlushWriteFile(buffer []byte, name string, permission Mode) (int, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return 0, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return 0, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return 0, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + defer file.Close() + + size, err := file.Write(buffer) + if err != nil { + return size, &FileSystemError{ + Code: flushError, + Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err), + } + } + + return size, nil +} + +// DeleteDirectory is used for deleting the directory. +func (dir *LocalDirectory) DeleteDirectory() error { + err := os.RemoveAll(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return nil +} + +// ReadDirectory is used to get all lists of files or children's directories in the directory. +func (dir *LocalDirectory) ReadDirectory() ([]os.DirEntry, error) { + dirs, err := os.ReadDir(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return nil, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Read directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return dirs, nil +} + +// CloseDirectory is used to close directory. +func (dir *LocalDirectory) CloseDirectory() error { + err := dir.dir.Close() + if err != nil { + return &FileSystemError{ + Code: closeError, + Message: fmt.Sprintf("Close directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + return nil +} + +// AppendWriteFile is append mode, which adds new data to the end of a file. +func (file *LocalFile) AppendWriteFile(buffer []byte) (int, error) { + size, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return size, nil +} + +// AppendWritevFile is vector Append mode, which supports appending consecutive buffers to the end of the file. +// TODO: Optimizing under Linux. +func (file *LocalFile) AppendWritevFile(iov *[][]byte) (int, error) { + var size int + for _, buffer := range *iov { + wsize, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + size += wsize + } + return size, nil +} + +// DeleteFile is used to delete the file. +func (file *LocalFile) DeleteFile() error { + err := os.Remove(file.file.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return nil +} + +// ReadFile is used to read a specified location of file. +func (file *LocalFile) ReadFile(offset int64, buffer []byte) (int, error) { + rsize, err := file.file.ReadAt(buffer, offset) + if err != nil { Review Comment: ```suggestion if err == nil { return rsize, nil } ``` ########## pkg/fs/local_file_system.go: ########## @@ -0,0 +1,408 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package fs (file system) is an independent component to operate file and directory. +package fs + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// LocalFileSystem implements the File System interface. +type LocalFileSystem struct { + logger *logger.Logger +} + +// LocalDirectory implements the Dir interface. +type LocalDirectory struct { + dir *os.File +} + +// LocalFile implements the File interface. +type LocalFile struct { + file *os.File +} + +// NewLocalFileSystem is used to create the Local File system. +func NewLocalFileSystem() FileSystem { + return &LocalFileSystem{ + logger: logger.GetLogger(moduleName), + } +} + +// CreateDirectory is used to create and open the directory by specified name and mode. +func (fs *LocalFileSystem) CreateDirectory(name string, permission Mode) (Dir, error) { + var err error + err = os.MkdirAll(name, os.FileMode(permission)) + if err != nil { + if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, permission: %d, error message: %s", name, permission, err), + } + } + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create directory return error, directory name: %s, error message: %s", name, err), + } + } + + dir, err := os.Open(name) + if err != nil { + return nil, &FileSystemError{ + Code: openError, + Message: fmt.Sprintf("Open directory return error, directory name: %s, error message: %s", name, err), + } + } + + return &LocalDirectory{ + dir: dir, + }, nil +} + +// CreateFile is used to create and open the file by specified name and mode. +func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return nil, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + + return &LocalFile{ + file: file, + }, nil +} + +// FlushWriteFile is Flush mode, which flushes all data to one file. +func (fs *LocalFileSystem) FlushWriteFile(buffer []byte, name string, permission Mode) (int, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) + if err != nil { + if os.IsExist(err) { + return 0, &FileSystemError{ + Code: isExistError, + Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err), + } + } else if os.IsPermission(err) { + return 0, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err), + } + } else { + return 0, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err), + } + } + } + defer file.Close() + + size, err := file.Write(buffer) + if err != nil { + return size, &FileSystemError{ + Code: flushError, + Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err), + } + } + + return size, nil +} + +// DeleteDirectory is used for deleting the directory. +func (dir *LocalDirectory) DeleteDirectory() error { + err := os.RemoveAll(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return nil +} + +// ReadDirectory is used to get all lists of files or children's directories in the directory. +func (dir *LocalDirectory) ReadDirectory() ([]os.DirEntry, error) { + dirs, err := os.ReadDir(dir.dir.Name()) + if err != nil { + if os.IsNotExist(err) { + return nil, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("Directory is not exist, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else if os.IsPermission(err) { + return nil, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } else { + return nil, &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Read directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + } + return dirs, nil +} + +// CloseDirectory is used to close directory. +func (dir *LocalDirectory) CloseDirectory() error { + err := dir.dir.Close() + if err != nil { + return &FileSystemError{ + Code: closeError, + Message: fmt.Sprintf("Close directory error, directory name: %s, error message: %s", dir.dir.Name(), err), + } + } + return nil +} + +// AppendWriteFile is append mode, which adds new data to the end of a file. +func (file *LocalFile) AppendWriteFile(buffer []byte) (int, error) { + size, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return size, nil +} + +// AppendWritevFile is vector Append mode, which supports appending consecutive buffers to the end of the file. +// TODO: Optimizing under Linux. +func (file *LocalFile) AppendWritevFile(iov *[][]byte) (int, error) { + var size int + for _, buffer := range *iov { + wsize, err := file.file.Write(buffer) + if err != nil { + if os.IsNotExist(err) { + return size, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return size, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + // include io.ErrShortWrite + return size, &FileSystemError{ + Code: writeError, + Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + size += wsize + } + return size, nil +} + +// DeleteFile is used to delete the file. +func (file *LocalFile) DeleteFile() error { + err := os.Remove(file.file.Name()) + if err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Delete file error, file name: %s, error message: %s", file.file.Name(), err), + } + } + } + return nil +} + +// ReadFile is used to read a specified location of file. +func (file *LocalFile) ReadFile(offset int64, buffer []byte) (int, error) { + rsize, err := file.file.ReadAt(buffer, offset) + if err != nil { + if err == io.EOF { + return rsize, err + } else if os.IsNotExist(err) { + return rsize, &FileSystemError{ + Code: isNotExistError, + Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), + } + } else if os.IsPermission(err) { + return rsize, &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err), + } + } else { + return rsize, &FileSystemError{ + Code: readError, + Message: fmt.Sprintf("Read file error, file name: %s, read file size: %d, error message: %s", file.file.Name(), rsize, err), + } + } + } + Review Comment: Use `unix.Fadvise` as the Write -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org