Before I ruin the copy.

This copy,

m_open_file_map  assigned with ID of pairs-   while the ID-fd is an 
generated num grow by 2 (eg. 1 as DFS 2 as local  the connection resources 
fd)  

still to work out a more proper ID to use, as might the loop is 
out-breaking.

on the request of fd, and it is RS-data , it's open_file_map is fd+1 (the 
meta is the fd)

the fd key , get assigned at open/create as well with file type. 

1 > for Meta

2 > for RS-data(table)

if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
else fileType[USE_openFilesCount] = 2;

 USE_openFilesCount --- Is the key for file_map, while for the local data 
it is the  USE_openFilesCount+1

addr is the same for both new file_map



Let me know, if you think there is an issue of such way around.

By the way, with the key assigned for fd, it is a way around to manage the 
number of open_file_descriptors on the system.



Thanks,

Kashirin Alex


 
On Tuesday, February 4, 2014 8:11:03 AM UTC+2, Alex Kashirin wrote:
>
> the QFSbroker combined with local FS.
>
> m_open_file_map get assigned predefined IDs to store by.
>
>
> It's now a more better version, Doug you can just download it from the 
> server , the log details on the older email.
>
>
> Thanks,
>
> Kashirin Alex
>
>
>
>
> On Monday, February 3, 2014 7:33:42 PM UTC+2, Alex Kashirin wrote:
>>
>> Why the localbroker use FileUtils and not Filesystem.h  (the FileSystem 
>> has close file descriptor while the FileUtils do not!)
>>
>>
>> the localBroker, might not closing the open file descriptors
>>
>>
>> Thanks,
>>
>> Kashirin Alex 
>>
>>
>>
>> On Sunday, February 2, 2014 3:32:50 AM UTC+2, Alex Kashirin wrote:
>>>
>>> Will be good to have opinions on the built and run.
>>>
>>> cmake file needs to be updated.
>>>
>>> does not contain Java related.
>>>
>>> Thanks,
>>>
>>> Kashirin Alex
>>>
>>>
>>>

