From f1295603a939ba034927c10d020d763780eb6c22 Mon Sep 17 00:00:00 2001
From: Mateusz Berezecki <mateuszb@gmail.com>
Date: Sat, 10 Oct 2009 14:51:26 +0200
Subject: [PATCH 2/2] Added support for loading data into a table from a DFS file (Issue 259).

---
 src/cc/Hypertable/Lib/CMakeLists.txt           |   16 ++--
 src/cc/Hypertable/Lib/DfsSource.cc             |   92 +++++++++++++++++++++
 src/cc/Hypertable/Lib/DfsSource.h              |   57 +++++++++++++
 src/cc/Hypertable/Lib/HqlInterpreter.cc        |   13 +++-
 src/cc/Hypertable/Lib/LoadDataSourceFactory.cc |    5 +
 src/cc/Hypertable/Lib/LoadDataSourceFileDfs.cc |  103 ++++++++++++++++++++++++
 src/cc/Hypertable/Lib/LoadDataSourceFileDfs.h  |   70 ++++++++++++++++
 7 files changed, 348 insertions(+), 8 deletions(-)
 create mode 100644 src/cc/Hypertable/Lib/DfsSource.cc
 create mode 100644 src/cc/Hypertable/Lib/DfsSource.h
 create mode 100644 src/cc/Hypertable/Lib/LoadDataSourceFileDfs.cc
 create mode 100644 src/cc/Hypertable/Lib/LoadDataSourceFileDfs.h

diff --git a/src/cc/Hypertable/Lib/CMakeLists.txt b/src/cc/Hypertable/Lib/CMakeLists.txt
index d8c62fc..381dacf 100644
--- a/src/cc/Hypertable/Lib/CMakeLists.txt
+++ b/src/cc/Hypertable/Lib/CMakeLists.txt
@@ -36,6 +36,7 @@ CommitLogReader.cc
 Config.cc
 DataGenerator.cc
 Defaults.cc
+DfsSource.cc
 EventHandlerMasterChange.cc
 Filesystem.cc
 FixedRandomStringGenerator.cc
@@ -48,6 +49,7 @@ KeySpec.cc
 LoadDataEscape.cc
 LoadDataSource.cc
 LoadDataSourceFileLocal.cc
+LoadDataSourceFileDfs.cc
 LoadDataSourceStdin.cc
 LoadDataSourceFactory.cc
 LocationCache.cc
@@ -93,12 +95,12 @@ quicklz/quicklz.cc
 )
 
 add_library(Hypertable ${Hypertable_SRCS})
-add_dependencies(Hypertable HyperComm Hyperspace HyperCommon)
-target_link_libraries(Hypertable ${EXPAT_LIBRARIES} Hyperspace)
+add_dependencies(Hypertable HyperComm Hyperspace HyperCommon HyperDfsBroker)
+target_link_libraries(Hypertable ${EXPAT_LIBRARIES} Hyperspace HyperDfsBroker)
 
 # generate_test_data
 add_executable(generate_test_data generate_test_data.cc)
-target_link_libraries(generate_test_data Hypertable)
+target_link_libraries(generate_test_data Hypertable HyperDfsBroker)
 
 # meta log test
 add_executable(metalog_api_test tests/MetaLogApiTest.cc)
@@ -122,7 +124,7 @@ target_link_libraries(locationCacheTest Hypertable)
 
 # loadDataSourceTest
 add_executable(loadDataSourceTest tests/loadDataSourceTest.cc)
-target_link_libraries(loadDataSourceTest Hypertable)
+target_link_libraries(loadDataSourceTest Hypertable HyperDfsBroker)
 
 # compressor_test
 add_executable(compressor_test tests/compressor_test.cc)
@@ -145,15 +147,15 @@ target_link_libraries(escape_test Hypertable)
 
 # large_insert_test
 add_executable(large_insert_test tests/large_insert_test.cc)
-target_link_libraries(large_insert_test Hypertable)
+target_link_libraries(large_insert_test Hypertable HyperDfsBroker)
 
 # MutatorNoLogSyncTest
 add_executable(MutatorNoLogSyncTest tests/MutatorNoLogSyncTest.cc)
-target_link_libraries(MutatorNoLogSyncTest Hypertable)
+target_link_libraries(MutatorNoLogSyncTest Hypertable HyperDfsBroker)
 
 # periodic_flush_test
 add_executable(periodic_flush_test tests/periodic_flush_test.cc)
