FIXED, the FD's id assignment (now, it is with the std::map managed)
NOW, What was the FIX for the permission denied Issue on the QFSBroker that
was fix ?
Current outputs:
root@www-dev:/opt/hypertable/0.9.7.15/fs# du
4 ./local/hypertable/tables/2/1/default/qyoNKN5rd__dbHKv
8 ./local/hypertable/tables/2/1/default
12 ./local/hypertable/tables/2/1
13540 ./local/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv
8 ./local/hypertable/tables/2/0/default/nOkqBfmTLuX1NEv6
13552 ./local/hypertable/tables/2/0/default
4 ./local/hypertable/tables/2/0/_xfer/nOkqBfmTLuX1NEv6/1391726922
8 ./local/hypertable/tables/2/0/_xfer/nOkqBfmTLuX1NEv6
12 ./local/hypertable/tables/2/0/_xfer
13568 ./local/hypertable/tables/2/0
13584 ./local/hypertable/tables/2
4 ./local/hypertable/tables/0/1/range/qyoNKN5rd__dbHKv
8 ./local/hypertable/tables/0/1/range
4 ./local/hypertable/tables/0/1/server/qyoNKN5rd__dbHKv
8 ./local/hypertable/tables/0/1/server
20 ./local/hypertable/tables/0/1
4 ./local/hypertable/tables/0/0/default/qyoNKN5rd__dbHKv
4 ./local/hypertable/tables/0/0/default/4MmZEeUI1pJuk6SA
12 ./local/hypertable/tables/0/0/default
4 ./local/hypertable/tables/0/0/logging/qyoNKN5rd__dbHKv
4 ./local/hypertable/tables/0/0/logging/4MmZEeUI1pJuk6SA
12 ./local/hypertable/tables/0/0/logging
4 ./local/hypertable/tables/0/0/location/qyoNKN5rd__dbHKv
4 ./local/hypertable/tables/0/0/location/4MmZEeUI1pJuk6SA
12 ./local/hypertable/tables/0/0/location
40 ./local/hypertable/tables/0/0
64 ./local/hypertable/tables/0
13652 ./local/hypertable/tables
13656 ./local/hypertable
13660 ./local
13740 .
root@www-dev:/opt/hypertable/0.9.7.15/fs# tail -n 17
../log/DfsBroker.qfs.log
1391726923 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.h:82) close(
/hypertable/tables/2/0/_xfer/nOkqBfmTLuX1NEv6/1391726922/0 , 23 )
1391726924 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:62) close(7) file:
/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/cs4
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.h:82) close(
/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/cs4 , 24 )
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:171)
open(/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/cs4) = 7
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:62) close(6) file:
/hypertable/tables/2/0/_xfer/nOkqBfmTLuX1NEv6/1391726922/0
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:262)
create(/hypertable/tables/2/0/default/nOkqBfmTLuX1NEv6/hints) = 6
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:62) close(6) file:
/hypertable/tables/2/0/default/nOkqBfmTLuX1NEv6/hints
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.h:82) close(
/hypertable/tables/2/0/default/nOkqBfmTLuX1NEv6/hints , 25 )
1391726928 ERROR qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:259)
open(/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints) failure (15) -
Permission denied 13
ERROR Permission denied 13
1391726928 ERROR qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:259)
open(/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints) failure (18) -
Permission denied 13
ERROR Permission denied 13
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:171)
open(/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints) = 6
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.h:82) close(
/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints , 32 )
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:62) close(6) file:
/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.h:82) close(
/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/hints , 34 )
1391726928 INFO qfsBroker :
(/root/src/hypertable/src/cc/DfsBroker/qfs/QfsBroker.cc:171)
open(/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv/cs4) = 6
root@www-dev:/opt/hypertable/0.9.7.15/fs#
QfsShell> cd 0/default/qyoNKN5rd__dbHKv/
QfsShell> ls
cs4
hints
QfsShell> ls -l
cs4 <r 3> rw-r--r-- root root 13857280 Feb 6 15:48
hints <r 3> rw-r--r-- root root 163 Feb 6 15:46
QfsShell> ls -l
servers/ <dir> rwxr-xr-x root root 2682 Feb 6 15:44
tables/ <dir> rwxr-xr-x root root 13857732 Feb 6 15:44
root@www-dev:/opt/hypertable/0.9.7.15/fs/local/hypertable/tables/2/0/default/qyoNKN5rd__dbHKv#
ls -l
total 13536
-rw-r--r-- 1 root root 13857280 Feb 6 15:48 cs4
-rw-r--r-- 1 root root 0 Feb 6 15:48 hints
Thanks, need help with the Permission Denied QFS issue!
Kashirin Alex
On Wednesday, February 5, 2014 3:26:20 AM UTC+2, Alex Kashirin wrote:
>
> Before I ruin the copy.
>
> This copy,
>
> m_open_file_map assigned with ID of pairs- while the ID-fd is an
> generated num grow by 2 (eg. 1 as DFS 2 as local the connection resources
> fd)
>
> still to work out a more proper ID to use, as might the loop is
> out-breaking.
>
> on the request of fd, and it is RS-data , it's open_file_map is fd+1 (the
> meta is the fd)
>
> the fd key , get assigned at open/create as well with file type.
>
> 1 > for Meta
>
> 2 > for RS-data(table)
>
> if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
> else fileType[USE_openFilesCount] = 2;
>
> USE_openFilesCount --- Is the key for file_map, while for the local data
> it is the USE_openFilesCount+1
>
> addr is the same for both new file_map
>
>
>
> Let me know, if you think there is an issue of such way around.
>
> By the way, with the key assigned for fd, it is a way around to manage the
> number of open_file_descriptors on the system.
>
>
>
> Thanks,
>
> Kashirin Alex
>
>
>
> On Tuesday, February 4, 2014 8:11:03 AM UTC+2, Alex Kashirin wrote:
>>
>> the QFSbroker combined with local FS.
>>
>> m_open_file_map get assigned predefined IDs to store by.
>>
>>
>> It's now a more better version, Doug you can just download it from the
>> server , the log details on the older email.
>>
>>
>> Thanks,
>>
>> Kashirin Alex
>>
>>
>>
>>
>> On Monday, February 3, 2014 7:33:42 PM UTC+2, Alex Kashirin wrote:
>>>
>>> Why the localbroker use FileUtils and not Filesystem.h (the FileSystem
>>> has close file descriptor while the FileUtils do not!)
>>>
>>>
>>> the localBroker, might not closing the open file descriptors
>>>
>>>
>>> Thanks,
>>>
>>> Kashirin Alex
>>>
>>>
>>>
>>> On Sunday, February 2, 2014 3:32:50 AM UTC+2, Alex Kashirin wrote:
>>>>
>>>> Will be good to have opinions on the built and run.
>>>>
>>>> cmake file needs to be updated.
>>>>
>>>> does not contain Java related.
>>>>
>>>> Thanks,
>>>>
>>>> Kashirin Alex
>>>>
>>>>
>>>>
--
You received this message because you are subscribed to the Google Groups
"Hypertable Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/hypertable-dev.
For more options, visit https://groups.google.com/groups/opt_out.
/**
* Copyright (C) 2007-2012 Hypertable, Inc.
*
* This file is part of Hypertable.
*
* Hypertable is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or any later version.
*
* Hypertable is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
#include "Common/Compat.h"
#include <cstdlib>
#include <iostream>
#include <fstream>
#include <string>
extern "C" {
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>
}
#include "Common/Error.h"
#include "Common/FileUtils.h"
#include "Common/InetAddr.h"
#include "Common/Init.h"
#include "Common/Usage.h"
#include "Common/System.h"
#include "AsyncComm/ApplicationQueue.h"
#include "AsyncComm/Comm.h"
#include "AsyncComm/DispatchHandler.h"
#include "DfsBroker/Lib/Config.h"
#include "DfsBroker/Lib/ConnectionHandlerFactory.h"
#include "QfsBroker.h"
using namespace Hypertable;
using namespace Config;
using namespace std;
namespace KFS {
extern std::string KFS_BUILD_VERSION_STRING;
}
namespace {
struct AppPolicy : Policy {
static void init_options() {
cmdline_desc().add_options()
("root", str()->default_value("fs/local"), "root directory for local "
"broker (if relative, it's relative to the installation directory")
;
alias("root", "DfsBroker.Local.Root");
alias("port", "Qfs.MetaServer.Port");
alias("host", "Qfs.MetaServer.Name");
alias("workers", "Qfs.Broker.Workers");
alias("reactors", "Qfs.Broker.Reactors");
}
};
typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy> Policies;
}
int main(int argc, char **argv) {
try {
init_with_policies<Policies>(argc, argv);
int port = get_i16("DfsBroker.Port");
int worker_count = get_i32("workers");
Comm *comm = Comm::instance();
ApplicationQueuePtr app_queue = new ApplicationQueue(worker_count);
BrokerPtr broker = new QfsBroker(properties);
ConnectionHandlerFactoryPtr chfp =
new DfsBroker::ConnectionHandlerFactory(comm, app_queue, broker);
InetAddr listen_addr(INADDR_ANY, port);
comm->listen(listen_addr, chfp);
app_queue->join();
}
catch (Exception &e) {
HT_ERROR_OUT << e << HT_END;
return 1;
}
}
/**
* Copyright (C) 2007-2013 Hypertable, Inc.
*
* This file is part of Hypertable.
*
* Hypertable is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or any later version.
*
* Hypertable is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
#ifndef HYPERTABLE_QFSROKER_H
#define HYPERTABLE_QFSBROKER_H
extern "C" {
#include <unistd.h>
}
#include "Common/String.h"
#include "Common/atomic.h"
#include "Common/Properties.h"
#include "DfsBroker/Lib/Broker.h"
namespace KFS {
class KfsClient;
}
namespace Hypertable {
using namespace DfsBroker;
/**
* A subclass of Hypertable::DfsBroker::OpenFileData to hold the fields needed by the QfsBroker
*/
class OpenFileDataQfs: public OpenFileData {
public:
OpenFileDataQfs(const std::string &fname, int _fd, KFS::KfsClient *client): m_fname(fname), fd(_fd), m_client(client) {};
virtual ~OpenFileDataQfs();
std::string m_fname;
int fd;
KFS::KfsClient *const m_client;
//private:
//std::string m_fname;
//int m_fd;
//KFS::KfsClient *const m_client;
};
/**
*/
class OpenFileDataQfsPtr : public OpenFileDataPtr {
public:
OpenFileDataQfsPtr() : OpenFileDataPtr() { }
OpenFileDataQfsPtr(OpenFileDataQfs *ofdq) : OpenFileDataPtr(ofdq, true) {}
OpenFileDataQfs *operator->() const {
return (OpenFileDataQfs*) get();
}
};
/**
*
*/
class OpenFileDataLocal : public OpenFileData {
public:
OpenFileDataLocal(const String &fname, int _fd, int _flags) : fd(_fd), flags(_flags), filename(fname) { }
virtual ~OpenFileDataLocal() {
HT_INFOF("close( %s , %d )", filename.c_str(), fd);
close(fd);
}
int fd;
int flags;
String filename;
};
/**
*
*/
class OpenFileDataLocalPtr : public OpenFileDataPtr {
public:
OpenFileDataLocalPtr() : OpenFileDataPtr() { }
OpenFileDataLocalPtr(OpenFileDataLocal *ofdl)
: OpenFileDataPtr(ofdl, true) { }
OpenFileDataLocal *operator->() const {
return (OpenFileDataLocal *)get();
}
};
/**
* QfsBroker implementation class. @see Hypertable::DfsBroker::Broker
*/
class QfsBroker : public DfsBroker::Broker {
public:
QfsBroker(PropertiesPtr& cfg);
virtual ~QfsBroker();
virtual void open(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, uint32_t bufsz);
virtual void close(ResponseCallback *cb, uint32_t fd);
virtual void create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz);
virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount);
virtual void append(ResponseCallbackAppend *, uint32_t fd, uint32_t amount, const void *data, bool flush);
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset);
virtual void remove(ResponseCallback *cb, const char *fname);
virtual void length(ResponseCallbackLength *cb, const char *fname,
bool accurate = true);
virtual void pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
uint32_t amount, bool verify_checksum);
virtual void mkdirs(ResponseCallback *cb, const char *dname);
virtual void rmdir(ResponseCallback *cb, const char *dname);
virtual void flush(ResponseCallback *cb, uint32_t fd);
virtual void status(ResponseCallback *cb);
virtual void shutdown(ResponseCallback *cb);
virtual void readdir(ResponseCallbackReaddir *cb, const char *dname);
virtual void posix_readdir(ResponseCallbackPosixReaddir *cb,
const char *dname);
virtual void exists(ResponseCallbackExists *cb, const char *fname);
virtual void rename(ResponseCallback *cb, const char *src, const char *dst);
virtual void debug(ResponseCallback *cb, int32_t command,
StaticBuffer &serialized_parameters);
private:
std::string m_host;
int m_port;
void report_error(ResponseCallback *cb, int error);
KFS::KfsClient* const m_client;
int openFilesCount;
std::map <int, int> fileType;
static atomic_t ms_next_fd;
//virtual void report_error(ResponseCallback *cb);
String m_rootdir;
bool m_verbose;
bool m_directio;
bool m_no_removal;
};
}
#endif // HYPERTABLE_QFSBROKER_H/**
* Copyright (C) 2007-2013 Hypertable, Inc.
*
* This file is part of Hypertable.
*
* Hypertable is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or any later version.
*
* Hypertable is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
#include "Common/Compat.h"
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>
#include <cstdlib>
extern "C" {
#include <dirent.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <sys/types.h>
#if defined(__sun__)
#include <sys/fcntl.h>
#endif
#include <sys/uio.h>
#include <unistd.h>
#include <time.h>
}
#include "AsyncComm/ReactorFactory.h"
#include "Common/FileUtils.h"
#include "Common/Filesystem.h"
#include "Common/Path.h"
#include "Common/String.h"
#include "Common/System.h"
#include "Common/SystemInfo.h"
#include "QfsBroker.h"
#include <kfs/KfsClient.h>
using namespace Hypertable;
using namespace std;
using namespace KFS;
OpenFileDataQfs::~OpenFileDataQfs() {
HT_INFOF("close(%d) file: %s", fd, m_fname.c_str());
m_client->Close(fd);
}
QfsBroker::QfsBroker(PropertiesPtr &cfg)
: m_host(cfg->get_str("host")),
m_port(cfg->get_i16("port")),
m_client(KFS::Connect(m_host, m_port)) {
m_verbose = cfg->get_bool("verbose");
m_directio = cfg->get_bool("DfsBroker.Local.DirectIO");
m_no_removal = cfg->get_bool("DfsBroker.DisableFileRemoval");
#if defined(__linux__)
// disable direct i/o for kernels < 2.6
if (m_directio) {
if (System::os_info().version_major == 2 &&
System::os_info().version_minor < 6)
m_directio = false;
}
#endif
/**
* Determine root directory
*/
Path root = cfg->get_str("root", "");
if (!root.is_complete()) {
Path data_dir = cfg->get_str("Hypertable.DataDirectory");
root = data_dir / root;
}
m_rootdir = root.string();
// ensure that root directory exists
if (!FileUtils::mkdirs(m_rootdir))
exit(1);
}
QfsBroker::~QfsBroker() {
delete m_client;
}
void QfsBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, uint32_t bufsz) {
int fd;
struct sockaddr_in addr;
cb->get_address(addr);
String abspath;
String filePath;
if (fname[0] == '/') {
abspath = m_rootdir + fname;
filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
} else {
abspath = m_rootdir + "/" + fname;
filePath = (String)"F" + fname[11] + fname[13] + fname[14];
}
int oflags = O_RDONLY;
if (m_directio && flags & Filesystem::OPEN_FLAG_DIRECTIO) {
#ifdef O_DIRECT
oflags |= O_DIRECT;
#endif
}
int USE_openFilesCount = 0;
for (int i=1; i<=openFilesCount;) {
i = i + 2;
if(openFilesCount == i) {
openFilesCount =openFilesCount + 2;
USE_openFilesCount = openFilesCount;
break;
} else {
if(fileType.find(i) == fileType.end()){
USE_openFilesCount = i;
break;
}
}
}
if(USE_openFilesCount == 0) {
openFilesCount = openFilesCount + 2;
USE_openFilesCount = openFilesCount;
}
if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
else fileType[USE_openFilesCount] = 2;
HT_DEBUGF("open file='%s' flags=%u bufsz=%d", fname, flags, bufsz);
//HT_ERRORF("open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
if(fileType[USE_openFilesCount] == 2) {
fd = ::open(abspath.c_str(), oflags);
if(fd == -1) {
HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
report_error(cb,errno);
return;
}
OpenFileDataLocalPtr fdata(new OpenFileDataLocal(fname, fd, O_RDONLY));
m_open_file_map.create(USE_openFilesCount+1, addr, fdata);
}
fd = m_client->Open(fname, O_RDONLY);
if(fd < 0) {
HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
report_error(cb, fd);
}
HT_INFOF("open(%s) = %d", fname, fd);
OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, fd, m_client));
m_open_file_map.create(USE_openFilesCount, addr, fdata);
if(bufsz)
m_client->SetIoBufferSize(fd, bufsz);
cb->response(USE_openFilesCount);
}
void QfsBroker::close(ResponseCallback *cb, uint32_t fd) {
HT_DEBUGF("close(%d)", fd);
m_open_file_map.remove(fd);
m_open_file_map.remove(fd+1);
fileType.erase(fd);
cb->response_ok();
}
void QfsBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz) {
int oflags = O_WRONLY | O_CREAT;
int fd;
struct sockaddr_in addr;
cb->get_address(addr);
HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
fname, flags, bufsz, (int)replication, (Lld)blksz);
String abspath;
String filePath;
if (fname[0] == '/') {
abspath = m_rootdir + fname;
filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
} else {
abspath = m_rootdir + "/" + fname;
filePath = (String)"F" + fname[11] + fname[13] + fname[14];
}
if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
oflags |= O_TRUNC;
else
oflags |= O_APPEND;
if (m_directio && flags & Filesystem::OPEN_FLAG_DIRECTIO) {
#ifdef O_DIRECT
oflags |= O_DIRECT;
#endif
}
int USE_openFilesCount = 0;
for (int i=1; i<=openFilesCount;) {
i = i + 2;
if(openFilesCount == i) {
openFilesCount =openFilesCount + 2;
USE_openFilesCount = openFilesCount;
break;
} else {
if(fileType.find(i) == fileType.end()){
USE_openFilesCount = i;
break;
}
}
}
if(USE_openFilesCount == 0) {
openFilesCount = openFilesCount + 2;
USE_openFilesCount = openFilesCount;
}
if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
else fileType[USE_openFilesCount] = 2;
if(fileType[USE_openFilesCount] == 2) {
fd = ::open(abspath.c_str(), oflags, 0644);
if(fd == -1) {
HT_ERRORF("open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
report_error(cb,errno);
}
OpenFileDataLocalPtr fdata(new OpenFileDataLocal(fname, fd, O_WRONLY));
m_open_file_map.create(USE_openFilesCount+1, addr, fdata);
}
if(flags & Filesystem::OPEN_FLAG_OVERWRITE)
fd = m_client->Open(fname, O_CREAT | O_TRUNC | O_RDWR);
else
fd = m_client->Open(fname, O_CREAT|O_WRONLY);
if(fd < 0) {
HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
report_error(cb, fd);
} else {
HT_INFOF("create(%s) = %d", fname, fd);
OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, fd, m_client));
m_open_file_map.create(USE_openFilesCount, addr, fdata);
if(bufsz)
m_client->SetIoBufferSize(fd, bufsz);
}
cb->response(USE_openFilesCount);
}
void QfsBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
if (fileType[fd] == 0) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
if(fileType[fd] == 2) {
int error;
OpenFileDataLocalPtr fdataLFS;
if (!m_open_file_map.get(fd+1, fdataLFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd+1);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd+1, (Llu)offset);
if ((offset = (uint64_t)lseek(fdataLFS->fd, offset, SEEK_SET)) == (uint64_t)-1) {
report_error(cb,errno);
HT_ERRORF("lseek failed: fd=%d offset=%llu - %s", fdataLFS->fd, (Llu)offset,
strerror(errno));
return;
}
if ((error = cb->response_ok()) != Error::OK)
HT_ERRORF("Problem sending response for seek(%u, %llu) - %s",
(unsigned)fd+1, (Llu)offset, Error::get_text(error));
cb->response_ok();
} else {
OpenFileDataQfsPtr fdataDFS;
if (!m_open_file_map.get(fd, fdataDFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
chunkOff_t status = m_client->Seek(fdataDFS->fd, offset);
if(status == (chunkOff_t)-1) {
HT_ERRORF("seek(%d,%lld) failure (%d) - %s", (int)fd, (Lld)offset, (int)-status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
} else
cb->response_ok();
}
}
void QfsBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
uint64_t offset;
if(fileType[fd] == 2) {
OpenFileDataLocalPtr fdataLFS;
ssize_t nread;
int error;
HT_DEBUGF("read fd=%d amount=%d", fd, amount);
if (!m_open_file_map.get(fd+1, fdataLFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd+1);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
if ((offset = (uint64_t)lseek(fdataLFS->fd, 0, SEEK_SET)) == (uint64_t)-1) {
report_error(cb,errno);
HT_ERRORF("lseek failed: fd=%d offset=%llu - %s", fdataLFS->fd, (Llu)offset,
strerror(errno));
return;
}
if ((nread = FileUtils::read(fdataLFS->fd, buf.base, amount)) == -1) {
////////////// RECOVERY FROM DFS /////////////////
report_error(cb,errno);
HT_ERRORF("read failed: fd=%d offset=%llu amount=%d - %s",
fd+1, (Llu)offset, amount, strerror(errno));
return;
////////////////////////////////////////////////////
}
buf.size = nread;
if ((error = cb->response(offset, buf)) != Error::OK)
HT_ERRORF("Problem sending response for read(%u, %u) - %s",
(unsigned)fd, (unsigned)amount, Error::get_text(error));
} else {
OpenFileDataQfsPtr fdataDFS;
if (!m_open_file_map.get(fd, fdataDFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
offset = m_client->Tell(fdataDFS->fd);
int len = m_client->Read(fdataDFS->fd, reinterpret_cast<char*>(buf.base), amount);
if(len<0) {
HT_ERRORF("read(%d,%lld) failure (%d) - %s", (int)fd, (Lld)amount, -len, KFS::ErrorCodeToStr(len).c_str());
report_error(cb, len);
} else
cb->response(offset, buf);
}
}
void QfsBroker::append(ResponseCallbackAppend *cb, uint32_t fd, uint32_t amount, const void *data, bool flush) {
OpenFileDataQfsPtr fdataDFS;
if (!m_open_file_map.get(fd, fdataDFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
uint64_t offset = m_client->Tell(fdataDFS->fd);
ssize_t written = m_client->Write(fdataDFS->fd, reinterpret_cast<const char*>(data), amount);
if(written < 0) {
HT_ERRORF("append(%d,%lld,%s) failure (%d) - %s", (int)fd, (Lld)amount,
flush ? "true" : "false", (int)-written, KFS::ErrorCodeToStr(written).c_str());
report_error(cb,errno);
} else {
if(flush) {
int error = m_client->Sync(fdataDFS->fd);
if(error) {
HT_ERRORF("append(%d,%lld,%s) failure (%d) - %s", (int)fd, (Lld)amount,
flush ? "true" : "false", -error, KFS::ErrorCodeToStr(error).c_str());
return report_error(cb, error);
}
}
}
//String cmd_str;
//cmd_str = (String)"echo 'META DATA append : " + openFileName[fd] + " ' > '/home/www/META_DATA_append.txt'";
//system(cmd_str.c_str());
if(fileType[fd] == 2) {
//String cmd_str;
//cmd_str = (String)"echo 'RS DATA TO LOCAL : " + openFileName[fd] + " ' > '/home/www/RS_DATA_TO_LOCAL.txt'";
//system(cmd_str.c_str());
OpenFileDataLocalPtr fdataLFS;
ssize_t nwritten;
if (!m_open_file_map.get(fd+1, fdataLFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd+1);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
if ((offset = (uint64_t)lseek(fdataLFS->fd, 0, SEEK_CUR)) == (uint64_t)-1) {
report_error(cb,errno);
HT_ERRORF("lseek failed: fd=%d offset=0 SEEK_CUR - %s", fd+1,
strerror(errno));
return;
}
HT_DEBUGF("append fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
if ((nwritten = FileUtils::write(fdataLFS->fd, data, amount)) == -1) {
report_error(cb,errno);
HT_ERRORF("write failed: fd=%d offset=%llu amount=%d data=%p- %s",
fd+1, (Llu)offset, amount, data, strerror(errno));
return;
}
if (fsync(fdataLFS->fd) != 0) {
report_error(cb,errno);
HT_ERRORF("flush failed: fd=%d - %s", fd, strerror(errno));
return;
}
// int error;
//if ((error = cb->response_ok()) != Error::OK)
//HT_ERRORF("Problem sending response for seek(%u, %llu) - %s",
//(unsigned)fd, (Llu)offset, strerror(errno));
}
cb->response(offset, written);
}
void QfsBroker::remove(ResponseCallback *cb, const char *fname) {
HT_INFOF("remove file='%s'", fname);
String abspath;
String filePath;
if (fname[0] == '/') {
abspath = m_rootdir + fname;
filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
} else {
abspath = m_rootdir + "/" + fname;
filePath = (String)"F" + fname[11] + fname[13] + fname[14];
}
if(filePath != "Fsrv") {
if (m_no_removal) {
String deleted_file = abspath + ".deleted";
if (!FileUtils::rename(abspath, deleted_file)) {
report_error(cb,errno);
return;
}
} else {
if (unlink(abspath.c_str()) == -1) {
report_error(cb,errno);
HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(),
strerror(errno));
return;
}
}
//int error;
//if ((error = cb->response_ok()) != Error::OK)
//HT_ERRORF("Problem sending response for remove(%s) - %s",
//fname, Error::get_text(error));
}
int status = m_client->Remove(fname);
if(status == 0)
cb->response_ok();
else {
HT_ERRORF("remove(%s) failure (%d) - %s", fname, -status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
}
}
void QfsBroker::length(ResponseCallbackLength *cb, const char *fname, bool accurate)
{
String abspath;
String filePath;
if (fname[0] == '/') {
abspath = m_rootdir + fname;
filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
} else {
abspath = m_rootdir + "/" + fname;
filePath = (String)"F" + fname[11] + fname[13] + fname[14];
}
if(filePath != "Fsrv") {
uint64_t length;
HT_DEBUGF("length file='%s' (accurate=%s)", fname,
accurate ? "true" : "false");
if ((length = FileUtils::length(abspath)) == (uint64_t)-1) {
HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(),
strerror(errno));
report_error(cb,errno);
}
cb->response(length);
} else {
KfsFileAttr result;
int err = m_client->Stat(fname, result);
if(err == 0)
cb->response(result.fileSize);
else {
HT_ERRORF("length(%s) failure (%d) - %s", fname, -err, KFS::ErrorCodeToStr(err).c_str());
report_error(cb, err);
}
}
}
void QfsBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum) {
StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
if(fileType[fd] == 2) {
ssize_t nread;
int error;
OpenFileDataLocalPtr fdataLFS;
HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
if (!m_open_file_map.get(fd+1, fdataLFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd+1);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
nread = FileUtils::pread(fdataLFS->fd, buf.base, buf.aligned_size(), (off_t)offset);
if (nread != (ssize_t)buf.aligned_size()) {
////////////// RECOVERY FROM DFS /////////////////
report_error(cb,errno);
HT_ERRORF("pread failed: fd=%d amount=%d aligned_size=%d offset=%llu - %s",
fd+1, (int)amount, (int)buf.aligned_size(), (Llu)offset,
strerror(errno));
return;
////////////////////////////////////////////////////
}
if ((error = cb->response(offset, buf)) != Error::OK)
HT_ERRORF("Problem sending response for pread(%u, %llu, %u) - %s",
(unsigned)fd+1, (Llu)offset, (unsigned)amount, Error::get_text(error));
} else {
OpenFileDataQfsPtr fdataDFS;
if (!m_open_file_map.get(fd, fdataDFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
ssize_t status = m_client->PRead(fdataDFS->fd, offset, reinterpret_cast<char*>(buf.base), amount);
if(status < 0) {
HT_ERRORF("pread(%d,%lld,%lld) failure (%d) - %s", (int)fd, (Lld)offset,
(Lld)amount, (int)-status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
} else
cb->response(offset, buf);
}
}
void QfsBroker::mkdirs(ResponseCallback *cb, const char *dname) {
String absdir;
String filePath;
if (m_verbose)
HT_DEBUGF("mkdirs dir='%s'", dname);
if (dname[0] == '/') {
absdir = m_rootdir + dname;
filePath = (String)"F" + dname[12] + dname[14]+ dname[15];
} else {
absdir = m_rootdir + "/" + dname;
filePath = (String)"F" + dname[11] + dname[13] + dname[14];
}
if(filePath != "Fsrv") {
if (!FileUtils::mkdirs(absdir)) {
report_error(cb,errno);
HT_ERRORF("mkdirs failed: dname='%s' - %s", absdir.c_str(),
strerror(errno));
// if ((error = cb->response_ok()) != Error::OK)
// HT_ERRORF("Problem sending response for mkdirs(%s) - %s",
// dname, Error::get_text(error));
}
}
int status = m_client->Mkdirs(dname);
if(status < 0) {
HT_ERRORF("mkdirs(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
} else
cb->response_ok();
return;
}
void QfsBroker::rmdir(ResponseCallback *cb, const char *dname) {
String absdir;
String filePath;
String cmd_str;
if (m_verbose)
HT_INFOF("rmdir dir='%s'", dname);
if (dname[0] == '/') {
absdir = m_rootdir + dname;
filePath = (String)"F" + dname[12] + dname[14]+ dname[15];
} else {
absdir = m_rootdir + "/" + dname;
filePath = (String)"F" + dname[11] + dname[13] + dname[14];
}
if(filePath != "Fsrv") {
if (FileUtils::exists(absdir)) {
if (m_no_removal) {
String deleted_file = absdir + ".deleted";
if (!FileUtils::rename(absdir, deleted_file)) {
report_error(cb,errno);
return;
}
} else {
cmd_str = (String)"/bin/rm -rf " + absdir;
if (system(cmd_str.c_str()) != 0) {
HT_ERRORF("%s failed.", cmd_str.c_str());
cb->error(Error::DFSBROKER_IO_ERROR, cmd_str);
return;
}
}
}
#if 0
if (rmdir(absdir.c_str()) != 0) {
report_error(cb,errno);
HT_ERRORF("rmdir failed: dname='%s' - %s", absdir.c_str(), strerror(errno));
return;
}
#endif
// if ((error = cb->response_ok()) != Error::OK)
// HT_ERRORF("Problem sending response for mkdirs(%s) - %s",
//dname, Error::get_text(error));
}
int status = m_client->Rmdirs(dname);
if(status < 0 && status != -ENOENT) {
HT_ERRORF("rmdir(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
} else
cb->response_ok();
return;
}
void QfsBroker::flush(ResponseCallback *cb, uint32_t fd) {
if(fileType[fd] == 2) {
OpenFileDataLocalPtr fdataLFS;
HT_DEBUGF("flush fd=%d", fd+1);
if (!m_open_file_map.get(fd+1, fdataLFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd+1);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
if (fsync(fdataLFS->fd) != 0) {
HT_ERRORF("flush failed: fd=%d - %s", fd=1, strerror(errno));
report_error(cb,errno);
}
}
HT_DEBUGF("flush fd=%d", fd);
OpenFileDataQfsPtr fdataDFS;
if (!m_open_file_map.get(fd, fdataDFS)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
int status = m_client->Sync(fdataDFS->fd);
if(status < 0) {
HT_ERRORF("flush(%d) failure (%d) - %s", (int)fd, -status, KFS::ErrorCodeToStr(status).c_str());
report_error(cb, status);
} else
cb->response_ok();
}
void QfsBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
// no local - in-case, RS need to decide of cluster used filename
std::vector<std::string> result, listing;
int err = m_client->Readdir(dname, result);
std::vector<std::string>::iterator end = result.end();
for(std::vector<std::string>::iterator it = result.begin(); it != end; ++it) {
const std::string& ent = *it;
if(ent != "." && ent != "..")
listing.push_back(ent);
}
if(err == 0)
cb->response(listing);
else {
HT_ERRORF("readdir(%s) failure (%d) - %s", dname, -err, KFS::ErrorCodeToStr(err).c_str());
report_error(cb,err);
}
}
void QfsBroker::posix_readdir(ResponseCallbackPosixReaddir *cb,
const char *dname) {
HT_ASSERT(!"posix_readdir() not yet implemented.");
}
void QfsBroker::exists(ResponseCallbackExists *cb, const char *fname) {
//String abspath;
//String filePath;
//if (fname[0] == '/') {
// abspath = m_rootdir + fname;
// filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
// } else {
// abspath = m_rootdir + "/" + fname;
// filePath = (String)"F" + fname[11] + fname[13] + fname[14];
// }
//if(filePath == "Fsrv")
// cb->response(FileUtils::exists(abspath));
// else
// no local - in-case, RS need to decide of cluster used filename
cb->response(m_client->Exists(fname));
}
void QfsBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
//OUTPUT ON FILE PATH
String cmd_str;
cmd_str = (String)"echo 'SRC: " + src + " DST: " + dst + " ' > '/home/www/RENAME_FILE.txt'";
system(cmd_str.c_str());
int err = m_client->Rename(src, dst);
if(err == 0)
cb->response_ok();
else {
HT_ERRORF("rename(%s,%s) failure (%d) - %s", src, dst, -err, KFS::ErrorCodeToStr(err).c_str());
report_error(cb, err);
}
}
void QfsBroker::debug(ResponseCallback *cb, int32_t command, StaticBuffer &serialized_parameters) {
cb->error(Error::NOT_IMPLEMENTED, format("Unsupported debug command - %d",
command));
}
void QfsBroker::status(ResponseCallback *cb) {
cb->response_ok();
}
void QfsBroker::shutdown(ResponseCallback *cb) {
m_open_file_map.remove_all();
fileType.clear();
cb->response_ok();
}
void QfsBroker::report_error(ResponseCallback *cb, int error) {
string errors = KFS::ErrorCodeToStr(error);
switch(-error) {
case ENOTDIR:
case ENAMETOOLONG:
case ENOENT:
cb->error(Error::DFSBROKER_BAD_FILENAME, errors);
break;
case EACCES:
case EPERM:
cb->error(Error::DFSBROKER_PERMISSION_DENIED, errors);
break;
case EBADF:
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errors);
break;
case EINVAL:
cb->error(Error::DFSBROKER_INVALID_ARGUMENT, errors);
break;
default:
cb->error(Error::DFSBROKER_IO_ERROR, errors);
break;
}
#ifndef NDEBUG
std::clog << "ERROR " << errors << std::endl;
#endif //NDEBUG
}