-- 
You received this message because you are subscribed to the Google Groups 
"Hypertable Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/hypertable-dev.
For more options, visit https://groups.google.com/groups/opt_out.
/**
 * Copyright (C) 2007-2012 Hypertable, Inc.
 *
 * This file is part of Hypertable.
 *
 * Hypertable is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 3
 * of the License, or any later version.
 *
 * Hypertable is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include "Common/Compat.h"
#include <cstdlib>
#include <iostream>
#include <fstream>
#include <string>

extern "C" {
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>
}

#include "Common/Error.h"
#include "Common/FileUtils.h"
#include "Common/InetAddr.h"
#include "Common/Init.h"
#include "Common/Usage.h"
#include "Common/System.h"

#include "AsyncComm/ApplicationQueue.h"
#include "AsyncComm/Comm.h"
#include "AsyncComm/DispatchHandler.h"

#include "DfsBroker/Lib/Config.h"
#include "DfsBroker/Lib/ConnectionHandlerFactory.h"

#include "QfsBroker.h"

using namespace Hypertable;
using namespace Config;
using namespace std;


namespace KFS {
  extern std::string KFS_BUILD_VERSION_STRING;
} 

namespace {

  struct AppPolicy : Policy {
    static void init_options() {
      cmdline_desc().add_options()
        ("root", str()->default_value("fs/local"), "root directory for local "
         "broker (if relative, it's relative to the installation directory")
        ;
      alias("root", "DfsBroker.Local.Root");
	  alias("port", "Qfs.MetaServer.Port");
      alias("host", "Qfs.MetaServer.Name");
      alias("workers", "Qfs.Broker.Workers");
      alias("reactors", "Qfs.Broker.Reactors");
    }
  };
  typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy> Policies;

}

int main(int argc, char **argv) {
  try {
	
	
    init_with_policies<Policies>(argc, argv);
    int port = get_i16("DfsBroker.Port");
    int worker_count = get_i32("workers");

    Comm *comm = Comm::instance();

    ApplicationQueuePtr app_queue = new ApplicationQueue(worker_count);
    BrokerPtr broker = new QfsBroker(properties);

    ConnectionHandlerFactoryPtr chfp =
        new DfsBroker::ConnectionHandlerFactory(comm, app_queue, broker);
    InetAddr listen_addr(INADDR_ANY, port);

    comm->listen(listen_addr, chfp);
    app_queue->join();
  }
  catch (Exception &e) {
    HT_ERROR_OUT << e << HT_END;
    return 1;
  }
}
/**
 * Copyright (C) 2007-2013 Hypertable, Inc.
 *
 * This file is part of Hypertable.
 *
 * Hypertable is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 3
 * of the License, or any later version.
 *
 * Hypertable is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#ifndef HYPERTABLE_QFSROKER_H
#define HYPERTABLE_QFSBROKER_H

extern "C" {
#include <unistd.h>
}

#include "Common/String.h"
#include "Common/atomic.h"
#include "Common/Properties.h"

#include "DfsBroker/Lib/Broker.h"




namespace KFS {
  class KfsClient;
}

namespace Hypertable {
  using namespace DfsBroker;


  /**
   * A subclass of Hypertable::DfsBroker::OpenFileData to hold the fields needed by the QfsBroker
   */
  class OpenFileDataQfs: public OpenFileData {
  public:
  OpenFileDataQfs(const std::string &fname, int _fd, KFS::KfsClient *client): m_fname(fname), fd(_fd), m_client(client) {};
  
    virtual ~OpenFileDataQfs();

    std::string m_fname;
	int fd;
    KFS::KfsClient *const m_client;
  //private:
    //std::string m_fname;
    //int m_fd;
    //KFS::KfsClient *const m_client;
  };

  /**
   */
  class OpenFileDataQfsPtr : public OpenFileDataPtr {
  public:
  OpenFileDataQfsPtr() : OpenFileDataPtr() { }
  OpenFileDataQfsPtr(OpenFileDataQfs *ofdq) : OpenFileDataPtr(ofdq, true) {}
    OpenFileDataQfs *operator->() const {
      return (OpenFileDataQfs*) get();
    }
  };

    /**
   *
   */
  class OpenFileDataLocal : public OpenFileData {
  public:
  OpenFileDataLocal(const String &fname, int _fd, int _flags) : fd(_fd), flags(_flags), filename(fname) { }
    virtual ~OpenFileDataLocal() {
      HT_INFOF("close( %s , %d )", filename.c_str(), fd);
      close(fd);
    }
    int  fd;
    int  flags;
    String filename;
  };

  /**
   *
   */
  class OpenFileDataLocalPtr : public OpenFileDataPtr {
  public:
    OpenFileDataLocalPtr() : OpenFileDataPtr() { }
    OpenFileDataLocalPtr(OpenFileDataLocal *ofdl)
      : OpenFileDataPtr(ofdl, true) { }
    OpenFileDataLocal *operator->() const {
      return (OpenFileDataLocal *)get();
    }
  };


  
  
  
  
  /**
   * QfsBroker implementation class. @see Hypertable::DfsBroker::Broker
   */
  class QfsBroker : public DfsBroker::Broker {
  public:
    QfsBroker(PropertiesPtr& cfg);
    virtual ~QfsBroker();

    virtual void open(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, uint32_t bufsz);
    virtual void close(ResponseCallback *cb, uint32_t fd);

    virtual void create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz);
    virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount);
    virtual void append(ResponseCallbackAppend *, uint32_t fd, uint32_t amount, const void *data, bool flush);
    virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset);
    virtual void remove(ResponseCallback *cb, const char *fname);
    virtual void length(ResponseCallbackLength *cb, const char *fname,
                        bool accurate = true);
    virtual void pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
                       uint32_t amount, bool verify_checksum);
    virtual void mkdirs(ResponseCallback *cb, const char *dname);
    virtual void rmdir(ResponseCallback *cb, const char *dname);
    virtual void flush(ResponseCallback *cb, uint32_t fd);
    virtual void status(ResponseCallback *cb);
    virtual void shutdown(ResponseCallback *cb);
    virtual void readdir(ResponseCallbackReaddir *cb, const char *dname);
    virtual void posix_readdir(ResponseCallbackPosixReaddir *cb,
			       const char *dname);
    virtual void exists(ResponseCallbackExists *cb, const char *fname);
    virtual void rename(ResponseCallback *cb, const char *src, const char *dst);
    virtual void debug(ResponseCallback *cb, int32_t command,
                       StaticBuffer &serialized_parameters);
  private:
    std::string m_host;
    int m_port;
    void report_error(ResponseCallback *cb, int error);
    KFS::KfsClient* const m_client;
	int *fileType;
	int openFilesCount;
	
    static atomic_t ms_next_fd;
    //virtual void report_error(ResponseCallback *cb);
    String       m_rootdir;
    bool         m_verbose;
    bool         m_directio;
    bool         m_no_removal;
  };
}
#endif // HYPERTABLE_QFSBROKER_H
/**
 * Copyright (C) 2007-2013 Hypertable, Inc.
 *
 * This file is part of Hypertable.
 *
 * Hypertable is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 3
 * of the License, or any later version.
 *
 * Hypertable is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include "Common/Compat.h"

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>
#include <cstdlib>

extern "C" {
#include <dirent.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <sys/types.h>
#if defined(__sun__)
#include <sys/fcntl.h>
#endif
#include <sys/uio.h>
#include <unistd.h>
#include <time.h>
}

#include "AsyncComm/ReactorFactory.h"

#include "Common/FileUtils.h"
#include "Common/Filesystem.h"
#include "Common/Path.h"
#include "Common/String.h"
#include "Common/System.h"
#include "Common/SystemInfo.h"

#include "QfsBroker.h"
#include <kfs/KfsClient.h>

using namespace Hypertable;
using namespace std;
using namespace KFS;

OpenFileDataQfs::~OpenFileDataQfs() {
  HT_INFOF("close(%d) file: %s", fd, m_fname.c_str());
  m_client->Close(fd);
}

QfsBroker::QfsBroker(PropertiesPtr &cfg)
    : m_host(cfg->get_str("host")),
    m_port(cfg->get_i16("port")),
    m_client(KFS::Connect(m_host, m_port)) {
  
	fileType = new int[1024];
	openFilesCount = 1;
	
	m_verbose = cfg->get_bool("verbose");
	m_directio = cfg->get_bool("DfsBroker.Local.DirectIO");
	m_no_removal = cfg->get_bool("DfsBroker.DisableFileRemoval");

#if defined(__linux__)
  // disable direct i/o for kernels < 2.6
  if (m_directio) {
    if (System::os_info().version_major == 2 &&
        System::os_info().version_minor < 6)
      m_directio = false;
  }
#endif

  /**
   * Determine root directory
   */
  Path root = cfg->get_str("root", "");

  if (!root.is_complete()) {
    Path data_dir = cfg->get_str("Hypertable.DataDirectory");
    root = data_dir / root;
  }

  m_rootdir = root.string();

  // ensure that root directory exists
  if (!FileUtils::mkdirs(m_rootdir))
    exit(1);
	
}