-target_link_libraries(periodic_flush_test Hypertable)
+target_link_libraries(periodic_flush_test Hypertable HyperDfsBroker)
 
 
 #
diff --git a/src/cc/Hypertable/Lib/DfsSource.cc b/src/cc/Hypertable/Lib/DfsSource.cc
new file mode 100644
index 0000000..4d05524
--- /dev/null
+++ b/src/cc/Hypertable/Lib/DfsSource.cc
@@ -0,0 +1,92 @@
+/** -*- c++ -*-
+ * Copyright (C) 2009 Mateusz Berezecki
+ *
+ * This file is part of Hypertable.
+ *
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; version 2 of the
+ * License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#include "DfsSource.h"
+
+using namespace Hypertable;
+
+// Records the client and path only; the file is opened on first read()/seek().
+DfsFileSource::DfsFileSource(DfsBroker::ClientPtr& client,
+                             const std::string& dfs_path)
+  : m_client(client), m_path(dfs_path), m_fd(-1), m_consumed(0), m_file_size(0)
+{ }
+
+void
+DfsFileSource::open()
+{
+  m_fd = m_client->open(m_path);           // descriptor used by read/seek/close
+  m_file_size = m_client->length(m_path);  // cached; read() tests it for EOF
+  m_consumed = 0;                          // nothing handed out yet
+}
+
+std::streamsize
+DfsFileSource::read(char *s, std::streamsize n)
+{
+  // Reads at most n bytes into s, opening the file on first use.  Returns
+  // the byte count, or -1 once the length cached by open() is used up.
+  if (m_fd == -1)
+    open();
+  if (m_consumed >= m_file_size)  // >= so an overshoot still reports EOF
+    return -1;
+  if (static_cast<size_t>(n) > m_file_size - m_consumed)
+    n = static_cast<std::streamsize>(m_file_size - m_consumed);  // clamp
+  const std::streamsize got = m_client->read(m_fd, s, n);
+  m_consumed += got;
+  return got;
+}
+
+std::streampos
+DfsFileSource::seek(std::streamoff off,
+                    BOOST_IOS::seekdir way, BOOST_IOS::openmode)
+{
+  // Moves the read position; returns the new absolute offset, or -1 when
+  // the target lies before the start of the file.  The DFS client is only
+  // given absolute offsets, so cur/end are resolved from local bookkeeping.
+  if (m_fd == -1)
+    open();
+
+  std::streamoff target = off;  // correct as-is for BOOST_IOS::beg
+  if (way == BOOST_IOS::cur)
+    target += static_cast<std::streamoff>(m_consumed);
+  else if (way == BOOST_IOS::end)
+    target += static_cast<std::streamoff>(m_file_size);
+  if (target < 0)
+    return -1;
+  if (way != BOOST_IOS::cur || off != 0) {  // seek(0, cur) is a pure query
+    m_client->seek(m_fd, target);
+    m_consumed = static_cast<size_t>(target);
+  }
+  return target;
+}
+
+// Closes the descriptor if one is open and marks the source as closed.
+void DfsFileSource::close()
+{
+  if (m_fd == -1)
+    return;  // never opened, or already closed
+  m_client->close(m_fd);
+  m_fd = -1;
+}
+
+DfsFileSource::~DfsFileSource()
+{
+  close();  // releases the descriptor when the file is still open
+}
diff --git a/src/cc/Hypertable/Lib/DfsSource.h b/src/cc/Hypertable/Lib/DfsSource.h
new file mode 100644
index 0000000..99c0c8b
--- /dev/null
+++ b/src/cc/Hypertable/Lib/DfsSource.h
@@ -0,0 +1,57 @@
+/** -*- mode: c++; -*-
+ * Copyright (C) 2009 Mateusz Berezecki
+ *
+ * This file is part of Hypertable.
+ *
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; version 2 of the
+ * License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#ifndef HYPERTABLE_DFSSOURCE_H
+#define HYPERTABLE_DFSSOURCE_H
+
+#include "Common/Compat.h"
+#include <string>
+#include <fstream>
+#include <boost/iostreams/concepts.hpp>
+
+#include "DfsBroker/Lib/Client.h"
+
+namespace Hypertable {
+  /** Boost.Iostreams source for a file served by a DFS broker. */
+  class DfsFileSource : public boost::iostreams::source {
+  public:
+    DfsFileSource(DfsBroker::ClientPtr& client,
+                  const std::string& dfs_path);
+    ~DfsFileSource();
+    /** Open the file / release its descriptor; both defined in DfsSource.cc. */
+    void open();
+    void close();
+
+    /** Fill s with at most n bytes; the result is the byte count or -1. */
+    std::streamsize read(char *s, std::streamsize n);
+    std::streampos seek(std::streamoff off, BOOST_IOS::seekdir way,
+        BOOST_IOS::openmode which = BOOST_IOS::in | BOOST_IOS::out);
+
+  private:
+    DfsBroker::ClientPtr m_client;  ///< client every DFS call goes through
+    String m_path;                  ///< path given to the client on open
+    int m_fd;                       ///< client descriptor, -1 while closed
+    size_t m_consumed;              ///< bytes accounted as consumed so far
+    size_t m_file_size;             ///< file length captured by open()
+  };
+}
+
+#endif
diff --git a/src/cc/Hypertable/Lib/HqlInterpreter.cc b/src/cc/Hypertable/Lib/HqlInterpreter.cc
index d6558c2..99e748c 100644
--- a/src/cc/Hypertable/Lib/HqlInterpreter.cc
+++ b/src/cc/Hypertable/Lib/HqlInterpreter.cc
@@ -52,6 +52,8 @@ extern "C" {
 #include "LoadDataSource.h"
 #include "LoadDataSourceFactory.h"
 
+#include "DfsBroker/Lib/Client.h"
+
 using namespace std;
 using namespace Hypertable;
 using namespace Hql;
@@ -345,7 +347,16 @@ cmd_load_data(Client *client, uint32_t mutator_flags,
 
   HT_ON_SCOPE_EXIT(&close_file, out_fd);
 
-  cb.file_size = FileUtils::size(state.input_file.c_str());
+  if (state.input_file_src == DFS_FILE) {
+    using DfsBroker::ClientPtr;
+    using DfsBroker::Client;
+
+    ConnectionManagerPtr connmgr_ptr(new ConnectionManager());
+    ClientPtr dfs_ptr(new Client(connmgr_ptr, Config::properties));
+    cb.file_size = dfs_ptr->length(state.input_file);    
+  } else {
+    cb.file_size = FileUtils::size(state.input_file.c_str());
+  }
   cb.on_update(cb.file_size);
 
   LoadDataSourcePtr lds;
diff --git a/src/cc/Hypertable/Lib/LoadDataSourceFactory.cc b/src/cc/Hypertable/Lib/LoadDataSourceFactory.cc
index 2d322fe..a3e09ba 100644
--- a/src/cc/Hypertable/Lib/LoadDataSourceFactory.cc
+++ b/src/cc/Hypertable/Lib/LoadDataSourceFactory.cc
@@ -22,6 +22,7 @@
 #include "Common/Compat.h"
 #include "LoadDataSourceFactory.h"
 #include "LoadDataSourceFileLocal.h"
+#include "LoadDataSourceFileDfs.h"
 #include "LoadDataSourceStdin.h"
 
 using namespace Hypertable;
@@ -46,6 +47,10 @@ LoadDataSourceFactory::create(const String &input_fname, const int src,
     case STDIN:
       lds = new LoadDataSourceStdin(header_fname, row_uniquify_chars, dupkeycols);
       break;
+    case DFS_FILE:
+      lds = new LoadDataSourceFileDfs(input_fname, header_fname,
+                                  row_uniquify_chars, dupkeycols);
+      break;
     default:
       HT_THROW(Error::HQL_PARSE_ERROR, "LOAD DATA - bad filename");
   }
diff --git a/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.cc b/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.cc
new file mode 100644
index 0000000..047d669
--- /dev/null
+++ b/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.cc
@@ -0,0 +1,103 @@
+/** -*- c++ -*-
+ * Copyright (C) 2009 Mateusz Berezecki
+ *
+ * This file is part of Hypertable.
+ *
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; version 2 of the
+ * License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#include "Common/Compat.h"
+#include <fstream>
+#include <iostream>
+#include <cerrno>
+#include <cctype>
+#include <cstdlib>
+#include <cstring>
+
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/shared_array.hpp>
+
+extern "C" {
+#include <strings.h>
+#include <sys/types.h>
+#include <time.h>
+#include <unistd.h>
+}
+
+#include "Common/DynamicBuffer.h"
+#include "Common/Error.h"
+#include "Common/Logger.h"
+#include "Common/FileUtils.h"
+#include "Common/Config.h"
+
+#include "Key.h"
+#include "LoadDataSourceFileDfs.h"
+
+using namespace boost::iostreams;
+using namespace Hypertable;
+using namespace std;
+
+/**
+ * Creates a private ConnectionManager and a DFS client configured from
+ * Config::properties, and binds the DFS source to fname without opening it.
+ */
+LoadDataSourceFileDfs::LoadDataSourceFileDfs(const String &fname,
+    const String &header_fname, int row_uniquify_chars, bool dupkeycols)
+  : LoadDataSource(row_uniquify_chars, dupkeycols),
+    m_header_fname(header_fname),
+    m_file_name(fname),
+    m_conn_mgr(new ConnectionManager()),
+    m_client(new DfsBroker::Client(m_conn_mgr, Config::properties)),
+    m_source(m_client, fname)
+{ }
+
+// Builds m_fin: gunzip filter first for ".gz" names, then the DFS source.
+void LoadDataSourceFileDfs::init_src()
+{
+  const bool gzipped = boost::algorithm::ends_with(m_file_name, ".gz");
+  if (gzipped) {
+    m_fin.push(gzip_decompressor());
+    m_zipped = true;
+  }
+  m_fin.push(m_source);
+}
+
+// Returns the header line: the first line of m_header_fname when one was
+// given, otherwise the first line read from the data stream m_fin.
+String LoadDataSourceFileDfs::get_header()
+{
+  String header;
+  if (m_header_fname.empty()) {
+    getline(m_fin, header);
+  }
+  else {
+    std::ifstream header_in(m_header_fname.c_str());
+    getline(header_in, header);
+  }
+  return header;
+}
+
+// Reports how far m_source's offset moved since the previous call.
+// NOTE(review): confirm the offset seen here advances while m_fin is read.
+uint64_t LoadDataSourceFileDfs::incr_consumed()
+{
+  const uint64_t current = m_source.seek(0, BOOST_IOS::cur);
+  const uint64_t delta = current - m_offset;
+  m_offset = current;
+  return delta;
+}
diff --git a/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.h b/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.h
new file mode 100644
index 0000000..57a16cc
--- /dev/null
+++ b/src/cc/Hypertable/Lib/LoadDataSourceFileDfs.h
@@ -0,0 +1,70 @@
+/** -*- c++ -*-
+ * Copyright (C) 2009 Mateusz Berezecki
+ *
+ * This file is part of Hypertable.
+ *
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; version 2 of the
+ * License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#ifndef HYPERTABLE_LOADDATASOURCEFILEDFS_H
+#define HYPERTABLE_LOADDATASOURCEFILEDFS_H
+
+#include <fstream>
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <boost/iostreams/device/file.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+
+#include "Common/ByteString.h"
+#include "Common/DynamicBuffer.h"
+#include "Common/String.h"
+
+#include "DataSource.h"
+#include "FixedRandomStringGenerator.h"
+#include "LoadDataSource.h"
+
+#include "DfsBroker/Lib/Client.h"
+#include "DfsSource.h"
+
+namespace Hypertable {
+
+  /** LoadDataSource whose input file is read through a DFS broker. */
+  class LoadDataSourceFileDfs : public LoadDataSource {
+  public:
+    LoadDataSourceFileDfs(const String &fname, const String &header_fname,
+                          int row_uniquify_chars = 0,
+                          bool dupkeycol = false);
+    ~LoadDataSourceFileDfs() { }
+    /** Returns how far the source offset moved since the previous call. */
+    uint64_t incr_consumed();
+
+  protected:
+    void init_src();      // builds the m_fin chain, with gunzip for ".gz" names
+    String get_header();  // first line of the header file, else of the data
+
+    String m_header_fname;
+    String m_file_name;
+    // Keep this order: the constructor initialises each from the one above.
+    ConnectionManagerPtr m_conn_mgr;
+    DfsBroker::ClientPtr m_client;
+    DfsFileSource m_source;
+  };
+
+} // namespace Hypertable
+
+#endif // HYPERTABLE_LOADDATASOURCEFILEDFS_H
-- 
1.6.4.4

