Hi Greg,

Can you send me the patch as an attachment or just a repo url? It
looks like either your MUA or MTA wraps the lines, which makes it hard
to apply

Thanks,

__Luke

On Fri, Jul 17, 2009 at 5:47 PM, gfarnum<[email protected]> wrote:
>
> This patch adds a DfsBroker for the in-development Ceph open-source
> distributed filesystem (ceph.newdream.net). To actually run it on
> Ceph, you need to:
> Have Ceph installed.
> In conf/hypertable.cfg, set the Ceph.MonAddr property to one of your
> Ceph monitor addresses.
>
> If you wish to run the regression tests on Ceph:
> Change cmake/TestHelper.cmake's line ${INSTALL_DIR}/bin/localBroker to
> ${INSTALL_DIR}/bin/cephBroker
> Change bin/start-test-servers.sh's line $INSTALL_DIR/bin/start-all-
> servers.sh local && touch $TOUCH_FILE to $INSTALL_DIR/bin/start-all-
> servers.sh ceph && touch $TOUCH_FILE
> You may also need to increase the timeout in src/cc/Tools/dfsclient/
> dfsTest.cc:149 from 15 seconds (I used 30).
> Start Ceph, install hypertable and run the regression tests.
>
> I haven't had time to delve deeply into the workings of hypertable,
> and you'll notice I followed the model provided by localBroker and
> kosmosBroker pretty closely, so I was also wondering
> 1)if somebody could enlighten me as to the purpose of the addr stored
> in m_open_file_map.create and provided by the callback's get_address?
> 2) Why on earth the shutdown method is required to pause for 20
> seconds (poll(0,0,2000))
>
> Thanks!
> -Greg
>
> ——————————————
> From d4117b7dd57f855b824f346962e9ba19ec3b386b Mon Sep 17 00:00:00 2001
> From: Greg Farnum <[email protected]>
> Date: Mon, 13 Jul 2009 15:24:40 -0700
> Subject: [PATCH] Initial commit of Ceph components. Presently
> unworking.
>
> CephBroker: Passes regression tests if you increase timeout.
>
> Removed CephBroker from regression tests, default properties and
> compile.
> ---
>  CMakeLists.txt                       |    1 +
>  bin/start-dfsbroker.sh               |    5 +-
>  cmake/FindCeph.cmake                 |   43 ++++
>  conf/hypertable.cfg                  |    5 +
>  src/CMakeLists.txt                   |    4 +
>  src/cc/Common/Config.cc              |    6 +
>  src/cc/DfsBroker/ceph/CMakeLists.txt |   26 ++
>  src/cc/DfsBroker/ceph/CephBroker.cc  |  447 ++++++++++++++++++++++++++
> ++++++++
>  src/cc/DfsBroker/ceph/CephBroker.h   |  106 ++++++++
>  src/cc/DfsBroker/ceph/libceph.h      |   77 ++++++
>  src/cc/DfsBroker/ceph/main.cc        |   76 ++++++
>  11 files changed, 795 insertions(+), 1 deletions(-)
>  create mode 100644 cmake/FindCeph.cmake
>  create mode 100644 src/cc/DfsBroker/ceph/CMakeLists.txt
>  create mode 100644 src/cc/DfsBroker/ceph/CephBroker.cc
>  create mode 100644 src/cc/DfsBroker/ceph/CephBroker.h
>  create mode 100644 src/cc/DfsBroker/ceph/libceph.h
>  create mode 100644 src/cc/DfsBroker/ceph/main.cc
>
> diff --git a/CMakeLists.txt b/CMakeLists.txt
> index 041dd92..fa6f47b 100644
> --- a/CMakeLists.txt
> +++ b/CMakeLists.txt
> @@ -88,6 +88,7 @@ find_package(Doxygen)
>  find_package(Tcmalloc)
>  find_package(GoogleHash)
>  find_package(Kfs)
> +find_package(Ceph)
>  find_package(Ant)
>  find_package(JNI)
>  find_package(PythonLibs)
> diff --git a/bin/start-dfsbroker.sh b/bin/start-dfsbroker.sh
> index b9cf77b..b2c462e 100755
> --- a/bin/start-dfsbroker.sh
> +++ b/bin/start-dfsbroker.sh
> @@ -33,7 +33,7 @@ VALGRIND=
>
>  usage() {
>   echo ""
> -  echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs)
> [<global-args>]"
> +  echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs|ceph)
> [<global-args>]"
>   echo ""
>   echo "OPTIONS:"
>   echo "  --valgrind  run broker with valgrind"
> @@ -91,6 +91,9 @@ if [ $? != 0 ] ; then
>   elif [ "$DFS" == "local" ] ; then
>     eval $VALGRIND $HYPERTABLE_HOME/bin/localBroker \
>       --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
> +  elif [ "$DFS" == "ceph" ] ; then
> +    eval $VALGRIND $HYPERTABLE_HOME/bin/cephBroker \
> +      --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
>   else
>     usage
>     exit 1
> diff --git a/cmake/FindCeph.cmake b/cmake/FindCeph.cmake
> new file mode 100644
> index 0000000..efa72c2
> --- /dev/null
> +++ b/cmake/FindCeph.cmake
> @@ -0,0 +1,43 @@
> +# - Find Ceph
> +# Find the native Ceph includes and library
> +#
> +#  Ceph_INCLUDE_DIR - where to find libceph.h, etc.
> +#  Ceph_LIBRARIES   - List of libraries when using Ceph.
> +#  Ceph_FOUND       - True if Ceph found.
> +
> +
> +if (Ceph_INCLUDE)
> +  # Already in cache, be silent
> +  set(Ceph_FIND_QUIETLY TRUE)
> +endif ()
> +
> +find_path(Ceph_INCLUDE libceph.h
> +  /usr/local/include
> +  /usr/include
> +  $ENV{HOME}/ceph/src/client
> +)
> +mark_as_advanced(Ceph_INCLUDE)
> +
> +find_library(Ceph_LIB
> +       NAMES ceph
> +       PATHS /usr/local/lib
> +             $ENV{HOME}/ceph/src/.libs)
> +mark_as_advanced(Ceph_LIB)
> +
> +if (Ceph_INCLUDE AND Ceph_LIB)
> +  set(Ceph_FOUND TRUE)
> +  set(Ceph_LIBRARIES ${Ceph_LIB})
> +else ()
> +   set(Ceph_FOUND FALSE)
> +   set(Ceph_LIBRARIES)
> +endif ()
> +
> +if (Ceph_FOUND)
> +   message(STATUS "Found ceph: ${Ceph_LIBRARIES}")
> +else ()
> +   message(STATUS "Did not find ceph libraries")
> +   if (Ceph_FIND_REQUIRED)
> +      message(FATAL_ERROR "Could NOT find ceph libraries")
> +   endif ()
> +endif ()
> +
> diff --git a/conf/hypertable.cfg b/conf/hypertable.cfg
> index 6a58166..55d86b9 100644
> --- a/conf/hypertable.cfg
> +++ b/conf/hypertable.cfg
> @@ -10,6 +10,11 @@ HdfsBroker.Port=38030
>  HdfsBroker.fs.default.name=hdfs://localhost:9000
>  HdfsBroker.Workers=20
>
> +# Ceph Broker
> +CephBroker.Port=38030
> +CephBroker.Workers=20
> +CephBroker.MonAddr=10.0.1.245:6789
> +
>  # Local Broker
>  DfsBroker.Local.Port=38030
>  DfsBroker.Local.Root=fs/local
> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
> index 7101caa..68aff25 100644
> --- a/src/CMakeLists.txt
> +++ b/src/CMakeLists.txt
> @@ -38,3 +38,7 @@ endif ()
>  if (Kfs_FOUND)
>    add_subdirectory(cc/DfsBroker/kosmos)
>  endif ()
> +
> +if (Ceph_FOUND)
> +   add_subdirectory(cc/DfsBroker/ceph)
> +endif ()
> diff --git a/src/cc/Common/Config.cc b/src/cc/Common/Config.cc
> index e109827..5060dfc 100644
> --- a/src/cc/Common/Config.cc
> +++ b/src/cc/Common/Config.cc
> @@ -155,6 +155,12 @@ void init_default_options() {
>         "time, in milliseconds, before timing out requests (system
> wide)")
>     ("Hypertable.MetaLog.SkipErrors", boo()->default_value(false),
> "Skipping "
>         "errors instead of throwing exceptions on metalog errors")
> +    ("CephBroker.Port", i16(),
> +     "Port number on which to listen (read by CephBroker only)")
> +    ("CephBroker.Workers", i32(),
> +     "Number of Ceph broker worker threads created, maybe")
> +    ("CephBroker.MonAddr", str(),
> +     "Ceph monitor address to connect to")
>     ("HdfsBroker.Port", i16(),
>         "Port number on which to listen (read by HdfsBroker only)")
>     ("HdfsBroker.fs.default.name", str(), "Hadoop Filesystem "
> diff --git a/src/cc/DfsBroker/ceph/CMakeLists.txt b/src/cc/DfsBroker/
> ceph/CMakeLists.txt
> new file mode 100644
> index 0000000..68e06be
> --- /dev/null
> +++ b/src/cc/DfsBroker/ceph/CMakeLists.txt
> @@ -0,0 +1,26 @@
> +#
> +# Copyright (C) 2008 Doug Judd (Zvents, Inc.)
> +#
> +# This program is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU General Public License
> +# as published by the Free Software Foundation; either version 2
> +# of the License, or any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> +# 02110-1301, USA.
> +#
> +
> +# cephbroker
> +
> +include_directories(${Ceph_INCLUDE})
> +add_executable(cephBroker main.cc CephBroker.cc)
> +target_link_libraries(cephBroker ceph HyperDfsBroker $
> {Ceph_LIBRARIES} ${MALLOC_LIBRARY})
> +
> +install(TARGETS cephBroker RUNTIME DESTINATION ${VERSION}/bin)
> diff --git a/src/cc/DfsBroker/ceph/CephBroker.cc b/src/cc/DfsBroker/
> ceph/CephBroker.cc
> new file mode 100644
> index 0000000..1b29131
> --- /dev/null
> +++ b/src/cc/DfsBroker/ceph/CephBroker.cc
> @@ -0,0 +1,447 @@
> +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
> *-
> +// vim: ts=8 sw=2 smarttab
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2006 Sage Weil <[email protected]>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#include "Common/Compat.h"
> +#include <cerrno>
> +#include <string>
> +
> +extern "C" {
> +#include <fcntl.h>
> +#include <poll.h>
> +#include <sys/types.h>
> +#include <sys/uio.h>
> +#include <unistd.h>
> +}
> +
> +#include "Common/FileUtils.h"
> +#include "Common/System.h"
> +#include "CephBroker.h"
> +
> +using namespace Hypertable;
> +
> +atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
> +
> +CephBroker::CephBroker(PropertiesPtr& cfg) {
> +  m_verbose = cfg->get_bool("Hypertable.Verbose");
> +  m_root_dir = "";
> +  //construct an arguments array
> +  const char *argv[10];
> +  int argc = 0;
> +  argv[argc++] = "cephBroker";
> +  argv[argc++] = "-m";
> +  argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str());
> +  /*
> +  // For Ceph debugging, uncomment these lines
> +  argv[argc++] = "--debug_client";
> +  argv[argc++] = "0";
> +  argv[argc++] = "--debug_ms";
> +  argv[argc++] = "0";
> +  argv[argc++] = "--lockdep";
> +  argv[argc++] = "0"; */
> +
> +  HT_INFO("Calling ceph_initialize");
> +  ceph_initialize(argc, argv);
> +  HT_INFO("Calling ceph_mount");
> +  ceph_mount();
> +  HT_INFO("Returning from constructor");
> +}
> +
> +CephBroker::~CephBroker() {
> +  ceph_deinitialize();
> +}
> +
> +void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
> uint32_t bufsz) {
> +  int fd, ceph_fd;
> +  String abspath;
> +  HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
> +
> +  make_abs_path(fname, abspath);
> +
> +  fd = atomic_inc_return(&ms_next_fd);
> +
> +  if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) {
> +    report_error(cb, -ceph_fd);
> +    return;
> +  }
> +  HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
> +
> +  {
> +    struct sockaddr_in addr;
> +    OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd,
> O_RDONLY));
> +
> +    cb->get_address(addr);
> +
> +    m_open_file_map.create(fd, addr, fdata);
> +
> +    cb->response(fd);
> +  }
> +}
> +
> +void CephBroker::create(ResponseCallbackOpen *cb, const char *fname,
> bool overwrite,
> +                       int32_t bufsz, int16_t replication, int64_t blksz){
> +  int fd, ceph_fd;
> +  int flags;
> +  String abspath;
> +
> +  make_abs_path(fname, abspath);
> +  HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d
> blksz=%lld",
> +            fname, (int)overwrite, bufsz, (int)replication, (Lld)
> blksz);
> +
> +  fd = atomic_inc_return(&ms_next_fd);
> +
> +  if (overwrite)
> +    flags = O_WRONLY | O_CREAT | O_TRUNC;
> +  else
> +    flags = O_WRONLY | O_CREAT | O_APPEND;
> +
> +  //make sure the directories in the path exist
> +  String directory = abspath.substr(0, abspath.rfind('/'));
> +  int r;
> +  HT_INFOF("Calling mkdirs on %s", directory.c_str());
> +  if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
> +    HT_ERRORF("create failed on mkdirs: dname='%s' - %d",
> directory.c_str(), -r);
> +    report_error(cb, -r);
> +    return;
> +  }
> +
> +  //create file
> +  if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) {
> +    HT_ERRORF("open failed: file=%s - %s",  abspath.c_str(), strerror
> (-ceph_fd));
> +    report_error(cb, ceph_fd);
> +    return;
> +  }
> +
> +  HT_INFOF("create % s  = %d", fname, ceph_fd);
> +
> +  {
> +    struct sockaddr_in addr;
> +    OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd,
> O_WRONLY));
> +
> +    cb->get_address(addr);
> +
> +    m_open_file_map.create(fd, addr, fdata);
> +
> +    cb->response(fd);
> +  }
> +}
> +
> +void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
> +  if (m_verbose) {
> +    HT_INFOF("close fd=%d", fd);
> +  }
> +  OpenFileDataCephPtr fdata;
> +  m_open_file_map.get(fd, fdata);
> +  m_open_file_map.remove(fd);
> +  cb->response_ok();
> +}
> +
> +void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t
> amount) {
> +  OpenFileDataCephPtr fdata;
> +  ssize_t nread;
> +  uint64_t offset;
> +  StaticBuffer buf(new uint8_t [amount], amount);
> +
> +  HT_DEBUGF("read fd=%d amount = %d", fd, amount);
> +
> +  if (!m_open_file_map.get(fd, fdata)) {
> +    char errbuf[32];
> +    sprintf(errbuf, "%d", fd);
> +    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
> +    HT_ERRORF("bad file handle: %d", fd);
> +    return;
> +  }
> +
> +  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
> +    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR -
> %s", fd, fdata->fd, strerror(-offset));
> +    report_error(cb, offset);
> +    return;
> +  }
> +
> +  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 )
> {
> +    HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata-
>>fd, amount);
> +    report_error(cb, -nread);
> +    return;
> +  }
> +
> +  buf.size = nread;
> +  cb->response(offset, buf);
> +}
> +
> +void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
> +                       uint32_t amount, const void *data, bool sync)
> +{
> +  OpenFileDataCephPtr fdata;
> +  ssize_t nwritten;
> +  uint64_t offset;
> +
> +  HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
> +              << format_bytes(20, data, amount) <<" sync="<< sync <<
> HT_END;
> +
> +  if (!m_open_file_map.get(fd, fdata)) {
> +    char errbuf[32];
> +    sprintf(errbuf, "%d", fd);
> +    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
> +    return;
> +  }
> +
> +  if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
> +    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR -
> %s", fd, fdata->fd,
> +              strerror(-offset));
> +    report_error(cb, offset);
> +    return;
> +  }
> +
> +  if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount))
> < 0) {
> +    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd,
> fdata->fd, amount,
> +              strerror(-nwritten));
> +    report_error(cb, -nwritten);
> +    return;
> +  }
> +
> +  int r;
> +  if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) {
> +    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd,
> strerror(errno));
> +    report_error(cb, r);
> +    return;
> +  }
> +
> +  cb->response(offset, nwritten);
> +}
> +
> +void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t
> offset) {
> +  OpenFileDataCephPtr fdata;
> +
> +  HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
> +
> +  if (!m_open_file_map.get(fd, fdata)) {
> +    char errbuf[32];
> +    sprintf(errbuf, "%d", fd);
> +    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
> +    return;
> +  }
> +  int r;
> +  if ((r = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
> +    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd,
> fdata->fd,
> +             (Llu)offset, strerror(-r));
> +    report_error(cb, offset);
> +    return;
> +  }
> +
> +  cb->response_ok();
> +}
> +
> +void CephBroker::remove(ResponseCallback *cb, const char *fname) {
> +  String abspath;
> +
> +  HT_DEBUGF("remove file='%s'", fname);
> +
> +  make_abs_path(fname, abspath);
> +
> +  int r;
> +  if ((r = ceph_unlink(abspath.c_str())) < 0) {
> +    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(),
> strerror(-r));
> +    report_error(cb, r);
> +    return;
> +  }
> +  cb->response_ok();
> +}
> +
> +void CephBroker::length(ResponseCallbackLength *cb, const char
> *fname) {
> +  int r;
> +  struct stat statbuf;
> +
> +  HT_DEBUGF("length file='%s'", fname);
> +
> +  if ((r = ceph_lstat(fname, &statbuf)) < 0) {
> +    String abspath;
> +    make_abs_path(fname, abspath);
> +    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str
> (), strerror(-r));
> +    report_error(cb,- r);
> +    return;
> +  }
> +  cb->response(statbuf.st_size);
> +}
> +
> +void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd,
> uint64_t offset,
> +                      uint32_t amount) {
> +  OpenFileDataCephPtr fdata;
> +  ssize_t nread;
> +  StaticBuffer buf(new uint8_t [amount], amount);
> +
> +  HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset,
> amount);
> +
> +  if (!m_open_file_map.get(fd, fdata)) {
> +    char errbuf[32];
> +    sprintf(errbuf, "%d", fd);
> +    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
> +    return;
> +  }
> +
> +  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount,
> offset)) < 0) {
> +    HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu -
> %s", fd, fdata->fd,
> +              amount, (Llu)offset, strerror(-nread));
> +    report_error(cb, nread);
> +    return;
> +  }
> +
> +  buf.size = nread;
> +
> +  cb->response(offset, buf);
> +}
> +
> +void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
> +  String absdir;
> +
> +  HT_DEBUGF("mkdirs dir='%s'", dname);
> +
> +  make_abs_path(dname, absdir);
> +  int r;
> +  if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
> +    HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
> +    report_error(cb, -r);
> +    return;
> +  }
> +  cb->response_ok();
> +}
> +
> +void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
> +  String absdir;
> +  int r;
> +
> +  make_abs_path(dname, absdir);
> +  if((r = rmdir_recursive(absdir.c_str())) < 0) {
> +      HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str
> (), r);
> +      report_error(cb, -r);
> +      return;
> +  }
> +  cb->response_ok();
> +}
> +
> +int CephBroker::rmdir_recursive(const char *directory) {
> +  DIR *dirp;
> +  struct dirent de;
> +  struct stat st;
> +  int r;
> +  if ((r = ceph_opendir(directory, &dirp) < 0))
> +    return r; //failed to open
> +  while (r = ceph_readdirplus_r(dirp, &de, &st, 0) > 0) {
> +    String new_dir = de.d_name;
> +    if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
> +      new_dir = directory;
> +      new_dir += '/';
> +      new_dir += de.d_name;
> +      if (S_ISDIR(st.st_mode)) { //it's a dir, clear it out...
> +       if((r=rmdir_recursive(new_dir.c_str())) < 0) return r;
> +      } else { //delete this file
> +       if((r=ceph_unlink(new_dir.c_str())) < 0) return r;
> +      }
> +    }
> +  }
> +  if (r < 0) return r; //we got an error
> +  if ((r = ceph_closedir(dirp)) < 0) return r;
> +  return ceph_rmdir(directory);
> +}
> +
> +void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
> +  OpenFileDataCephPtr fdata;
> +
> +  HT_DEBUGF("flush fd=%d", fd);
> +
> +  if (!m_open_file_map.get(fd, fdata)) {
> +    char errbuf[32];
> +    sprintf(errbuf, "%d", fd);
> +    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
> +    return;
> +  }
> +
> +  int r;
> +  if ((r = ceph_fsync(fdata->fd, true)) != 0) {
> +    HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd,
> strerror(-r));
> +    report_error(cb, -r);
> +    return;
> +  }
> +
> +  cb->response_ok();
> +}
> +
> +void CephBroker::status(ResponseCallback *cb) {
> +  cb->response_ok();
> +  /*perhaps a total cheat, but both the local and Kosmos brokers
> +    included in Hypertable also do this. */
> +}
> +
> +void CephBroker::shutdown(ResponseCallback *cb) {
> +  m_open_file_map.remove_all();
> +  cb->response_ok();
> +  poll(0, 0, 2000);
> +}
> +
> +void CephBroker::readdir(ResponseCallbackReaddir *cb, const char
> *dname) {
> +  std::vector<String> listing;
> +  String absdir;
> +
> +  HT_DEBUGF("Readdir dir='%s'", dname);
> +
> +  //get from ceph in list<string>
> +  make_abs_path(dname, absdir);
> +  std::list<String> dir_con;
> +  ceph_getdir(absdir.c_str(), dir_con);
> +
> +  //convert to vector<String>
> +  for (std::list<String>::iterator i = dir_con.begin(); i!=dir_con.end
> (); ++i) {
> +    if (!(i->compare(".")==0 || i->compare("..")==0))
> +      listing.push_back(*i);
> +  }
> +  cb->response(listing);
> +}
> +
> +void CephBroker::exists(ResponseCallbackExists *cb, const char
> *fname) {
> +  String abspath;
> +  struct stat statbuf;
> +
> +  HT_DEBUGF("exists file='%s'", fname);
> +  make_abs_path(fname, abspath);
> +  cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
> +}
> +
> +void CephBroker::rename(ResponseCallback *cb, const char *src, const
> char *dst) {
> +  String src_abs;
> +  String dest_abs;
> +  int r;
> +
> +  make_abs_path(src, src_abs);
> +  make_abs_path(dst, dest_abs);
> +  if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
> +    report_error(cb, r);
> +    return;
> +  }
> +  cb->response_ok();
> +}
> +
> +void CephBroker::debug(ResponseCallback *cb, int32_t command,
> +                      StaticBuffer &serialized_parameters) {
> +  HT_ERROR("debug commands not implemented!");
> +  cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not
> supported"));
> +}
> +
> +void CephBroker::report_error(ResponseCallback *cb, int error) {
> +  char errbuf[128];
> +  errbuf[0] = 0;
> +
> +  strerror_r(error, errbuf, 128);
> +
> +  cb->error(Error::DFSBROKER_IO_ERROR, errbuf);
> +}
> +
> +
> diff --git a/src/cc/DfsBroker/ceph/CephBroker.h b/src/cc/DfsBroker/
> ceph/CephBroker.h
> new file mode 100644
> index 0000000..d860dfe
> --- /dev/null
> +++ b/src/cc/DfsBroker/ceph/CephBroker.h
> @@ -0,0 +1,106 @@
> +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
> *-
> +// vim: ts=8 sw=2 smarttab
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2006 Sage Weil <[email protected]>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#ifndef HYPERTABLE_CEPHBROKER_H
> +#define HYPERTABLE_CEPHBROKER_H
> +
> +extern "C" {
> +#include <unistd.h>
> +}
> +
> +#include "libceph.h"
> +#include "Common/String.h"
> +#include "Common/atomic.h"
> +#include "Common/Properties.h"
> +
> +#include "DfsBroker/Lib/Broker.h"
> +
> +namespace Hypertable {
> +  using namespace DfsBroker;
> +  /**
> +   *
> +   */
> +  class OpenFileDataCeph : public OpenFileData {
> +  public:
> +    OpenFileDataCeph(const String& fname, int _fd, int _flags) :
> +      fd(_fd), flags(_flags), filename(fname) {}
> +    virtual ~OpenFileDataCeph() { ceph_close(fd); }
> +    int fd;
> +    int flags;
> +    String filename;
> +  };
> +
> +  /**
> +   *
> +   */
> +  class OpenFileDataCephPtr : public OpenFileDataPtr {
> +  public:
> +    OpenFileDataCephPtr() : OpenFileDataPtr() { }
> +    OpenFileDataCephPtr(OpenFileDataCeph *ofdl) : OpenFileDataPtr
> (ofdl, true) { }
> +    OpenFileDataCeph *operator->() const { return (OpenFileDataCeph *)
> get(); }
> +  };
> +
> +  /**
> +   *
> +   */
> +  class CephBroker : public DfsBroker::Broker {
> +  public:
> +    CephBroker(PropertiesPtr& cfg);
> +    virtual ~CephBroker();
> +
> +    virtual void open(ResponseCallbackOpen *cb, const char *fname,
> +                      uint32_t bufsz);
> +    virtual void
> +    create(ResponseCallbackOpen *cb, const char *fname, bool
> overwrite,
> +           int32_t bufsz, int16_t replication, int64_t blksz);
> +    virtual void close(ResponseCallback *cb, uint32_t fd);
> +    virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t
> amount);
> +    virtual void append(ResponseCallbackAppend *cb, uint32_t fd,
> +                        uint32_t amount, const void *data, bool
> sync);
> +    virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t
> offset);
> +    virtual void remove(ResponseCallback *cb, const char *fname);
> +    virtual void length(ResponseCallbackLength *cb, const char
> *fname);
> +    virtual void pread(ResponseCallbackRead *cb, uint32_t fd,
> uint64_t offset,
> +                       uint32_t amount);
> +    virtual void mkdirs(ResponseCallback *cb, const char *dname);
> +    virtual void rmdir(ResponseCallback *cb, const char *dname);
> +    virtual void flush(ResponseCallback *cb, uint32_t fd);
> +    virtual void status(ResponseCallback *cb);
> +    virtual void shutdown(ResponseCallback *cb);
> +    virtual void readdir(ResponseCallbackReaddir *cb, const char
> *dname);
> +    virtual void exists(ResponseCallbackExists *cb, const char
> *fname);
> +    virtual void rename(ResponseCallback *cb, const char *src, const
> char *dst);
> +    virtual void debug(ResponseCallback *, int32_t command,
> +                       StaticBuffer &serialized_parameters);
> +
> +  private:
> +    static atomic_t ms_next_fd;
> +
> +    virtual void report_error(ResponseCallback *cb, int error);
> +
> +    void make_abs_path(const char *fname, String& abs) {
> +      if (fname[0] == '/')
> +       abs = fname;
> +      else
> +       abs = m_root_dir + "/" + fname;
> +    }
> +
> +    int rmdir_recursive(const char *directory);
> +
> +    bool m_verbose;
> +    String m_root_dir;
> +  };
> +}
> +
> +#endif //HYPERTABLE_CEPH_BROKER_H
> diff --git a/src/cc/DfsBroker/ceph/libceph.h b/src/cc/DfsBroker/ceph/
> libceph.h
> new file mode 100644
> index 0000000..98109a0
> --- /dev/null
> +++ b/src/cc/DfsBroker/ceph/libceph.h
> @@ -0,0 +1,77 @@
> +#ifndef __LIBCEPH_H
> +#define __LIBCEPH_H
> +#include <netinet/in.h>
> +#include <sys/statvfs.h>
> +#include <utime.h>
> +#include <sys/stat.h>
> +#include <stdbool.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <dirent.h>
> +
> +#ifdef __cplusplus
> +#include <list>
> +#include <string>
> +extern "C" {
> +#endif
> +
> +  struct frag_info_t;
> +int ceph_initialize(int argc, const char **argv);
> +void ceph_deinitialize();
> +
> +int ceph_mount();
> +int ceph_umount();
> +
> +int ceph_statfs(const char *path, struct statvfs *stbuf);
> +
> +int ceph_chdir (const char *s);
> +const char *ceph_getcwd();
> +
> +int ceph_opendir(const char *name, DIR **dirpp);
> +int ceph_closedir(DIR *dirp);
> +int ceph_readdir_r(DIR *dirp, struct dirent *de);
> +int ceph_readdirplus_r(DIR *dirp, struct dirent *de, struct stat *st,
> int *stmask);
> +void ceph_rewinddir(DIR *dirp);
> +loff_t ceph_telldir(DIR *dirp);
> +void ceph_seekdir(DIR *dirp, loff_t offset);
> +
> +int ceph_link (const char *existing, const char *newname);
> +int ceph_unlink (const char *path);
> +int ceph_rename(const char *from, const char *to);
> +
> +// dirs
> +int ceph_mkdir(const char *path, mode_t mode);
> +int ceph_mkdirs(const char *path, mode_t mode);
> +int ceph_rmdir(const char *path);
> +
> +// symlinks
> +int ceph_readlink(const char *path, char *buf, loff_t size);
> +int ceph_symlink(const char *existing, const char *newname);
> +
> +// inode stuff
> +int ceph_lstat(const char *path, struct stat *stbuf, frag_info_t
> *dirstat=0);
> +
> +int ceph_setattr(const char *relpath, struct stat *attr, int mask);
> +int ceph_chmod(const char *path, mode_t mode);
> +int ceph_chown(const char *path, uid_t uid, gid_t gid);
> +int ceph_utime(const char *path, struct utimbuf *buf);
> +int ceph_truncate(const char *path, loff_t size);
> +
> +// file ops
> +int ceph_mknod(const char *path, mode_t mode, dev_t rdev=0);
> +int ceph_open(const char *path, int flags, mode_t mode=0);
> +int ceph_close(int fd);
> +loff_t ceph_lseek(int fd, loff_t offset, int whence);
> +int ceph_read(int fd, char *buf, loff_t size, loff_t offset=-1);
> +int ceph_write(int fd, const char *buf, loff_t size, loff_t
> offset=-1);
> +int ceph_ftruncate(int fd, loff_t size);
> +int ceph_fsync(int fd, bool syncdataonly);
> +int ceph_fstat(int fd, struct stat *stbuf);
> +
> +int ceph_sync_fs();
> +#ifdef __cplusplus
> +int ceph_getdir(const char *relpath, std::list<std::string>&
> names); //not for C, sorry!
> +}
> +#endif
> +
> +#endif
> diff --git a/src/cc/DfsBroker/ceph/main.cc b/src/cc/DfsBroker/ceph/
> main.cc
> new file mode 100644
> index 0000000..b77307a
> --- /dev/null
> +++ b/src/cc/DfsBroker/ceph/main.cc
> @@ -0,0 +1,76 @@
> +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
> *-
> +// vim: ts=8 sw=2 smarttab
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2006 Sage Weil <[email protected]>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#include "Common/Compat.h"
> +#include <iostream>
> +#include <fstream>
> +#include <string>
> +
> +extern "C" {
> +#include <poll.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +}
> +
> +#include "Common/FileUtils.h"
> +#include "Common/System.h"
> +#include "Common/Usage.h"
> +
> +#include "AsyncComm/ApplicationQueue.h"
> +#include "AsyncComm/Comm.h"
> +
> +#include "DfsBroker/Lib/Config.h"
> +#include "DfsBroker/Lib/ConnectionHandlerFactory.h"
> +
> +#include "CephBroker.h"
> +
> +using namespace Hypertable;
> +using namespace Config;
> +using namespace std;
> +
> +struct AppPolicy : Config::Policy {
> +  static void init() {
> +    alias("reactors", "DfsBroker.Ceph.Reactors");
> +    alias("workers", "DfsBroker.Ceph.Workers");
> +    alias("ceph_mon", "CephBroker.MonAddr");
> +    alias("port", "CephBroker.Port");
> +  }
> +};
> +
> +typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy>
> Policies;
> +
> +int main (int argc, char **argv) {
> +  //  HT_INFOF("ceph/main attempting to create pieces %d", argc);
> +  try {
> +    init_with_policies<Policies>(argc, argv);
> +    int port = get_i16("CephBroker.Port");
> +    int worker_count = get_i32("CephBroker.Workers");
> +    Comm *comm = Comm::instance();
> +    ApplicationQueuePtr app_queue = new ApplicationQueue
> (worker_count);
> +    HT_INFOF("attemping to create new CephBroker with address %s",
> properties->get_str("CephBroker.MonAddr").c_str());
> +    BrokerPtr broker = new CephBroker(properties);
> +    HT_INFO("Created CephBroker!");
> +    ConnectionHandlerFactoryPtr chfp =
> +      new DfsBroker::ConnectionHandlerFactory(comm, app_queue,
> broker);
> +    InetAddr listen_addr(INADDR_ANY, port);
> +
> +    comm->listen(listen_addr, chfp);
> +    app_queue->join();
> +  }
> +  catch(Exception &e) {
> +    HT_ERROR_OUT << e << HT_END;
> +    return 1;
> +  }
> +  return 0;
> +}
> --
> 1.5.6.5
>
>
> >
>

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Hypertable Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/hypertable-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to