QfsBroker::~QfsBroker() {
  delete m_client;
}
void QfsBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, uint32_t bufsz) {
	int fd;
	struct sockaddr_in addr;
	cb->get_address(addr);
	
	String abspath;
	String filePath;
	if (fname[0] == '/') {
		abspath = m_rootdir + fname;
		filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
	} else {
		abspath = m_rootdir + "/" + fname;
		filePath = (String)"F" + fname[11] + fname[13] + fname[14];
	}
	
	int oflags = O_RDONLY;

	if (m_directio && flags & Filesystem::OPEN_FLAG_DIRECTIO) {
#ifdef O_DIRECT
		oflags |= O_DIRECT;
#endif
	}
	
	int USE_openFilesCount = 0;
	for (int i=1; i<=openFilesCount;) {
		i = i + 2;
		if(openFilesCount == i) {
			openFilesCount =openFilesCount + 2;
			USE_openFilesCount = openFilesCount; 
			fileType = (int *)malloc(openFilesCount+1024 * sizeof(int));
			break;
		} else {
			if(fileType[i] == 0) {
				USE_openFilesCount = i;
				break;
			}
		}
	}
	if(USE_openFilesCount == 0) {
		openFilesCount = openFilesCount + 2;
		USE_openFilesCount = openFilesCount;
	}

	if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
	else fileType[USE_openFilesCount] = 2;
	
	HT_DEBUGF("open file='%s' flags=%u bufsz=%d", fname, flags, bufsz);
    //HT_ERRORF("open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
  
	if(fileType[USE_openFilesCount] == 2) {
		fd = ::open(abspath.c_str(), oflags);
		if(fd == -1) {
			HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
			report_error(cb,errno);
			return;
		}
		
		OpenFileDataLocalPtr fdata(new OpenFileDataLocal(fname, fd, O_RDONLY));
		m_open_file_map.create(USE_openFilesCount+1, addr, fdata);
	}
	 
	fd = m_client->Open(fname, O_RDONLY);
	if(fd < 0) {
		HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
		report_error(cb, fd);
	}
	
    HT_INFOF("open(%s) = %d", fname, fd);
    OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, fd, m_client));
    m_open_file_map.create(USE_openFilesCount, addr, fdata);
	
    if(bufsz)
      m_client->SetIoBufferSize(fd, bufsz);
    cb->response(USE_openFilesCount);
}

void QfsBroker::close(ResponseCallback *cb, uint32_t fd) {
  HT_DEBUGF("close(%d)", fd);
  m_open_file_map.remove(fd);
  m_open_file_map.remove(fd+1);
  fileType[fd] = 0;
  cb->response_ok();
}

void QfsBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz) {
	int oflags = O_WRONLY | O_CREAT;
	int fd;
	struct sockaddr_in addr;
	cb->get_address(addr);
	
	HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
            fname, flags, bufsz, (int)replication, (Lld)blksz);

	String abspath;
	String filePath;
	if (fname[0] == '/') {
		abspath = m_rootdir + fname;
		filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
	} else {
		abspath = m_rootdir + "/" + fname;
		filePath = (String)"F" + fname[11] + fname[13] + fname[14];
	}
	
	
	if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
		oflags |= O_TRUNC;
	else
		oflags |= O_APPEND;

	if (m_directio && flags & Filesystem::OPEN_FLAG_DIRECTIO) {
#ifdef O_DIRECT
		oflags |= O_DIRECT;
#endif
	}
	
	int USE_openFilesCount = 0;
	for (int i=1; i<=openFilesCount;) {
		i = i + 2;
		if(openFilesCount == i) {
			openFilesCount =openFilesCount + 2;
			USE_openFilesCount = openFilesCount; 
			fileType = (int *)malloc(openFilesCount+1024 * sizeof(int));
			break;
		} else {
			if(fileType[i] == 0) {
				USE_openFilesCount = i;
				break;
			}
		}
	}
	if(USE_openFilesCount == 0) {
		openFilesCount = openFilesCount + 2;
		USE_openFilesCount = openFilesCount;
	}

	if(filePath == "Fsrv") fileType[USE_openFilesCount] = 1;
	else fileType[USE_openFilesCount] = 2;
	
	
	if(fileType[USE_openFilesCount] == 2) {
		fd = ::open(abspath.c_str(), oflags, 0644);
		if(fd == -1) {
			HT_ERRORF("open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
			report_error(cb,errno);
		}
		OpenFileDataLocalPtr fdata(new OpenFileDataLocal(fname, fd, O_WRONLY));
		m_open_file_map.create(USE_openFilesCount+1, addr, fdata);
	}
	
	if(flags & Filesystem::OPEN_FLAG_OVERWRITE) 
		fd = m_client->Open(fname, O_CREAT | O_TRUNC | O_RDWR);
	else
		fd = m_client->Open(fname, O_CREAT|O_WRONLY);
		

	if(fd < 0) {
		HT_ERRORF("open(%s) failure (%d) - %s", fname, USE_openFilesCount, KFS::ErrorCodeToStr(fd).c_str());
		report_error(cb, fd);
	} else {
		HT_INFOF("create(%s) = %d", fname, fd);
		OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, fd, m_client));
		m_open_file_map.create(USE_openFilesCount, addr, fdata);
		if(bufsz)
			m_client->SetIoBufferSize(fd, bufsz);
	}
	
	cb->response(USE_openFilesCount);
}

void QfsBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
	if (fileType[fd] == 0) {
		char errbuf[32];
		sprintf(errbuf, "%d", fd);
		cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
		return;
	}
	
	if(fileType[fd] == 2) {
		int error;
		OpenFileDataLocalPtr fdataLFS;
		
		if (!m_open_file_map.get(fd+1, fdataLFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd+1);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		
		HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd+1, (Llu)offset);
		
		if ((offset = (uint64_t)lseek(fdataLFS->fd, offset, SEEK_SET)) == (uint64_t)-1) {
			report_error(cb,errno);
			HT_ERRORF("lseek failed: fd=%d offset=%llu - %s", fdataLFS->fd, (Llu)offset,
              strerror(errno));
			return;
		}
		if ((error = cb->response_ok()) != Error::OK)
			HT_ERRORF("Problem sending response for seek(%u, %llu) - %s",
					(unsigned)fd+1, (Llu)offset, Error::get_text(error));
  
		cb->response_ok();
	
	} else {
		OpenFileDataQfsPtr fdataDFS;
		if (!m_open_file_map.get(fd, fdataDFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
	
		chunkOff_t status = m_client->Seek(fdataDFS->fd, offset);
		if(status == (chunkOff_t)-1) {
			HT_ERRORF("seek(%d,%lld) failure (%d) - %s", (int)fd, (Lld)offset, (int)-status, KFS::ErrorCodeToStr(status).c_str());
			report_error(cb, status);
		} else
			cb->response_ok();
	
	}
}

void QfsBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {

	StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
	uint64_t offset;
	
	if(fileType[fd] == 2) {
		OpenFileDataLocalPtr fdataLFS;
		ssize_t nread;
		int error;
		
		HT_DEBUGF("read fd=%d amount=%d", fd, amount);
		
		if (!m_open_file_map.get(fd+1, fdataLFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd+1);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		
		if ((offset = (uint64_t)lseek(fdataLFS->fd, 0, SEEK_SET)) == (uint64_t)-1) {
			report_error(cb,errno);
			HT_ERRORF("lseek failed: fd=%d offset=%llu - %s", fdataLFS->fd, (Llu)offset,
              strerror(errno));
			return;
		}

		if ((nread = FileUtils::read(fdataLFS->fd, buf.base, amount)) == -1) {
			////////////// RECOVERY FROM DFS   /////////////////
			report_error(cb,errno);
			HT_ERRORF("read failed: fd=%d offset=%llu amount=%d - %s",
				fd+1, (Llu)offset, amount, strerror(errno));
			return;
			////////////////////////////////////////////////////
		}
		buf.size = nread;

		if ((error = cb->response(offset, buf)) != Error::OK)
			HT_ERRORF("Problem sending response for read(%u, %u) - %s",
				(unsigned)fd, (unsigned)amount, Error::get_text(error));
		
	} else {
		OpenFileDataQfsPtr fdataDFS;
		if (!m_open_file_map.get(fd, fdataDFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		offset = m_client->Tell(fdataDFS->fd);
		int len = m_client->Read(fdataDFS->fd, reinterpret_cast<char*>(buf.base), amount);
		if(len<0) {
			HT_ERRORF("read(%d,%lld) failure (%d) - %s", (int)fd, (Lld)amount, -len, KFS::ErrorCodeToStr(len).c_str());
			report_error(cb, len);
		} else
			cb->response(offset, buf);
	
	}
}

void QfsBroker::append(ResponseCallbackAppend *cb, uint32_t fd, uint32_t amount, const void *data, bool flush) {
	OpenFileDataQfsPtr fdataDFS;
	if (!m_open_file_map.get(fd, fdataDFS)) {
		char errbuf[32];
		sprintf(errbuf, "%d", fd);
		cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
		return;
	}
		
	uint64_t offset = m_client->Tell(fdataDFS->fd);
	ssize_t written = m_client->Write(fdataDFS->fd, reinterpret_cast<const char*>(data), amount);
	if(written < 0) {
		HT_ERRORF("append(%d,%lld,%s) failure (%d) - %s", (int)fd, (Lld)amount,
			flush ? "true" : "false", (int)-written, KFS::ErrorCodeToStr(written).c_str());
		report_error(cb,errno);
	} else {
		if(flush) {
			int error = m_client->Sync(fdataDFS->fd);
			if(error) {
				HT_ERRORF("append(%d,%lld,%s) failure (%d) - %s", (int)fd, (Lld)amount,
					flush ? "true" : "false", -error, KFS::ErrorCodeToStr(error).c_str());
				return report_error(cb, error);
			}
		}
	}
		//String cmd_str;
		//cmd_str = (String)"echo 'META DATA append : " + openFileName[fd] + " ' > '/home/www/META_DATA_append.txt'";
		//system(cmd_str.c_str());
		
	if(fileType[fd] == 2) {
		
		//String cmd_str;
		//cmd_str = (String)"echo 'RS DATA TO LOCAL : " + openFileName[fd] + " ' > '/home/www/RS_DATA_TO_LOCAL.txt'";
		//system(cmd_str.c_str());
		
		
		OpenFileDataLocalPtr fdataLFS;
		ssize_t nwritten;
		
		if (!m_open_file_map.get(fd+1, fdataLFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd+1);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		
		if ((offset = (uint64_t)lseek(fdataLFS->fd, 0, SEEK_CUR)) == (uint64_t)-1) {
			report_error(cb,errno);
			HT_ERRORF("lseek failed: fd=%d offset=0 SEEK_CUR - %s", fd+1,
              strerror(errno));
			return;
		}
		HT_DEBUGF("append fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
		if ((nwritten = FileUtils::write(fdataLFS->fd, data, amount)) == -1) {
			report_error(cb,errno);
			HT_ERRORF("write failed: fd=%d offset=%llu amount=%d data=%p- %s",
			fd+1, (Llu)offset, amount, data, strerror(errno));
			return;
		}

		if (fsync(fdataLFS->fd) != 0) {
			report_error(cb,errno);
			HT_ERRORF("flush failed: fd=%d - %s", fd, strerror(errno));
			return;
		}
		// int error;
		//if ((error = cb->response_ok()) != Error::OK)
			//HT_ERRORF("Problem sending response for seek(%u, %llu) - %s",
					//(unsigned)fd, (Llu)offset, strerror(errno));
	}

	cb->response(offset, written);
}

void QfsBroker::remove(ResponseCallback *cb, const char *fname) {

	HT_INFOF("remove file='%s'", fname);

	String abspath;
	String filePath;

	
	if (fname[0] == '/') {
		abspath = m_rootdir + fname;
		filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
	} else {
		abspath = m_rootdir + "/" + fname;
		filePath = (String)"F" + fname[11] + fname[13] + fname[14];
	}
	if(filePath != "Fsrv") {
		if (m_no_removal) {
			String deleted_file = abspath + ".deleted";
			if (!FileUtils::rename(abspath, deleted_file)) {
				report_error(cb,errno);
				return;
			}
		} else {
			if (unlink(abspath.c_str()) == -1) {
				report_error(cb,errno);
				HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(),
					strerror(errno));
				return;
			}
		}
		//int error;
		//if ((error = cb->response_ok()) != Error::OK)
			//HT_ERRORF("Problem sending response for remove(%s) - %s",
					//fname, Error::get_text(error));

	}
	

	int status = m_client->Remove(fname);
	if(status == 0)
		cb->response_ok();
	else {
		HT_ERRORF("remove(%s) failure (%d) - %s", fname, -status, KFS::ErrorCodeToStr(status).c_str());
		report_error(cb, status);
	}
}

void QfsBroker::length(ResponseCallbackLength *cb, const char *fname, bool accurate)
{
	String abspath;
	String filePath;
	
	if (fname[0] == '/') {
		abspath = m_rootdir + fname;
		filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
	} else {
		abspath = m_rootdir + "/" + fname;
		filePath = (String)"F" + fname[11] + fname[13] + fname[14];
	}
	
	if(filePath != "Fsrv") {
		uint64_t length;

		HT_DEBUGF("length file='%s' (accurate=%s)", fname,
			accurate ? "true" : "false");
		
		if ((length = FileUtils::length(abspath)) == (uint64_t)-1) {
			HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(),
					strerror(errno));
			report_error(cb,errno);
		}
		cb->response(length);
	} else {

		KfsFileAttr result;
		int err = m_client->Stat(fname, result);
		if(err == 0)
			cb->response(result.fileSize);
		else {
			HT_ERRORF("length(%s) failure (%d) - %s", fname, -err, KFS::ErrorCodeToStr(err).c_str());
			report_error(cb, err);
		}
	}
}

void QfsBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum) {

	StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
	
	if(fileType[fd] == 2) {
		ssize_t nread;
		int error;

		OpenFileDataLocalPtr fdataLFS;
		HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);

		if (!m_open_file_map.get(fd+1, fdataLFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd+1);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		
		nread = FileUtils::pread(fdataLFS->fd, buf.base, buf.aligned_size(), (off_t)offset);
		if (nread != (ssize_t)buf.aligned_size()) {
			////////////// RECOVERY FROM DFS   /////////////////
			report_error(cb,errno);
			HT_ERRORF("pread failed: fd=%d amount=%d aligned_size=%d offset=%llu - %s",
              fd+1, (int)amount, (int)buf.aligned_size(), (Llu)offset,
              strerror(errno));
			return;
			////////////////////////////////////////////////////
		}
		
		if ((error = cb->response(offset, buf)) != Error::OK)
			HT_ERRORF("Problem sending response for pread(%u, %llu, %u) - %s",
				(unsigned)fd+1, (Llu)offset, (unsigned)amount, Error::get_text(error));
		
	} else {
		OpenFileDataQfsPtr fdataDFS;
		if (!m_open_file_map.get(fd, fdataDFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		ssize_t status = m_client->PRead(fdataDFS->fd, offset, reinterpret_cast<char*>(buf.base), amount);
		if(status < 0) {
			HT_ERRORF("pread(%d,%lld,%lld) failure (%d) - %s", (int)fd, (Lld)offset,
				(Lld)amount, (int)-status, KFS::ErrorCodeToStr(status).c_str());
			report_error(cb, status);
		} else
			cb->response(offset, buf);
	}

}
void QfsBroker::mkdirs(ResponseCallback *cb, const char *dname) {
  
	String absdir;
	String filePath;
	
	if (m_verbose)
		HT_DEBUGF("mkdirs dir='%s'", dname);
	
	if (dname[0] == '/') {
		absdir = m_rootdir + dname;
		filePath = (String)"F" + dname[12] + dname[14]+ dname[15];
	} else {
		absdir = m_rootdir + "/" + dname;
		filePath = (String)"F" + dname[11] + dname[13] + dname[14];
	}


	if(filePath != "Fsrv") {
		if (!FileUtils::mkdirs(absdir)) {
			report_error(cb,errno);
			HT_ERRORF("mkdirs failed: dname='%s' - %s", absdir.c_str(),
				strerror(errno));
				
			// if ((error = cb->response_ok()) != Error::OK)
			// HT_ERRORF("Problem sending response for mkdirs(%s) - %s",
            // dname, Error::get_text(error));
		}
	} 
	
	int status = m_client->Mkdirs(dname);
	
	if(status < 0) {
		HT_ERRORF("mkdirs(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
		report_error(cb, status);
	} else
		cb->response_ok();

	return;
}

void QfsBroker::rmdir(ResponseCallback *cb, const char *dname) {
  
	String absdir;
	String filePath;
	String cmd_str;
	
	if (m_verbose)
		HT_INFOF("rmdir dir='%s'", dname);
	
	if (dname[0] == '/') {
		absdir = m_rootdir + dname;
		filePath = (String)"F" + dname[12] + dname[14]+ dname[15];
	} else {
		absdir = m_rootdir + "/" + dname;
		filePath = (String)"F" + dname[11] + dname[13] + dname[14];
	}

	if(filePath != "Fsrv") {

		if (FileUtils::exists(absdir)) {
			if (m_no_removal) {
				String deleted_file = absdir + ".deleted";
				if (!FileUtils::rename(absdir, deleted_file)) {
					report_error(cb,errno);
					return;
				}
			} else {
				cmd_str = (String)"/bin/rm -rf " + absdir;
				if (system(cmd_str.c_str()) != 0) {
					HT_ERRORF("%s failed.", cmd_str.c_str());
					cb->error(Error::DFSBROKER_IO_ERROR, cmd_str);
					return;
				}
			}
		}

#if 0
		if (rmdir(absdir.c_str()) != 0) {
			report_error(cb,errno);
			HT_ERRORF("rmdir failed: dname='%s' - %s", absdir.c_str(), strerror(errno));
			return;
		}
#endif

			// if ((error = cb->response_ok()) != Error::OK)
			// HT_ERRORF("Problem sending response for mkdirs(%s) - %s",
             //dname, Error::get_text(error));

	} 
	
	int status = m_client->Rmdirs(dname);
	if(status < 0 && status != -ENOENT) {
		HT_ERRORF("rmdir(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
		report_error(cb, status);
	} else
		cb->response_ok();
		
	return;

}

void QfsBroker::flush(ResponseCallback *cb, uint32_t fd) {

	if(fileType[fd] == 2) {
		OpenFileDataLocalPtr fdataLFS;

		HT_DEBUGF("flush fd=%d", fd+1);
		if (!m_open_file_map.get(fd+1, fdataLFS)) {
			char errbuf[32];
			sprintf(errbuf, "%d", fd+1);
			cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
			return;
		}
		if (fsync(fdataLFS->fd) != 0) {
			HT_ERRORF("flush failed: fd=%d - %s", fd=1, strerror(errno));
			report_error(cb,errno);
		}
	}
	
	HT_DEBUGF("flush fd=%d", fd);
	OpenFileDataQfsPtr fdataDFS;
	if (!m_open_file_map.get(fd, fdataDFS)) {
		char errbuf[32];
		sprintf(errbuf, "%d", fd);
		cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
		return;
	}
	
	int status = m_client->Sync(fdataDFS->fd);
	if(status < 0) {
		HT_ERRORF("flush(%d) failure (%d) - %s", (int)fd, -status, KFS::ErrorCodeToStr(status).c_str());
		report_error(cb, status);
	} else
		cb->response_ok();
		
}

void QfsBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
  // no local - in-case, RS need to decide of cluster used filename
  std::vector<std::string> result, listing;
  int err = m_client->Readdir(dname, result);
  std::vector<std::string>::iterator end = result.end();
  for(std::vector<std::string>::iterator it = result.begin(); it != end; ++it) {
    const std::string& ent = *it;
    if(ent != "." && ent != "..")
      listing.push_back(ent);
  }

  if(err == 0)
    cb->response(listing);
  else {
    HT_ERRORF("readdir(%s) failure (%d) - %s", dname, -err, KFS::ErrorCodeToStr(err).c_str());
    report_error(cb,err);
  }
}

void QfsBroker::posix_readdir(ResponseCallbackPosixReaddir *cb,
			      const char *dname) {
  HT_ASSERT(!"posix_readdir() not yet implemented.");
}


void QfsBroker::exists(ResponseCallbackExists *cb, const char *fname) {
	//String abspath;
	//String filePath;
	
	//if (fname[0] == '/') {
	//	abspath = m_rootdir + fname;
	//	filePath = (String)"F" + fname[12] + fname[14]+ fname[15];
	// } else {
	//	abspath = m_rootdir + "/" + fname;
	//	filePath = (String)"F" + fname[11] + fname[13] + fname[14];
	// }
	//if(filePath == "Fsrv")
	//	cb->response(FileUtils::exists(abspath));
	// else
	
	// no local - in-case, RS need to decide of cluster used filename
		cb->response(m_client->Exists(fname));
}

void QfsBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {

  //OUTPUT ON FILE PATH
		String cmd_str;
		cmd_str = (String)"echo 'SRC: " + src + " DST: " + dst + " ' > '/home/www/RENAME_FILE.txt'";
		system(cmd_str.c_str());

  int err = m_client->Rename(src, dst);
  if(err == 0)
    cb->response_ok();
  else {
    HT_ERRORF("rename(%s,%s) failure (%d) - %s", src, dst, -err, KFS::ErrorCodeToStr(err).c_str());
    report_error(cb, err);
  }
}

void QfsBroker::debug(ResponseCallback *cb, int32_t command, StaticBuffer &serialized_parameters) {
  cb->error(Error::NOT_IMPLEMENTED, format("Unsupported debug command - %d",
                                           command));
}

void QfsBroker::status(ResponseCallback *cb) {
  cb->response_ok();
}

void QfsBroker::shutdown(ResponseCallback *cb) {
	m_open_file_map.remove_all();
	cb->response_ok();
	delete fileType;
	cb->response_ok();
}

void QfsBroker::report_error(ResponseCallback *cb, int error) {
  string errors = KFS::ErrorCodeToStr(error);
  switch(-error) {
  case ENOTDIR:
  case ENAMETOOLONG:
  case ENOENT:
    cb->error(Error::DFSBROKER_BAD_FILENAME, errors);
    break;

  case EACCES:
  case EPERM:
    cb->error(Error::DFSBROKER_PERMISSION_DENIED, errors);
    break;

  case EBADF:
    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errors);
    break;

  case EINVAL:
    cb->error(Error::DFSBROKER_INVALID_ARGUMENT, errors);
    break;

  default:
    cb->error(Error::DFSBROKER_IO_ERROR, errors);
    break;
  }

#ifndef  NDEBUG
  std::clog << "ERROR " << errors << std::endl;
#endif //NDEBUG
}

Reply via email to