This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6521248a2 [tool] Add tool to copy replica from local filesystem
6521248a2 is described below

commit 6521248a2f4df4f98fa3b876609408617bc9d701
Author: Yingchun Lai <acelyc1112...@gmail.com>
AuthorDate: Fri Mar 25 15:04:45 2022 +0800

    [tool] Add tool to copy replica from local filesystem
    
    Add tool to copy replica from local filesystem, it would
    be faster than copy from remote peers, it can be use when
    add more disk drivers and make data rebalanced faster between
    disk drivers, or can be used when migrate data from the only
    disk driver to another.
    This tool will also make data more dense than data on old
    data directories, that means we can save much disk space
    and speedup server bootstrap.
    We can use the tool like:
    kudu local_replica copy_from_local <tablet_id> [-src_fs_wal_dir=<dir>] 
[-src_fs_metadata_dir=<dir>] [-src_fs_data_dirs=<dirs>] [-dst_fs_wal_dir=<dir>] 
[-dst_fs_metadata_dir=<dir>] [-dst_fs_data_dirs=<dirs>]
    
    There are some tips to use this tool:
    - Using --src_* and --dst_* prefixes to clarify what directories
      are operating on
    - The server on the source filesystem must stop before coping
      replicas. Because the tool is standalone, it can't anchor
      replica's data blocks or log segments.
    - This tool will not delete data in the source filesystem, you
      should delete data manully if you want, by using
      kudu local_replica delete ...
    
    Change-Id: I1dcafeaad900b66f297914760c54dba887874e95
    Reviewed-on: http://gerrit.cloudera.org:8080/18374
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/integration-tests/tablet_copy-itest.cc    |  11 +-
 src/kudu/master/master_runner.cc                   |   7 +-
 src/kudu/mini-cluster/internal_mini_cluster.h      |   6 +
 src/kudu/tablet/tablet_replica.cc                  |   7 +
 src/kudu/tablet/tablet_replica.h                   |   9 +
 src/kudu/tools/kudu-tool-test.cc                   | 228 +++++++++++++++-
 src/kudu/tools/tool_action_local_replica.cc        | 110 +++++++-
 src/kudu/tserver/mini_tablet_server.cc             |  17 +-
 src/kudu/tserver/mini_tablet_server.h              |   4 +
 src/kudu/tserver/tablet_copy-test-base.h           |   4 +-
 src/kudu/tserver/tablet_copy_client-test.cc        | 195 +++++++++++---
 src/kudu/tserver/tablet_copy_client.cc             | 296 +++++++++++++++++----
 src/kudu/tserver/tablet_copy_client.h              | 130 ++++++---
 src/kudu/tserver/tablet_copy_service.cc            |  23 +-
 src/kudu/tserver/tablet_copy_service.h             |  14 +-
 .../tserver/tablet_copy_source_session-test.cc     |  23 +-
 src/kudu/tserver/tablet_copy_source_session.cc     | 120 +++++++--
 src/kudu/tserver/tablet_copy_source_session.h      |  75 ++++--
 src/kudu/tserver/ts_tablet_manager.cc              |   4 +-
 19 files changed, 1040 insertions(+), 243 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc 
b/src/kudu/integration-tests/tablet_copy-itest.cc
index ced7108df..5e06eec6b 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -129,7 +129,7 @@ using kudu::tablet::TabletDataState;
 using kudu::tablet::TabletSuperBlockPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::ListTabletsResponsePB_StatusAndSchemaPB;
-using kudu::tserver::TabletCopyClient;
+using kudu::tserver::RemoteTabletCopyClient;
 using std::atomic;
 using std::lock_guard;
 using std::mutex;
@@ -513,12 +513,11 @@ TEST_F(TabletCopyITest, TestDeleteTabletDuringTabletCopy) 
{
 
   {
     // Start up a TabletCopyClient and open a tablet copy session.
-    TabletCopyClient tc_client(tablet_id, fs_manager.get(),
-                               cmeta_manager, cluster_->messenger(),
-                               nullptr /* no metrics */);
+    RemoteTabletCopyClient tc_client(tablet_id, fs_manager.get(),
+                                     cmeta_manager, cluster_->messenger(),
+                                     nullptr /* no metrics */);
     scoped_refptr<tablet::TabletMetadata> meta;
-    
ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
-                              &meta));
+    
ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
 &meta));
 
     // Tombstone the tablet on the remote!
     ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout));
diff --git a/src/kudu/master/master_runner.cc b/src/kudu/master/master_runner.cc
index fffd28ff0..312149f6d 100644
--- a/src/kudu/master/master_runner.cc
+++ b/src/kudu/master/master_runner.cc
@@ -84,7 +84,7 @@ using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::RpcController;
 using kudu::tablet::TabletDataState;
 using kudu::tablet::TabletMetadata;
-using kudu::tserver::TabletCopyClient;
+using kudu::tserver::RemoteTabletCopyClient;
 using kudu::tserver::TSTabletManager;
 using std::set;
 using std::string;
@@ -314,8 +314,9 @@ Status ClearLocalSystemCatalogAndCopy(const HostPort& 
src_hp) {
   LOG(INFO) << "Copying system tablet from " << src_hp.ToString();
   std::shared_ptr<rpc::Messenger> messenger;
   RETURN_NOT_OK(rpc::MessengerBuilder("tablet_copy_client").Build(&messenger));
-  TabletCopyClient copy_client(SysCatalogTable::kSysCatalogTabletId, 
&fs_manager, cmeta_manager,
-                               messenger, nullptr /* no metrics */);
+  RemoteTabletCopyClient copy_client(SysCatalogTable::kSysCatalogTabletId,
+                                     &fs_manager, cmeta_manager,
+                                     messenger, nullptr /* no metrics */);
   RETURN_NOT_OK(copy_client.Start(src_hp, /*meta*/nullptr));
   RETURN_NOT_OK(copy_client.FetchAll(/*tablet_replica*/nullptr));
   return copy_client.Finish();
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h 
b/src/kudu/mini-cluster/internal_mini_cluster.h
index ed214befa..e7b4e3913 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include <glog/logging.h>
+#include <gtest/gtest_prod.h>
 
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/gutil/macros.h"
@@ -49,6 +50,10 @@ namespace rpc {
 class Messenger;
 } // namespace rpc
 
+namespace tools {
+class ToolTest_TestRebuildTserverByLocalReplicaCopy_Test;
+} // namespace tools
+
 namespace tserver {
 class MiniTabletServer;
 class TabletServerAdminServiceProxy;
@@ -215,6 +220,7 @@ class InternalMiniCluster : public MiniCluster {
       int idx) const override;
 
  private:
+  FRIEND_TEST(kudu::tools::ToolTest, TestRebuildTserverByLocalReplicaCopy);
 
   // Creates and starts the cluster masters.
   Status StartMasters();
diff --git a/src/kudu/tablet/tablet_replica.cc 
b/src/kudu/tablet/tablet_replica.cc
index 2f037eda9..ab88d4d0a 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -186,6 +186,13 @@ TabletReplica::TabletReplica(
       last_status_("Tablet initializing...") {
 }
 
+TabletReplica::TabletReplica()
+    : apply_pool_(nullptr),
+      reload_txn_status_tablet_pool_(nullptr),
+      state_(SHUTDOWN),
+      last_status_("Fake replica created") {
+}
+
 TabletReplica::~TabletReplica() {
   // We are required to call Shutdown() before destroying a TabletReplica.
   CHECK(state_ == SHUTDOWN || state_ == FAILED)
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index c7f45053c..ead1fb830 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -82,6 +82,11 @@ class Messenger;
 class ResultTracker;
 } // namespace rpc
 
+namespace tools {
+struct RunnerContext;
+Status CopyFromLocal(const RunnerContext& context);
+} // namespace tools
+
 namespace tablet {
 class AlterSchemaOpState;
 class OpDriver;
@@ -390,6 +395,7 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback 
began_txn_cb);
 
  private:
+  friend Status kudu::tools::CopyFromLocal(const kudu::tools::RunnerContext& 
context);
   friend class kudu::AlterTableTest;
   friend class RefCountedThreadSafe<TabletReplica>;
   friend class TabletReplicaTest;
@@ -400,6 +406,9 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   FRIEND_TEST(TabletReplicaTest, TestMRSAnchorPreventsLogGC);
   FRIEND_TEST(kudu::TxnOpDispatcherITest, LifecycleBasic);
 
+  // Only for CLI tools and tests.
+  TabletReplica();
+
   // A class to properly dispatch transactional write operations arriving
   // with TabletServerService::Write() RPC for the specified tablet replica.
   // Before submitting the operations via TabletReplica::SubmitWrite(), it's
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 793e738af..8956c1d90 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstring>
 #include <sys/stat.h>
 
 #include <algorithm>
@@ -291,7 +292,7 @@ class ToolTest : public KuduTest {
     Status s = RunTool(arg_str, stdout, &stderr, nullptr, nullptr);
     SCOPED_TRACE(*stdout);
     SCOPED_TRACE(stderr);
-    ASSERT_OK(s);
+    ASSERT_TRUE(s.ok()) << arg_str;
   }
 
   void RunActionStdinStdoutString(const string& arg_str, const string& stdin,
@@ -663,6 +664,14 @@ class ToolTest : public KuduTest {
     ASSERT_STR_CONTAINS(stderr, expect_err);
   }
 
+  string GetMasterAddrsStr() {
+    vector<string> master_addrs;
+    for (const auto& hp : mini_cluster_->master_rpc_addrs()) {
+      master_addrs.emplace_back(hp.ToString());
+    }
+    return JoinStrings(master_addrs, ",");
+  }
+
  protected:
   // Note: Each test case must have a single invocation of RunLoadgen() 
otherwise it leads to
   //       memory leaks.
@@ -673,6 +682,7 @@ class ToolTest : public KuduTest {
                   string* tool_stderr = nullptr);
   void StartExternalMiniCluster(ExternalMiniClusterOptions opts = {});
   void StartMiniCluster(InternalMiniClusterOptions opts = {});
+  void StopMiniCluster();
   unique_ptr<ExternalMiniCluster> cluster_;
   unique_ptr<MiniClusterFsInspector> inspect_;
   unordered_map<string, TServerDetails*> ts_map_;
@@ -970,6 +980,10 @@ void ToolTest::StartMiniCluster(InternalMiniClusterOptions 
opts) {
   ASSERT_OK(mini_cluster_->Start());
 }
 
+void ToolTest::StopMiniCluster() {
+  mini_cluster_->Shutdown();
+}
+
 TEST_F(ToolTest, TestHelpXML) {
   string stdout;
   string stderr;
@@ -1088,7 +1102,8 @@ TEST_F(ToolTest, TestModeHelp) {
         "cmeta.*Operate on a local tablet replica's consensus",
         "data_size.*Summarize the data size",
         "dump.*Dump a Kudu filesystem",
-        "copy_from_remote.*Copy a tablet replica",
+        "copy_from_remote.*Copy a tablet replica from a remote server",
+        "copy_from_local.*Copy a tablet replica from local filesystem",
         "delete.*Delete tablet replicas from the local filesystem",
         "list.*Show list of tablet replicas",
     };
@@ -1124,6 +1139,16 @@ TEST_F(ToolTest, TestModeHelp) {
     NO_FATALS(RunTestHelp("local-replica copy-from-remote --help",
                           kLocalReplicaCopyFromRemoteRegexes));
   }
+  {
+    const vector<string> kLocalReplicaCopyFromRemoteRegexes = {
+        "Copy a tablet replica from local filesystem",
+    };
+    NO_FATALS(RunTestHelp("local_replica copy_from_local --help",
+                          kLocalReplicaCopyFromRemoteRegexes));
+    // Try with hyphens instead of underscores.
+    NO_FATALS(RunTestHelp("local-replica copy-from-local --help",
+                          kLocalReplicaCopyFromRemoteRegexes));
+  }
   {
     const string kCmd = "master";
     const vector<string> kMasterModeRegexes = {
@@ -6845,7 +6870,9 @@ TEST_P(ControlShellToolTest, TestControlShell) {
 }
 
 static void CreateTableWithFlushedData(const string& table_name,
-                                       InternalMiniCluster* cluster) {
+                                       InternalMiniCluster* cluster,
+                                       int num_tablets = 1,
+                                       int num_replicas = 1) {
   // Use a schema with a high number of columns to encourage the creation of
   // many data blocks.
   KuduSchemaBuilder schema_builder;
@@ -6865,10 +6892,11 @@ static void CreateTableWithFlushedData(const string& 
table_name,
   TestWorkload workload(cluster);
   workload.set_schema(schema);
   workload.set_table_name(table_name);
-  workload.set_num_replicas(1);
+  workload.set_num_tablets(num_tablets);
+  workload.set_num_replicas(num_replicas);
   workload.Setup();
   workload.Start();
-  ASSERT_EVENTUALLY([&](){
+  ASSERT_EVENTUALLY([&]() {
     ASSERT_GE(workload.rows_inserted(), 10000);
   });
   workload.StopAndJoin();
@@ -7728,6 +7756,7 @@ TEST_F(ToolTest, TestNonDefaultPrincipal) {
                          "--sasl_protocol_name=oryx",
                          
HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())}));
 }
+
 class UnregisterTServerTest : public ToolTest, public 
::testing::WithParamInterface<bool> {
  public:
   void StartCluster() {
@@ -7736,14 +7765,6 @@ class UnregisterTServerTest : public ToolTest, public 
::testing::WithParamInterf
     opts.num_masters = 3;
     StartMiniCluster(std::move(opts));
   }
-
-  string GetMasterAddrsStr() {
-    vector<string> master_addrs;
-    for (const auto& hp : mini_cluster_->master_rpc_addrs()) {
-      master_addrs.emplace_back(hp.ToString());
-    }
-    return JoinStrings(master_addrs, ",");
-  }
 };
 
 INSTANTIATE_TEST_SUITE_P(, UnregisterTServerTest, ::testing::Bool());
@@ -7864,5 +7885,186 @@ TEST_F(UnregisterTServerTest, 
TestUnregisterTServerNotPresumedDead) {
   }
 }
 
+TEST_F(ToolTest, TestLocalReplicaCopyLocal) {
+  // Create replicas and fill some data.
+  InternalMiniClusterOptions opts;
+  opts.num_data_dirs = 3;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+  NO_FATALS(CreateTableWithFlushedData("tablename", mini_cluster_.get()));
+
+  string tablet_id;
+  string src_fs_paths_with_prefix;
+  string src_fs_paths;
+  {
+    tablet_id = mini_cluster_->mini_tablet_server(0)->ListTablets()[0];
+
+    // Obtain source filesystem options.
+    auto fs_manager = 
mini_cluster_->mini_tablet_server(0)->server()->fs_manager();
+    string src_fs_wal_dir = fs_manager->GetWalsRootDir();
+    src_fs_wal_dir = src_fs_wal_dir.substr(0, src_fs_wal_dir.length() - 
strlen("/wals"));
+    string src_fs_data_dirs = JoinMapped(fs_manager->GetDataRootDirs(),
+        [] (const string& data_dir) {
+          return data_dir.substr(0, data_dir.length() - strlen("/data"));
+        }, ",");
+    src_fs_paths_with_prefix = "--src_fs_wal_dir=" + src_fs_wal_dir + " "
+                               "--src_fs_data_dirs=" + src_fs_data_dirs;
+    src_fs_paths = "--fs_wal_dir=" + src_fs_wal_dir + " "
+                   "--fs_data_dirs=" + src_fs_data_dirs;
+  }
+
+  string dst_fs_paths_with_prefix;
+  string dst_fs_paths;
+  {
+    // Build destination filesystem layout, and obtain filesystem options.
+    const string kTestDstDir = GetTestPath("dst_test");
+    unique_ptr<FsManager> dst_fs_manager =
+        std::make_unique<FsManager>(Env::Default(), 
FsManagerOpts(kTestDstDir));
+    ASSERT_OK(dst_fs_manager->CreateInitialFileSystemLayout());
+    ASSERT_OK(dst_fs_manager->Open());
+    dst_fs_paths_with_prefix = "--dst_fs_wal_dir=" + kTestDstDir + " "
+                               "--dst_fs_data_dirs=" + kTestDstDir;
+    dst_fs_paths = "--fs_wal_dir=" + kTestDstDir + " "
+                   "--fs_data_dirs=" + kTestDstDir;
+  }
+
+  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : 
"";
+
+  // Copy replica from local filesystem failed, because the tserver on source 
filesystem
+  // is still running.
+  string stdout;
+  string stderr;
+  Status s = RunActionStdoutStderrString(
+      Substitute("local_replica copy_from_local $0 $1 $2 $3",
+                 tablet_id, src_fs_paths_with_prefix, 
dst_fs_paths_with_prefix, encryption_args),
+      &stdout, &stderr);
+  SCOPED_TRACE(stdout);
+  SCOPED_TRACE(stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_MATCHES(stderr,
+                     "failed to load instance files: Could not lock instance 
file. "
+                     "Make sure that Kudu is not already running");
+
+  // Shutdown mini cluster and copy replica from local filesystem successfully.
+  mini_cluster_->Shutdown();
+  NO_FATALS(RunActionStdoutString(
+      Substitute("local_replica copy_from_local $0 $1 $2 $3",
+                 tablet_id, src_fs_paths_with_prefix, 
dst_fs_paths_with_prefix, encryption_args),
+      &stdout));
+  SCOPED_TRACE(stdout);
+
+  // Check the source and destination data is matched.
+  string src_stdout;
+  NO_FATALS(RunActionStdoutString(
+      Substitute("local_replica data_size $0 $1 $2",
+                 tablet_id, src_fs_paths, encryption_args), &src_stdout));
+  SCOPED_TRACE(src_stdout);
+
+  string dst_stdout;
+  NO_FATALS(RunActionStdoutString(
+      Substitute("local_replica data_size $0 $1 $2",
+                 tablet_id, dst_fs_paths, encryption_args), &dst_stdout));
+  SCOPED_TRACE(dst_stdout);
+
+  ASSERT_EQ(src_stdout, dst_stdout);
+}
+
+TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
+  // Create replicas and fill some data.
+  const int kNumTserver = 3;
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTserver;
+  // The source filesystem has only 1 data directory.
+  opts.num_data_dirs = 1;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+  NO_FATALS(CreateTableWithFlushedData("tablename", mini_cluster_.get(), 8, 
3));
+
+  // Obtain source cluster options.
+  const int kCopyTserverIndex = 2;
+  vector<string> tablet_ids = 
mini_cluster_->mini_tablet_server(kCopyTserverIndex)->ListTablets();
+  vector<HostPort> hps;
+  hps.reserve(mini_cluster_->num_tablet_servers());
+  for (int i = 0; i < mini_cluster_->num_tablet_servers(); ++i) {
+    
hps.emplace_back(HostPort(mini_cluster_->mini_tablet_server(i)->bound_rpc_addr()));
+  }
+  auto fs_manager = 
mini_cluster_->mini_tablet_server(kCopyTserverIndex)->server()->fs_manager();
+  string src_fs_wal_dir = fs_manager->GetWalsRootDir();
+  src_fs_wal_dir = src_fs_wal_dir.substr(0, src_fs_wal_dir.length() - 
strlen("/wals"));
+  string src_fs_data_dirs = JoinMapped(fs_manager->GetDataRootDirs(),
+      [] (const string& data_dir) {
+        return data_dir.substr(0, data_dir.length() - strlen("/data"));
+      }, ",");
+  string src_fs_paths_with_prefix = "--src_fs_wal_dir=" + src_fs_wal_dir + " "
+                                    "--src_fs_data_dirs=" + src_fs_data_dirs;
+  string src_fs_paths = "--fs_wal_dir=" + src_fs_wal_dir + " "
+                        "--fs_data_dirs=" + src_fs_data_dirs;
+
+  // Build destination filesystem layout with source fs uuid, and obtain 
filesystem options.
+  string dst_fs_paths_with_prefix;
+  string src_tserver_fs_root = 
mini_cluster_->GetTabletServerFsRoot(kCopyTserverIndex);
+  string dst_tserver_fs_root = 
mini_cluster_->GetTabletServerFsRoot(kNumTserver);
+  {
+    FsManagerOpts fs_opts;
+    // The new fs layout has 3 data directories.
+    MiniTabletServer::InitFsOpts(3, dst_tserver_fs_root, &fs_opts);
+    string dst_fs_wal_dir = fs_opts.wal_root;
+    string dst_fs_data_dirs = JoinMapped(fs_opts.data_roots,
+        [] (const string& data_root) {
+          return data_root;
+        }, ",");
+    string dst_fs_paths = "--fs_wal_dir=" + dst_fs_wal_dir + " "
+                          "--fs_data_dirs=" + dst_fs_data_dirs;
+    ASSERT_OK(Env::Default()->CreateDir(dst_tserver_fs_root));
+    // Use 'kudu fs format' with '--uuid' to format the new data directories.
+    NO_FATALS(RunActionStdoutNone(Substitute("fs format $0 --uuid=$1",
+                                             dst_fs_paths, 
fs_manager->uuid())));
+    dst_fs_paths_with_prefix = "--dst_fs_wal_dir=" + dst_fs_wal_dir + " "
+                               "--dst_fs_data_dirs=" + dst_fs_data_dirs;
+  }
+
+  // Shutdown the cluster before copying.
+  NO_FATALS(StopMiniCluster());
+
+  // Copy source tserver's all replicas from local filesystem.
+  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : 
"";
+  for (const auto& tablet_id : tablet_ids) {
+    string stdout;
+    NO_FATALS(RunActionStdoutString(Substitute("local_replica copy_from_local 
$0 $1 $2 $3",
+                                               tablet_id,
+                                               src_fs_paths_with_prefix,
+                                               dst_fs_paths_with_prefix,
+                                               encryption_args),
+                                    &stdout));
+    SCOPED_TRACE(stdout);
+  }
+
+  // Replace the old data/wal dirs with the new ones.
+  ASSERT_OK(Env::Default()->RenameFile(src_tserver_fs_root, 
src_tserver_fs_root + ".bak"));
+  ASSERT_OK(Env::Default()->RenameFile(dst_tserver_fs_root, 
src_tserver_fs_root));
+
+  // Start masters only.
+  mini_cluster_->opts_.num_tablet_servers = 0;
+  NO_FATALS(mini_cluster_->Start());
+  // Then start tserver with old host:port.
+  ASSERT_OK(mini_cluster_->AddTabletServer(hps[0]));
+  ASSERT_OK(mini_cluster_->AddTabletServer(hps[1]));
+  // The new tserver has 3 data directories.
+  mini_cluster_->opts_.num_data_dirs = 3;
+  ASSERT_OK(mini_cluster_->AddTabletServer(hps[2]));
+  // Wait all tserver start normally.
+  ASSERT_OK(mini_cluster_->WaitForTabletServerCount(3));
+  for (int i = 0; i < mini_cluster_->num_tablet_servers(); ++i) {
+    ASSERT_OK(mini_cluster_->mini_tablet_server(i)->WaitStarted());
+  }
+
+  // Restart the cluster, and check the cluster state is OK.
+  ASSERT_EVENTUALLY([&] {
+    string out;
+    string err;
+    Status s =
+        RunActionStdoutStderrString(Substitute("cluster ksck $0", 
GetMasterAddrsStr()), &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ", err:\n" << err << ", out:\n" << 
out;
+  });
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_local_replica.cc 
b/src/kudu/tools/tool_action_local_replica.cc
index 03d39ad81..8dca80a2f 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -22,6 +22,7 @@
 #include <iostream>
 #include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -77,13 +78,16 @@
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/tserver/tablet_copy_client.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/thread.h"
 
 namespace kudu {
 namespace rpc {
@@ -112,6 +116,30 @@ DEFINE_bool(ignore_nonexistent, false,
             "Whether to ignore non-existent tablet replicas when deleting: if "
             "set to 'true', the tool does not report an error if the requested 
"
             "tablet replica to remove is not found");
+DEFINE_string(src_fs_wal_dir, "",
+              "Source: Directory with write-ahead logs.");
+DEFINE_string(src_fs_data_dirs, "",
+              "Source: Comma-separated list of directories with data blocks. 
If this "
+              "is not specified, --src_fs_wal_dir will be used as the sole 
data "
+              "block directory.");
+DEFINE_string(src_fs_metadata_dir, "",
+              "Source: Directory with metadata. If this is not specified, for "
+              "compatibility with Kudu 1.6 and below, Kudu will check the "
+              "first entry of --dst_fs_data_dirs for metadata and use it as 
the "
+              "metadata directory if any exists. If none exists, 
--dst_fs_wal_dir "
+              "will be used as the metadata directory.");
+DEFINE_string(dst_fs_wal_dir, "",
+              "Destination: Directory with write-ahead logs.");
+DEFINE_string(dst_fs_data_dirs, "",
+              "Destination: Comma-separated list of directories with data 
blocks. If this "
+              "is not specified, --dst_fs_wal_dir will be used as the sole 
data "
+              "block directory.");
+DEFINE_string(dst_fs_metadata_dir, "",
+              "Destination: Directory with metadata. If this is not specified, 
for "
+              "compatibility with Kudu 1.6 and below, Kudu will check the "
+              "first entry of --dst_fs_data_dirs for metadata and use it as 
the "
+              "metadata directory if any exists. If none exists, 
--dst_fs_wal_dir "
+              "will be used as the metadata directory.");;
 
 DECLARE_int32(tablet_copy_download_threads_nums_per_session);
 
@@ -133,13 +161,16 @@ using kudu::tablet::RowIteratorOptions;
 using kudu::tablet::RowSetMetadata;
 using kudu::tablet::TabletDataState;
 using kudu::tablet::TabletMetadata;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::LocalTabletCopyClient;
+using kudu::tserver::RemoteTabletCopyClient;
 using kudu::tserver::TSTabletManager;
-using kudu::tserver::TabletCopyClient;
 using std::cout;
 using std::endl;
 using std::map;
 using std::pair;
 using std::shared_ptr;
+using std::set;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -168,6 +199,7 @@ constexpr const char* const kRaftPeersArgDesc =
 string Indent(int indent) {
   return string(indent, ' ');
 }
+} // anonymous namespace
 
 Status FsInit(bool skip_block_manager, unique_ptr<FsManager>* fs_manager) {
   FsManagerOpts fs_opts;
@@ -398,13 +430,68 @@ Status CopyFromRemote(const RunnerContext& context) {
   scoped_refptr<ConsensusMetadataManager> cmeta_manager(new 
ConsensusMetadataManager(&fs_manager));
   shared_ptr<Messenger> messenger;
   RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger));
-  TabletCopyClient client(tablet_id, &fs_manager, cmeta_manager,
-                          messenger, nullptr /* no metrics */);
+  RemoteTabletCopyClient client(tablet_id, &fs_manager, cmeta_manager,
+                                messenger, nullptr /* no metrics */);
   RETURN_NOT_OK(client.Start(hp, nullptr));
   RETURN_NOT_OK(client.FetchAll(nullptr));
   return client.Finish();
 }
 
+Status CopyFromLocal(const RunnerContext& context) {
+  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+
+  FsManagerOpts src_fs_mgr_opt;
+  src_fs_mgr_opt.wal_root = FLAGS_src_fs_wal_dir;
+  src_fs_mgr_opt.metadata_root = FLAGS_src_fs_metadata_dir;
+  src_fs_mgr_opt.data_roots = strings::Split(FLAGS_src_fs_data_dirs, ",", 
strings::SkipEmpty());
+  FsManager src_fs_manager(Env::Default(), src_fs_mgr_opt);
+  RETURN_NOT_OK(src_fs_manager.Open());
+
+  vector<string> tablet_ids;
+  RETURN_NOT_OK(src_fs_manager.ListTabletIds(&tablet_ids));
+  set<string> tablet_ids_set(tablet_ids.begin(), tablet_ids.end());
+  if (!ContainsKey(tablet_ids_set, tablet_id)) {
+    return Status::NotFound("There is no matched tablet");
+  }
+
+  FsManagerOpts dst_fs_mgr_opt;
+  dst_fs_mgr_opt.wal_root = FLAGS_dst_fs_wal_dir;
+  dst_fs_mgr_opt.metadata_root = FLAGS_dst_fs_metadata_dir;
+  dst_fs_mgr_opt.data_roots = strings::Split(FLAGS_dst_fs_data_dirs, ",", 
strings::SkipEmpty());
+
+  FsManager dst_fs_manager(Env::Default(), dst_fs_mgr_opt);
+  RETURN_NOT_OK(dst_fs_manager.Open());
+  scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager(
+      new ConsensusMetadataManager(&dst_fs_manager));
+
+  LocalTabletCopyClient client(tablet_id,
+                               &dst_fs_manager,
+                               dst_cmeta_manager,
+                               /* messenger */ nullptr,
+                               /* tablet_copy_client_metrics */ nullptr,
+                               &src_fs_manager,
+                               /* tablet_copy_source_metrics */ nullptr);
+
+  // Create a fake replica to obtain copy process periodically.
+  scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
+  CountDownLatch latch(1);
+  scoped_refptr<Thread> check_thread;
+  RETURN_NOT_OK(Thread::Create("Kudu-tool", "check_progress",
+                               [&latch, fake_replica]() {
+                                 while 
(!latch.WaitFor(MonoDelta::FromSeconds(10))) {
+                                   LOG(INFO) << fake_replica->last_status();
+                                }
+      }, &check_thread));
+
+  RETURN_NOT_OK(client.Start(tablet_id, /* meta */ nullptr));
+  RETURN_NOT_OK(client.FetchAll(fake_replica));
+  RETURN_NOT_OK(client.Finish());
+  latch.CountDown();
+  check_thread->Join();
+
+  return Status::OK();
+}
+
 Status DeleteLocalReplica(const string& tablet_id,
                           FsManager* fs_manager,
                           const scoped_refptr<ConsensusMetadataManager>& 
cmeta_manager) {
@@ -898,8 +985,6 @@ unique_ptr<Mode> BuildDumpMode() {
       .Build();
 }
 
-} // anonymous namespace
-
 unique_ptr<Mode> BuildLocalReplicaMode() {
   unique_ptr<Action> print_replica_uuids =
       ActionBuilder("print_replica_uuids", &PrintReplicaUuids)
@@ -954,6 +1039,20 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
       .AddOptionalParameter("tablet_copy_download_threads_nums_per_session")
       .Build();
 
+  unique_ptr<Action> copy_from_local =
+      ActionBuilder("copy_from_local", &CopyFromLocal)
+      .Description("Copy a tablet replica from local filesystem. Before using 
this tool, you "
+          "MUST stop the master/tserver you want to copy from, and make sure 
--src_*_dir(s) and "
+          "--dst_*_dir(s) are exactly what whey should be.")
+      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddOptionalParameter("src_fs_wal_dir")
+      .AddOptionalParameter("src_fs_metadata_dir")
+      .AddOptionalParameter("src_fs_data_dirs")
+      .AddOptionalParameter("dst_fs_wal_dir")
+      .AddOptionalParameter("dst_fs_metadata_dir")
+      .AddOptionalParameter("dst_fs_data_dirs")
+      .Build();
+
   unique_ptr<Action> list =
       ActionBuilder("list", &ListLocalReplicas)
       .Description("Show list of tablet replicas in the local filesystem")
@@ -988,6 +1087,7 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
   return ModeBuilder("local_replica")
       .Description("Operate on local tablet replicas via the local filesystem")
       .AddMode(std::move(cmeta))
+      .AddAction(std::move(copy_from_local))
       .AddAction(std::move(copy_from_remote))
       .AddAction(std::move(data_size))
       .AddAction(std::move(delete_local_replica))
diff --git a/src/kudu/tserver/mini_tablet_server.cc 
b/src/kudu/tserver/mini_tablet_server.cc
index 8825e085e..43c76d900 100644
--- a/src/kudu/tserver/mini_tablet_server.cc
+++ b/src/kudu/tserver/mini_tablet_server.cc
@@ -72,16 +72,23 @@ MiniTabletServer::MiniTabletServer(string fs_root,
   opts_.rpc_opts.rpc_bind_addresses = rpc_bind_addr.ToString();
   opts_.webserver_opts.bind_interface = rpc_bind_addr.host();
   opts_.webserver_opts.port = 0;
+  InitFsOpts(num_data_dirs, fs_root_, &opts_.fs_opts);
+}
+
+void MiniTabletServer::InitFsOpts(int num_data_dirs, const string& fs_root,
+                                  FsManagerOpts* fs_opts) {
+  CHECK(fs_opts);
   if (num_data_dirs == 1) {
-    opts_.fs_opts.wal_root = fs_root_;
-    opts_.fs_opts.data_roots = { fs_root_ };
+    fs_opts->wal_root = fs_root;
+    fs_opts->data_roots = { fs_root };
   } else {
     vector<string> fs_data_dirs;
+    fs_data_dirs.reserve(num_data_dirs);
     for (int dir = 0; dir < num_data_dirs; dir++) {
-      fs_data_dirs.emplace_back(JoinPathSegments(fs_root_, 
Substitute("data-$0", dir)));
+      fs_data_dirs.emplace_back(JoinPathSegments(fs_root, 
Substitute("data-$0", dir)));
     }
-    opts_.fs_opts.wal_root = JoinPathSegments(fs_root_, "wal");
-    opts_.fs_opts.data_roots = fs_data_dirs;
+    fs_opts->wal_root = JoinPathSegments(fs_root, "wal");
+    fs_opts->data_roots = fs_data_dirs;
   }
 }
 
diff --git a/src/kudu/tserver/mini_tablet_server.h 
b/src/kudu/tserver/mini_tablet_server.h
index 329385ef9..f428dac55 100644
--- a/src/kudu/tserver/mini_tablet_server.h
+++ b/src/kudu/tserver/mini_tablet_server.h
@@ -28,6 +28,7 @@ namespace kudu {
 class HostPort;
 class Schema;
 class Status;
+struct FsManagerOpts;
 
 namespace consensus {
 class RaftConfigPB;
@@ -100,6 +101,9 @@ class MiniTabletServer {
 
   void FailHeartbeats();
 
+  static void InitFsOpts(int num_data_dirs, const std::string& fs_root,
+                         FsManagerOpts* fs_opts);
+
  private:
   const std::string fs_root_;
   TabletServerOptions opts_;
diff --git a/src/kudu/tserver/tablet_copy-test-base.h 
b/src/kudu/tserver/tablet_copy-test-base.h
index 0d5221a94..60534bd3f 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -126,8 +126,8 @@ class TabletCopyTest : public TabletServerTestBase {
     return tablet_replica_->permanent_uuid();
   }
 
-  const std::string& GetTabletId() const {
-    return tablet_replica_->tablet()->tablet_id();
+  virtual const std::string& GetTabletId() const {
+    return tablet_replica_->tablet_id();
   }
 
   // Read a block file from the file system fully into memory and return a
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc 
b/src/kudu/tserver/tablet_copy_client-test.cc
index 670ee0854..96427dc07 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 #include "kudu/tserver/tablet_copy_client.h"
 
+#include <cstring>
 #include <cstdint>
 #include <cstdlib>
 #include <limits>
@@ -30,10 +31,12 @@
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
+#include <gtest/gtest_prod.h>
 
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
+#include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
@@ -52,8 +55,11 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_copy-test-base.h"
 #include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/tserver/tablet_copy_source_session.h"
+#include "kudu/tserver/tablet_server.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
@@ -97,7 +103,8 @@ namespace tserver {
 class TabletCopyClientTest : public TabletCopyTest {
  public:
   TabletCopyClientTest()
-      : rand_(SeedRandom()) {
+      : mode_(TabletCopyMode::REMOTE),
+        rand_(SeedRandom()) {
   }
 
   void SetUp() override {
@@ -122,32 +129,41 @@ class TabletCopyClientTest : public TabletCopyTest {
 
   // Sets up a new tablet copy client.
   Status ResetTabletCopyClient() {
-    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
-        new ConsensusMetadataManager(fs_manager_.get()));
+    if (mode_ == TabletCopyMode::REMOTE) {
+      return ResetRemoteTabletCopyClient();
+    }
 
-    tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
-    rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
-    client_.reset(new TabletCopyClient(GetTabletId(),
-                                       fs_manager_.get(),
-                                       cmeta_manager,
-                                       messenger_,
-                                       nullptr /* no metrics */));
-    RaftPeerPB* cstate_leader;
-    ConsensusStatePB cstate;
-    RETURN_NOT_OK(tablet_replica_->consensus()->ConsensusState(&cstate));
-    RETURN_NOT_OK(GetRaftConfigLeader(&cstate, &cstate_leader));
-    leader_ = *cstate_leader;
-    return Status::OK();
+    CHECK(mode_ == TabletCopyMode::LOCAL);
+    return ResetLocalTabletCopyClient();
   }
 
   // Starts the tablet copy.
   Status StartCopy() {
-    HostPort host_port = HostPortFromPB(leader_.last_known_addr());
-    return client_->Start(host_port, &meta_);
+    if (mode_ == TabletCopyMode::REMOTE) {
+      HostPort host_port = HostPortFromPB(leader_.last_known_addr());
+      return client_->Start(host_port, &meta_);
+    }
+
+    CHECK(mode_ == TabletCopyMode::LOCAL);
+    return client_->Start(tablet_id_, &meta_);
+  }
+
+  const std::string& GetTabletId() const override {
+    if (mode_ == TabletCopyMode::REMOTE) {
+      return tablet_replica_->tablet_id();
+    }
+
+    CHECK(mode_ == TabletCopyMode::LOCAL);
+    return tablet_id_;
   }
 
  protected:
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadWalSegment);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestSupportsLiveRowCount);
+
   Status CompareFileContents(const string& path1, const string& path2);
+  Status ResetRemoteTabletCopyClient();
+  Status ResetLocalTabletCopyClient();
 
   // Injection of 'supports_live_row_count' modifiers.
   void GenerateTestData() override {
@@ -156,7 +172,9 @@ class TabletCopyClientTest : public TabletCopyTest {
     NO_FATALS(TabletCopyTest::GenerateTestData());
   }
 
+  TabletCopyMode mode_;
   Random rand_;
+  string tablet_id_;
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   unique_ptr<FsManager> fs_manager_;
@@ -164,6 +182,10 @@ class TabletCopyClientTest : public TabletCopyTest {
   unique_ptr<TabletCopyClient> client_;
   scoped_refptr<TabletMetadata> meta_;
   RaftPeerPB leader_;
+
+  unique_ptr<FsManager> src_fs_manager_;
+  MetricRegistry src_metric_registry_;
+  scoped_refptr<MetricEntity> src_metric_entity_;
 };
 
 Status TabletCopyClientTest::CompareFileContents(const string& path1, const 
string& path2) {
@@ -197,14 +219,86 @@ Status TabletCopyClientTest::CompareFileContents(const 
string& path1, const stri
   return Status::OK();
 }
 
+Status TabletCopyClientTest::ResetRemoteTabletCopyClient() {
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+      new ConsensusMetadataManager(fs_manager_.get()));
+
+  tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
+  rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
+  client_.reset(new RemoteTabletCopyClient(GetTabletId(),
+                                           fs_manager_.get(),
+                                           cmeta_manager,
+                                           messenger_,
+                                           nullptr /* no metrics */));
+  RaftPeerPB* cstate_leader;
+  ConsensusStatePB cstate;
+  RETURN_NOT_OK(tablet_replica_->consensus()->ConsensusState(&cstate));
+  RETURN_NOT_OK(GetRaftConfigLeader(&cstate, &cstate_leader));
+  leader_ = *cstate_leader;
+  return Status::OK();
+}
+
+Status TabletCopyClientTest::ResetLocalTabletCopyClient() {
+  // client_ will be reset many times in test cases, we only shutdown 
mini_server_
+  // at the first time, and will not start it again.
+  if (mini_server_->is_started()) {
+    // Store tablet id in cache, because tablet_replica_ will be reset later.
+    tablet_id_ = tablet_replica_->tablet_id();
+
+    // Prepare parameters to create source FsManager.
+    rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
+
+    FsManagerOpts opts;
+    string wal = mini_server_->server()->fs_manager()->GetWalsRootDir();
+    opts.wal_root = wal.substr(0, wal.length() - strlen("/wals"));
+    for (const auto& data : 
mini_server_->server()->fs_manager()->GetDataRootDirs()) {
+      opts.data_roots.emplace_back(data.substr(0, data.length() - 
strlen("/data")));
+    }
+
+    // Shutdown mini_server_ before copy tablet in local mode.
+    
RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_));
+    tablet_replica_.reset();
+    mini_server_->Shutdown();
+
+    // Create source FsManager.
+    src_fs_manager_.reset(new FsManager(Env::Default(), opts));
+    RETURN_NOT_OK(src_fs_manager_->Open());
+  }
+
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+      new ConsensusMetadataManager(fs_manager_.get()));
+
+  client_.reset(new LocalTabletCopyClient(tablet_id_,
+                                          fs_manager_.get(),
+                                          cmeta_manager,
+                                          messenger_,
+                                          /* tablet_copy_client_metrics */ 
nullptr,
+                                          src_fs_manager_.get(),
+                                          /* tablet_copy_source_metrics */ 
nullptr));
+
+  return Status::OK();
+}
+
+class TabletCopyClientBasicTest : public TabletCopyClientTest,
+                                  public 
::testing::WithParamInterface<TabletCopyMode> {
+ public:
+  TabletCopyClientBasicTest() {
+    mode_ = GetParam();
+  }
+};
+
+INSTANTIATE_TEST_SUITE_P(TabletCopyClientBasicTestModes, 
TabletCopyClientBasicTest,
+                         testing::Values(TabletCopyMode::REMOTE,
+                                         TabletCopyMode::LOCAL));
+
 // Test a tablet copy going through the various states in the copy state
 // machine.
-TEST_F(TabletCopyClientTest, TestLifeCycle) {
+TEST_P(TabletCopyClientBasicTest, TestLifeCycle) {
   // Target fault injection for the tablet metadata directories, but do not
   // start injecting failures just yet.
   const vector<string> meta_dirs = {
-      JoinPathSegments(client_->fs_manager_->GetConsensusMetadataDir(), "**"),
-      JoinPathSegments(client_->fs_manager_->GetTabletMetadataDir(), "**") };
+      JoinPathSegments(client_->dst_fs_manager_->GetConsensusMetadataDir(), 
"**"),
+      JoinPathSegments(client_->dst_fs_manager_->GetTabletMetadataDir(), "**") 
};
   FLAGS_env_inject_eio_globs = JoinStrings(meta_dirs, ",");
 
   ASSERT_EQ(TabletCopyClient::State::kInitialized, client_->state_);
@@ -248,7 +342,8 @@ TEST_F(TabletCopyClientTest, TestLifeCycle) {
 
   // Make sure we are still in the appropriate state if we fail to finish.
   ASSERT_OK(ResetTabletCopyClient());
-  client_->SetTabletToReplace(meta_, 0);
+  s = client_->SetTabletToReplace(meta_, 0);
+  ASSERT_TRUE(s.ok()) << s.ToString();
   ASSERT_OK(StartCopy());
   FLAGS_env_inject_eio = 1.0;
   s = client_->Finish();
@@ -264,21 +359,21 @@ TEST_F(TabletCopyClientTest, TestLifeCycle) {
 }
 
 // Implementation test that no blocks exist in the new superblock before 
fetching.
-TEST_F(TabletCopyClientTest, TestNoBlocksAtStart) {
+TEST_P(TabletCopyClientBasicTest, TestNoBlocksAtStart) {
   ASSERT_OK(StartCopy());
   ASSERT_GT(ListBlocks(*client_->remote_superblock_).size(), 0);
   ASSERT_EQ(0, ListBlocks(*client_->superblock_).size());
 }
 
 // Basic begin / end tablet copy session.
-TEST_F(TabletCopyClientTest, TestBeginEndSession) {
+TEST_P(TabletCopyClientBasicTest, TestBeginEndSession) {
   ASSERT_OK(StartCopy());
   ASSERT_OK(client_->FetchAll(nullptr /* no listener */));
   ASSERT_OK(client_->Finish());
 }
 
 // Basic data block download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadBlock) {
+TEST_P(TabletCopyClientBasicTest, TestDownloadBlock) {
   ASSERT_OK(StartCopy());
   BlockId block_id = FirstColumnBlockId(*client_->remote_superblock_);
   Slice slice;
@@ -300,7 +395,7 @@ TEST_F(TabletCopyClientTest, TestDownloadBlock) {
 
 // Test that error status is properly reported if there was a failure in any
 // of multiple threads downloading tablet's data blocks.
-TEST_F(TabletCopyClientTest, TestDownloadBlockMayFail) {
+TEST_P(TabletCopyClientBasicTest, TestDownloadBlockMayFail) {
   FLAGS_tablet_copy_fault_crash_during_download_block = 0.5;
   FLAGS_tablet_copy_download_threads_nums_per_session = 16;
 
@@ -313,7 +408,7 @@ TEST_F(TabletCopyClientTest, TestDownloadBlockMayFail) {
 
 // Test that error status is properly reported if there was a failure in any
 // of multiple threads downloading tablet's wal segments.
-TEST_F(TabletCopyClientTest, TestDownloadWalMayFail) {
+TEST_P(TabletCopyClientBasicTest, TestDownloadWalMayFail) {
   FLAGS_tablet_copy_fault_crash_during_download_wal = 1;
   FLAGS_tablet_copy_download_threads_nums_per_session = 4;
 
@@ -325,7 +420,7 @@ TEST_F(TabletCopyClientTest, TestDownloadWalMayFail) {
 }
 
 // Basic WAL segment download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
+TEST_P(TabletCopyClientBasicTest, TestDownloadWalSegment) {
   ASSERT_OK(StartCopy());
   ASSERT_OK(env_util::CreateDirIfMissing(
       env_, fs_manager_->GetTabletWalDir(GetTabletId())));
@@ -337,17 +432,22 @@ TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
   ASSERT_OK(client_->DownloadWAL(seqno));
   ASSERT_TRUE(fs_manager_->Exists(path));
 
-  log::SegmentSequence local_segments;
-  tablet_replica_->log()->reader()->GetSegmentsSnapshot(&local_segments);
-  const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0];
-  string server_path = segment->path();
+  string server_path;
+  if (mode_ == TabletCopyMode::REMOTE) {
+    log::SegmentSequence local_segments;
+    tablet_replica_->log()->reader()->GetSegmentsSnapshot(&local_segments);
+    const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0];
+    server_path = segment->path();
+  } else {
+    server_path = src_fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno);
+  }
 
   // Compare the downloaded file with the source file.
   ASSERT_OK(CompareFileContents(path, server_path));
 }
 
 // Ensure that we detect data corruption at the per-transfer level.
-TEST_F(TabletCopyClientTest, TestVerifyData) {
+TEST_P(TabletCopyClientBasicTest, TestVerifyData) {
   ASSERT_OK(StartCopy());
   string good = "This is a known good string";
   string bad = "This is a known bad! string";
@@ -383,7 +483,7 @@ TEST_F(TabletCopyClientTest, TestVerifyData) {
   LOG(INFO) << "Expected error returned: " << s.ToString();
 }
 
-TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
+TEST_P(TabletCopyClientBasicTest, TestDownloadAllBlocks) {
   ASSERT_OK(StartCopy());
   // Download and commit all the blocks.
   ASSERT_OK(client_->DownloadBlocks());
@@ -417,7 +517,7 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
 
 // Test that failing a disk outside fo the tablet copy client will eventually
 // stop the copy client and cause it to fail.
-TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
+TEST_P(TabletCopyClientBasicTest, TestFailedDiskStopsClient) {
   ASSERT_OK(StartCopy());
   DataDirManager* dd_manager = fs_manager_->dd_manager();
 
@@ -445,10 +545,17 @@ TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
   ASSERT_TRUE(s.IsIOError()) << s.ToString();
 }
 
-TEST_F(TabletCopyClientTest, TestSupportsLiveRowCount) {
+TEST_P(TabletCopyClientBasicTest, TestSupportsLiveRowCount) {
   ASSERT_OK(StartCopy());
-  ASSERT_EQ(meta_->supports_live_row_count(),
-      tablet_replica_->tablet_metadata()->supports_live_row_count());
+  bool supports_live_row_count = false;
+  if (mode_ == TabletCopyMode::REMOTE) {
+    supports_live_row_count = 
tablet_replica_->tablet_metadata()->supports_live_row_count();
+  } else {
+    scoped_refptr<TabletMetadata> metadata;
+    ASSERT_OK(TabletMetadata::Load(src_fs_manager_.get(), GetTabletId(), 
&metadata));
+    supports_live_row_count = metadata->supports_live_row_count();
+  }
+  ASSERT_EQ(meta_->supports_live_row_count(), supports_live_row_count);
 }
 
 enum DownloadBlocks {
@@ -468,8 +575,13 @@ struct AbortTestParams {
 
 class TabletCopyClientAbortTest : public TabletCopyClientTest,
                                   public ::testing::WithParamInterface<
-                                      tuple<DownloadBlocks, DeleteTrigger>> {
+                                      tuple<DownloadBlocks, DeleteTrigger, 
TabletCopyMode>> {
  public:
+  TabletCopyClientAbortTest() {
+    tuple<DownloadBlocks, DeleteTrigger, TabletCopyMode> param = GetParam();
+    mode_ = std::get<2>(param);
+  }
+
   void SetUp() override {
     TabletCopyClientTest::SetUp();
     ASSERT_OK(StartCopy());
@@ -483,7 +595,8 @@ INSTANTIATE_TEST_SUITE_P(BlockDeleteTriggers,
                          TabletCopyClientAbortTest,
                          ::testing::Combine(
                              ::testing::Values(kDownloadBlocks, 
kNoDownloadBlocks),
-                             ::testing::Values(kAbortMethod, kDestructor, 
kNoDelete)));
+                             ::testing::Values(kAbortMethod, kDestructor, 
kNoDelete),
+                             ::testing::Values(TabletCopyMode::REMOTE, 
TabletCopyMode::LOCAL)));
 
 void TabletCopyClientAbortTest::CreateTestBlocks(int num_blocks) {
   for (int i = 0; i < num_blocks; i++) {
@@ -498,7 +611,7 @@ void TabletCopyClientAbortTest::CreateTestBlocks(int 
num_blocks) {
 // Abort() or implicitly by destroying the TabletCopyClient instance before
 // calling Finish(). Also ensure that no data loss occurs.
 TEST_P(TabletCopyClientAbortTest, TestAbort) {
-  tuple<DownloadBlocks, DeleteTrigger> param = GetParam();
+  tuple<DownloadBlocks, DeleteTrigger, TabletCopyMode> param = GetParam();
   DownloadBlocks download_blocks = std::get<0>(param);
   DeleteTrigger trigger = std::get<1>(param);
 
diff --git a/src/kudu/tserver/tablet_copy_client.cc 
b/src/kudu/tserver/tablet_copy_client.cc
index f8fd97834..2edfbfc8f 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -36,6 +36,8 @@
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
+#include "kudu/consensus/log.pb.h"
+#include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
@@ -45,6 +47,7 @@
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -54,6 +57,7 @@
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/tablet_copy.pb.h"
 #include "kudu/tserver/tablet_copy.proxy.h"
+#include "kudu/tserver/tablet_copy_service.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/env.h"
@@ -163,7 +167,6 @@ using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::MakeOpId;
 using consensus::OpId;
-using env_util::CopyFile;
 using fs::BlockManager;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
@@ -188,13 +191,13 @@ TabletCopyClientMetrics::TabletCopyClientMetrics(const 
scoped_refptr<MetricEntit
 }
 
 TabletCopyClient::TabletCopyClient(
-    std::string tablet_id,
-    FsManager* fs_manager,
+    string tablet_id,
+    FsManager* dst_fs_manager,
     scoped_refptr<ConsensusMetadataManager> cmeta_manager,
     shared_ptr<Messenger> messenger,
-    TabletCopyClientMetrics* tablet_copy_metrics)
+    TabletCopyClientMetrics* dst_tablet_copy_metrics)
     : tablet_id_(std::move(tablet_id)),
-      fs_manager_(fs_manager),
+      dst_fs_manager_(dst_fs_manager),
       cmeta_manager_(std::move(cmeta_manager)),
       messenger_(std::move(messenger)),
       state_(kInitialized),
@@ -202,27 +205,24 @@ TabletCopyClient::TabletCopyClient(
       session_idle_timeout_millis_(FLAGS_tablet_copy_begin_session_timeout_ms),
       start_time_micros_(0),
       rng_(GetRandomSeed32()),
-      tablet_copy_metrics_(tablet_copy_metrics) {
-  BlockManager* bm = fs_manager->block_manager();
+      dst_tablet_copy_metrics_(dst_tablet_copy_metrics) {
+  BlockManager* bm = dst_fs_manager->block_manager();
   transaction_ = bm->NewCreationTransaction();
-  if (tablet_copy_metrics_) {
-    tablet_copy_metrics_->open_client_sessions->Increment();
+  if (dst_tablet_copy_metrics_) {
+    dst_tablet_copy_metrics_->open_client_sessions->Increment();
   }
-  ThreadPoolBuilder("tablet-download-pool-" + tablet_id)
-    .set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
-    .set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
-    .Build(&tablet_download_pool_);
+  CHECK_OK(ThreadPoolBuilder("tablet-download-pool-" + tablet_id)
+               
.set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+               
.set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+               .Build(&tablet_download_pool_));
 }
 
 TabletCopyClient::~TabletCopyClient() {
-  // Note: Ending the tablet copy session releases anchors on the remote.
-  WARN_NOT_OK(EndRemoteSession(), Substitute("$0Unable to close tablet copy 
session",
-                                             LogPrefix()));
   WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after 
aborted copy",
                                   LogPrefix()));
   tablet_download_pool_->Shutdown();
-  if (tablet_copy_metrics_) {
-    tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
+  if (dst_tablet_copy_metrics_) {
+    dst_tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
   }
 }
 
@@ -272,8 +272,8 @@ Status TabletCopyClient::SetTabletToReplace(const 
scoped_refptr<TabletMetadata>&
   return Status::OK();
 }
 
-Status TabletCopyClient::Start(const HostPort& copy_source_addr,
-                               scoped_refptr<TabletMetadata>* meta) {
+Status RemoteTabletCopyClient::Start(const HostPort& copy_source_addr,
+                                     scoped_refptr<TabletMetadata>* meta) {
   CHECK_EQ(kInitialized, state_);
   start_time_micros_ = GetCurrentTimeMicros();
 
@@ -292,7 +292,7 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
   proxy_.reset(new TabletCopyServiceProxy(messenger_, addr, 
copy_source_addr.host()));
 
   BeginTabletCopySessionRequestPB req;
-  req.set_requestor_uuid(fs_manager_->uuid());
+  req.set_requestor_uuid(dst_fs_manager_->uuid());
   req.set_tablet_id(tablet_id_);
 
   rpc::RpcController controller;
@@ -307,10 +307,11 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
 
   string copy_peer_uuid = resp.has_responder_uuid()
       ? resp.responder_uuid() : "(unknown uuid)";
-  if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
-    Status s = Status::IllegalState("Remote peer (" + copy_peer_uuid + ")" +
-                                    " is currently copying itself!",
-                                    
pb_util::SecureShortDebugString(resp.superblock()));
+  TabletDataState data_state = resp.superblock().tablet_data_state();
+  if (data_state != tablet::TABLET_DATA_READY) {
+    Status s = Status::IllegalState(
+        Substitute("Remote peer ($0) is not ready itself! state: $1",
+                   copy_peer_uuid, TabletDataState_Name(data_state)));
     LOG_WITH_PREFIX(WARNING) << s.ToString();
     return s;
   }
@@ -347,6 +348,25 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
   RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
                         "Cannot deserialize schema from remote superblock");
 
+  RETURN_NOT_OK_PREPEND(InitTabletMeta(schema, copy_peer_uuid),
+                        "Init tablet metadata failed");
+
+  RETURN_NOT_OK(dst_fs_manager_->dd_manager()->GetDataDirGroupPB(
+      tablet_id_, superblock_->mutable_data_dir_group()));
+
+  // Create the ConsensusMetadata before returning from Start() so that it's
+  // possible to vote while we are copying the replica for the first time.
+  RETURN_NOT_OK(WriteConsensusMetadata());
+  TRACE("Wrote new consensus metadata");
+
+  state_ = kStarted;
+  if (meta) {
+    *meta = meta_;
+  }
+  return Status::OK();
+}
+
+Status TabletCopyClient::InitTabletMeta(const Schema& schema, const string& 
copy_peer_uuid) {
   if (replace_tombstoned_tablet_) {
     // Also validate the term of the source peer, in case they are
     // different. This is a sanity check that protects us in case a bug or
@@ -384,8 +404,8 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
         "Could not replace superblock with COPYING data state");
     TRACE("Replaced tombstoned tablet metadata.");
 
-    
RETURN_NOT_OK_PREPEND(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_),
-        "Could not create a new directory group for tablet copy");
+    
RETURN_NOT_OK_PREPEND(dst_fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_),
+                          "Could not create a new directory group for tablet 
copy");
   } else {
     // HACK: Set the initial tombstoned last-logged OpId to 1.0 when copying a
     // replica for the first time, so that if the tablet copy fails, the
@@ -399,7 +419,8 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
                                           schema, &partition_schema));
 
     // Create the superblock on disk.
-    RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_,
+    RETURN_NOT_OK(TabletMetadata::CreateNew(
+        dst_fs_manager_, tablet_id_,
         superblock_->table_name(),
         superblock_->table_id(),
         schema,
@@ -421,18 +442,7 @@ Status TabletCopyClient::Start(const HostPort& 
copy_source_addr,
     // machine so we know there is state to clean up in case we fail.
     state_ = kStarting;
   }
-  CHECK_OK(fs_manager_->dd_manager()->GetDataDirGroupPB(
-      tablet_id_, superblock_->mutable_data_dir_group()));
-
-  // Create the ConsensusMetadata before returning from Start() so that it's
-  // possible to vote while we are copying the replica for the first time.
-  RETURN_NOT_OK(WriteConsensusMetadata());
-  TRACE("Wrote new consensus metadata");
 
-  state_ = kStarted;
-  if (meta) {
-    *meta = meta_;
-  }
   return Status::OK();
 }
 
@@ -486,11 +496,11 @@ Status TabletCopyClient::Finish() {
   RETURN_NOT_OK(meta_->ReplaceSuperBlock(*superblock_));
 
   if (FLAGS_tablet_copy_save_downloaded_metadata) {
-    string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
+    string meta_path = dst_fs_manager_->GetTabletMetadataPath(tablet_id_);
     string meta_copy_path = Substitute("$0.copy.$1$2", meta_path, 
start_time_micros_, kTmpInfix);
     WritableFileOptions opts;
     opts.is_sensitive = true;
-    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path, 
opts),
+    RETURN_NOT_OK_PREPEND(env_util::CopyFile(Env::Default(), meta_path, 
meta_copy_path, opts),
                           "Unable to make copy of tablet metadata");
   }
 
@@ -583,10 +593,10 @@ Status TabletCopyClient::DownloadWALs() {
 
   // Delete and recreate WAL dir if it already exists, to ensure stray files 
are
   // not kept from previous copies and runs.
-  RETURN_NOT_OK(log::Log::DeleteOnDiskData(fs_manager_, tablet_id_));
-  string path = fs_manager_->GetTabletWalDir(tablet_id_);
-  RETURN_NOT_OK(fs_manager_->env()->CreateDir(path));
-  RETURN_NOT_OK(fs_manager_->env()->SyncDir(DirName(path))); // fsync() parent 
dir.
+  RETURN_NOT_OK(log::Log::DeleteOnDiskData(dst_fs_manager_, tablet_id_));
+  string path = dst_fs_manager_->GetTabletWalDir(tablet_id_);
+  RETURN_NOT_OK(dst_fs_manager_->env()->CreateDir(path));
+  RETURN_NOT_OK(dst_fs_manager_->env()->SyncDir(DirName(path))); // fsync() 
parent dir.
 
   // Download the WAL segments.
   const int num_segments = wal_seqnos_.size();
@@ -763,17 +773,18 @@ Status TabletCopyClient::DownloadWAL(uint64_t 
wal_segment_seqno) {
   DataIdPB data_id;
   data_id.set_type(DataIdPB::LOG_SEGMENT);
   data_id.set_wal_segment_seqno(wal_segment_seqno);
-  string dest_path = fs_manager_->GetWalSegmentFileName(tablet_id_, 
wal_segment_seqno);
+  string dest_path = dst_fs_manager_->GetWalSegmentFileName(tablet_id_, 
wal_segment_seqno);
 
   WritableFileOptions opts;
   opts.sync_on_close = true;
   opts.is_sensitive = true;
   unique_ptr<WritableFile> writer;
-  RETURN_NOT_OK_PREPEND(fs_manager_->env()->NewWritableFile(opts, dest_path, 
&writer),
+  RETURN_NOT_OK_PREPEND(dst_fs_manager_->env()->NewWritableFile(opts, 
dest_path, &writer),
                         "Unable to open file for writing");
-  RETURN_NOT_OK_PREPEND(DownloadFile(data_id, writer.get()),
+  RETURN_NOT_OK_PREPEND(TransferFile(data_id, writer.get()),
                         Substitute("Unable to download WAL segment with seq. 
number $0",
                                    wal_segment_seqno));
+
   return Status::OK();
 }
 
@@ -793,11 +804,11 @@ Status TabletCopyClient::WriteConsensusMetadata() {
   RETURN_NOT_OK(cmeta_->Flush());
 
   if (FLAGS_tablet_copy_save_downloaded_metadata) {
-    string cmeta_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
+    string cmeta_path = dst_fs_manager_->GetConsensusMetadataPath(tablet_id_);
     string cmeta_copy_path = Substitute("$0.copy.$1$2", cmeta_path, 
start_time_micros_, kTmpInfix);
     WritableFileOptions opts;
     opts.is_sensitive = true;
-    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), cmeta_path, 
cmeta_copy_path, opts),
+    RETURN_NOT_OK_PREPEND(env_util::CopyFile(Env::Default(), cmeta_path, 
cmeta_copy_path, opts),
                           "Unable to make copy of consensus metadata");
   }
 
@@ -849,13 +860,13 @@ Status TabletCopyClient::DownloadBlock(const BlockId& 
old_block_id,
 
   unique_ptr<WritableBlock> block;
   // log_block_manager uses a lock to guarantee the block_id is unique.
-  RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(CreateBlockOptions({ 
tablet_id_ }), &block),
+  RETURN_NOT_OK_PREPEND(dst_fs_manager_->CreateNewBlock(CreateBlockOptions({ 
tablet_id_ }), &block),
                         "Unable to create new block");
 
   DataIdPB data_id;
   data_id.set_type(DataIdPB::BLOCK);
   old_block_id.CopyToPB(data_id.mutable_block_id());
-  RETURN_NOT_OK_PREPEND(DownloadFile(data_id, block.get()),
+  RETURN_NOT_OK_PREPEND(TransferFile(data_id, block.get()),
                         Substitute("Unable to download block $0",
                                    old_block_id.ToString()));
 
@@ -869,8 +880,8 @@ Status TabletCopyClient::DownloadBlock(const BlockId& 
old_block_id,
 }
 
 template<class Appendable>
-Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
-                                      Appendable* appendable) {
+Status RemoteTabletCopyClient::DownloadFile(const DataIdPB& data_id,
+                                            Appendable* appendable) {
   uint64_t offset = 0;
   rpc::RpcController controller;
   
controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
@@ -886,7 +897,7 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& 
data_id,
     // Request the next data chunk.
     FetchDataResponsePB resp;
     RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
-          return proxy_->FetchData(req, &resp, &controller);
+        return proxy_->FetchData(req, &resp, &controller);
     }), "unable to fetch data from remote");
 
     // Sanity-check for corruption.
@@ -906,8 +917,8 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& 
data_id,
     auto chunk_size = resp.chunk().data().size();
     done = offset + chunk_size == resp.chunk().total_data_length();
     offset += chunk_size;
-    if (tablet_copy_metrics_) {
-      tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size);
+    if (dst_tablet_copy_metrics_) {
+      dst_tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size);
     }
   }
 
@@ -939,11 +950,11 @@ Status TabletCopyClient::VerifyData(uint64_t offset, 
const DataChunkPB& chunk) {
 
 string TabletCopyClient::LogPrefix() {
   return Substitute("T $0 P $1: tablet copy: ",
-                    tablet_id_, fs_manager_->uuid());
+                    tablet_id_, dst_fs_manager_->uuid());
 }
 
 Status TabletCopyClient::CheckHealthyDirGroup() const {
-  if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_)) {
+  if (dst_fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_)) {
     return Status::IOError(
         Substitute("Tablet $0 is in a failed directory", tablet_id_));
   }
@@ -979,5 +990,172 @@ Status 
TabletCopyClient::SendRpcWithRetry(rpc::RpcController* controller, F f) {
   }
 }
 
+RemoteTabletCopyClient::RemoteTabletCopyClient(
+    string tablet_id,
+    FsManager* dst_fs_manager,
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+    shared_ptr<Messenger> messenger,
+    TabletCopyClientMetrics* dst_tablet_copy_metrics)
+    : TabletCopyClient(std::move(tablet_id),
+                       dst_fs_manager,
+                       std::move(cmeta_manager),
+                       std::move(messenger),
+                       dst_tablet_copy_metrics) {
+}
+
+RemoteTabletCopyClient::~RemoteTabletCopyClient() {
+  // Note: Ending the tablet copy session releases anchors on the remote.
+  WARN_NOT_OK(EndRemoteSession(),
+              Substitute("$0Unable to close tablet copy session", 
LogPrefix()));
+}
+
+Status RemoteTabletCopyClient::TransferFile(const DataIdPB& data_id, 
WritableBlock* appendable) {
+  return DownloadFile(data_id, appendable);
+}
+
+Status RemoteTabletCopyClient::TransferFile(const DataIdPB& data_id, 
WritableFile* appendable) {
+  return DownloadFile(data_id, appendable);
+}
+
+LocalTabletCopyClient::LocalTabletCopyClient(
+    string tablet_id,
+    FsManager* dst_fs_manager,
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+    shared_ptr<Messenger> messenger,
+    TabletCopyClientMetrics* dst_tablet_copy_metrics,
+    FsManager* src_fs_manager,
+    TabletCopySourceMetrics* src_tablet_copy_metrics)
+    : TabletCopyClient(std::move(tablet_id),
+                       dst_fs_manager,
+                       std::move(cmeta_manager),
+                       std::move(messenger),
+                       dst_tablet_copy_metrics),
+      src_fs_manager_(src_fs_manager),
+      src_tablet_copy_metrics_(src_tablet_copy_metrics) {
+}
+
+Status LocalTabletCopyClient::Start(const string& tablet_id,
+                                    scoped_refptr<TabletMetadata>* meta) {
+  CHECK_EQ(kInitialized, state_);
+  start_time_micros_ = GetCurrentTimeMicros();
+
+  LOG_WITH_PREFIX(INFO) <<
+      Substitute("Beginning tablet copy from WAL: $0, DATA: $1",
+                 src_fs_manager_->GetWalsRootDir(),
+                 JoinMapped(src_fs_manager_->GetDataRootDirs(),
+                     [](const string& dir) { return dir; }, ","));
+
+  TRACE("Tablet copy session begun");
+
+  // Create LocalTabletCopySourceSession.
+  session_.reset(new LocalTabletCopySourceSession(
+      tablet_id, src_fs_manager_, src_tablet_copy_metrics_));
+  RETURN_NOT_OK(session_->Init());
+  TabletDataState data_state = 
session_->tablet_superblock().tablet_data_state();
+  if (data_state != tablet::TABLET_DATA_READY) {
+    Status s = Status::IllegalState(Substitute("Source tablet is not ready 
itself! state: $0",
+                                               
TabletDataState_Name(data_state)));
+    LOG_WITH_PREFIX(WARNING) << s.ToString();
+    return s;
+  }
+
+  session_id_ = tablet_id;
+  session_idle_timeout_millis_ = 0;
+
+  // Store a copy of the source superblock.
+  remote_superblock_.reset(new 
TabletSuperBlockPB(session_->tablet_superblock()));
+
+  // Make a copy of the source superblock. We first clear out the blocks
+  // from this structure and then add them back in as they are downloaded.
+  superblock_.reset(new TabletSuperBlockPB(*remote_superblock_));
+
+  // The block ids (in active rowsets as well as from orphaned blocks) on the
+  // source have no meaning to us and could cause data loss if accidentally
+  // deleted locally. We must clear them all.
+  superblock_->clear_rowsets();
+  superblock_->clear_orphaned_blocks();
+
+  // The UUIDs within the DataDirGroupPB on the source are also unique to the
+  // source and have no meaning to us.
+  superblock_->clear_data_dir_group();
+
+  // Set the data state to COPYING to indicate that, on crash, this replica
+  // should be discarded.
+  superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING);
+
+  wal_seqnos_.clear();
+  for (const auto& segment : session_->log_segments()) {
+    wal_seqnos_.push_back(segment->header().sequence_number());
+  }
+
+  remote_cstate_.reset(new 
consensus::ConsensusStatePB(session_->initial_cstate()));
+
+  Schema schema;
+  RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
+                        "Cannot deserialize schema from source superblock");
+
+  RETURN_NOT_OK_PREPEND(InitTabletMeta(schema, src_fs_manager_->uuid()),
+                        "Init tablet metadata failed");
+
+  RETURN_NOT_OK(dst_fs_manager_->dd_manager()->GetDataDirGroupPB(
+      tablet_id_, superblock_->mutable_data_dir_group()));
+
+  // Create the ConsensusMetadata before returning from Start() so that it's
+  // possible to vote while we are copying the replica for the first time.
+  RETURN_NOT_OK(WriteConsensusMetadata());
+  TRACE("Wrote new consensus metadata");
+
+  state_ = kStarted;
+  if (meta) {
+    *meta = meta_;
+  }
+  return Status::OK();
+}
+
+Status LocalTabletCopyClient::TransferFile(const DataIdPB& data_id, 
WritableBlock* appendable) {
+  return CopyFile(data_id, appendable);
+}
+
+Status LocalTabletCopyClient::TransferFile(const DataIdPB& data_id, 
WritableFile* appendable) {
+  return CopyFile(data_id, appendable);
+}
+
+template<class Appendable>
+Status LocalTabletCopyClient::CopyFile(const DataIdPB& data_id, Appendable* 
appendable) {
+  uint64_t offset = 0;
+  int64_t client_maxlen = FLAGS_tablet_copy_transfer_chunk_size_bytes;
+
+  TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
+  RETURN_NOT_OK(TabletCopyServiceImpl::ValidateFetchRequestDataId(data_id, 
&error_code));
+  int64_t total_data_length = 0;
+  bool done = false;
+  while (!done) {
+    string data;
+    if (data_id.type() == DataIdPB::BLOCK) {
+      // Fetching a data block chunk.
+      const BlockId& block_id = BlockId::FromPB(data_id.block_id());
+      RETURN_NOT_OK(session_->GetBlockPiece(
+          block_id, offset, client_maxlen, &data, &total_data_length, 
&error_code));
+    } else {
+      // Fetching a log segment chunk.
+      uint64_t segment_seqno = data_id.wal_segment_seqno();
+      RETURN_NOT_OK(session_->GetLogSegmentPiece(
+          segment_seqno, offset, client_maxlen, &data, &total_data_length, 
&error_code));
+    }
+
+    // Write the data.
+    RETURN_NOT_OK(appendable->Append(data));
+
+    auto chunk_size = data.size();
+    offset += chunk_size;
+    done = (offset == total_data_length);
+    if (dst_tablet_copy_metrics_) {
+      dst_tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size);
+    }
+  }
+
+  return Status::OK();
+}
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_client.h 
b/src/kudu/tserver/tablet_copy_client.h
index 3452d6ee0..127334cba 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -26,6 +26,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/tserver/tablet_copy_source_session.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
@@ -37,7 +38,9 @@ class BlockId;
 class BlockIdPB;
 class FsManager;
 class HostPort;
+class Schema;
 class ThreadPool;
+class WritableFile;
 
 namespace consensus {
 class ConsensusMetadata;
@@ -47,6 +50,7 @@ class ConsensusStatePB;
 
 namespace fs {
 class BlockCreationTransaction;
+class WritableBlock;
 } // namespace fs
 
 namespace rpc {
@@ -79,19 +83,9 @@ struct TabletCopyClientMetrics {
 //
 class TabletCopyClient {
  public:
-
-  // Construct the tablet copy client.
-  //
-  // Objects behind raw pointers must remain valid until this object is 
destroyed.
-  TabletCopyClient(std::string tablet_id,
-                   FsManager* fs_manager,
-                   scoped_refptr<consensus::ConsensusMetadataManager> 
cmeta_manager,
-                   std::shared_ptr<rpc::Messenger> messenger,
-                   TabletCopyClientMetrics* tablet_copy_metrics);
-
   // Attempt to clean up resources on the remote end by sending an
   // EndTabletCopySession() RPC
-  ~TabletCopyClient();
+  virtual ~TabletCopyClient();
 
   // Pass in the existing metadata for a tombstoned tablet, which will be
   // replaced if validation checks pass in Start().
@@ -112,8 +106,16 @@ class TabletCopyClient {
   //
   // Upon success, tablet metadata will be created and the tablet will be
   // assigned to a data directory group.
-  Status Start(const HostPort& copy_source_addr,
-               scoped_refptr<tablet::TabletMetadata>* meta);
+  virtual Status Start(const HostPort& /*copy_source_addr*/,
+                       scoped_refptr<tablet::TabletMetadata>* /*meta*/) {
+    return Status::NotSupported("copy from remote peer not supported");
+  }
+
+  // Similar to above, but copy from local filesystem.
+  virtual Status Start(const std::string& /*tablet_id*/,
+                       scoped_refptr<tablet::TabletMetadata>* /*meta*/) {
+    return Status::NotSupported("copy from local filesystem not supported");
+  }
 
   // Runs a "full" tablet copy, copying the physical layout of a tablet
   // from the leader of the specified consensus configuration.
@@ -130,18 +132,27 @@ class TabletCopyClient {
   // WALs downloaded so far. Does nothing if called after Finish().
   Status Abort();
 
- private:
-  FRIEND_TEST(TabletCopyClientTest, TestNoBlocksAtStart);
-  FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock);
-  FRIEND_TEST(TabletCopyClientTest, TestLifeCycle);
-  FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadBlockMayFail);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadWalMayFail);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
+ protected:
+  FRIEND_TEST(TabletCopyClientBasicTest, TestNoBlocksAtStart);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestBeginEndSession);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadBlock);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestLifeCycle);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestVerifyData);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadBlockMayFail);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadWalMayFail);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadWalSegment);
+  FRIEND_TEST(TabletCopyClientBasicTest, TestDownloadAllBlocks);
   FRIEND_TEST(TabletCopyClientAbortTest, TestAbort);
 
+  // Construct the tablet copy client.
+  //
+  // Objects behind raw pointers must remain valid until this object is 
destroyed.
+  TabletCopyClient(std::string tablet_id,
+                   FsManager* dst_fs_manager,
+                   scoped_refptr<consensus::ConsensusMetadataManager> 
cmeta_manager,
+                   std::shared_ptr<rpc::Messenger> messenger,
+                   TabletCopyClientMetrics* dst_tablet_copy_metrics);
+
   // State machine that guides the progression of a single tablet copy.
   // A tablet copy will go through the states:
   //
@@ -181,6 +192,8 @@ class TabletCopyClient {
   // End the tablet copy session.
   Status EndRemoteSession();
 
+  Status InitTabletMeta(const Schema& schema, const std::string& 
copy_peer_uuid);
+
   // Download all WAL files in parallel.
   Status DownloadWALs();
 
@@ -244,15 +257,8 @@ class TabletCopyClient {
   Status DownloadBlock(const BlockId& old_block_id,
                        BlockId* new_block_id);
 
-  // Download a single remote file. The block and WAL implementations delegate
-  // to this method when downloading files.
-  //
-  // An Appendable is typically a WritableBlock (block) or WritableFile (WAL).
-  //
-  // Only used in one compilation unit, otherwise the implementation would
-  // need to be in the header.
-  template<class Appendable>
-  Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
+  virtual Status TransferFile(const DataIdPB& data_id, fs::WritableBlock* 
appendable) = 0;
+  virtual Status TransferFile(const DataIdPB& data_id, WritableFile* 
appendable) = 0;
 
   Status VerifyData(uint64_t offset, const DataChunkPB& chunk);
 
@@ -266,7 +272,7 @@ class TabletCopyClient {
 
   // Set-once members.
   const std::string tablet_id_;
-  FsManager* const fs_manager_;
+  FsManager* const dst_fs_manager_;
   const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
   const std::shared_ptr<rpc::Messenger> messenger_;
 
@@ -296,7 +302,7 @@ class TabletCopyClient {
 
   Random rng_;
 
-  TabletCopyClientMetrics* tablet_copy_metrics_;
+  TabletCopyClientMetrics* dst_tablet_copy_metrics_;
 
   // Block transaction for the tablet copy.
   std::unique_ptr<fs::BlockCreationTransaction> transaction_;
@@ -311,5 +317,61 @@ class TabletCopyClient {
   DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
 };
 
+class RemoteTabletCopyClient : public TabletCopyClient {
+ public:
+  RemoteTabletCopyClient(
+      std::string tablet_id,
+      FsManager* dst_fs_manager,
+      scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
+      std::shared_ptr<rpc::Messenger> messenger,
+      TabletCopyClientMetrics* dst_tablet_copy_metrics);
+
+  ~RemoteTabletCopyClient() override;
+
+  Status Start(const HostPort& copy_source_addr,
+               scoped_refptr<tablet::TabletMetadata>* meta) override;
+
+ private:
+  Status TransferFile(const DataIdPB& data_id, fs::WritableBlock* appendable) 
override;
+  Status TransferFile(const DataIdPB& data_id, WritableFile* appendable) 
override;
+
+  // Download a single remote file. The block and WAL implementations delegate
+  // to this method when downloading files.
+  //
+  // An Appendable is typically a fs::WritableBlock (block) or WritableFile 
(WAL).
+  //
+  // Only used in one compilation unit, otherwise the implementation would
+  // need to be in the header.
+  template<class Appendable>
+  Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
+};
+
+class LocalTabletCopyClient : public TabletCopyClient {
+ public:
+  LocalTabletCopyClient(
+      std::string tablet_id,
+      FsManager* dst_fs_manager,
+      scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
+      std::shared_ptr<rpc::Messenger> messenger,
+      TabletCopyClientMetrics* dst_tablet_copy_metrics,
+      FsManager* src_fs_manager,
+      TabletCopySourceMetrics* src_tablet_copy_metrics);
+
+  Status Start(const std::string& tablet_id,
+               scoped_refptr<tablet::TabletMetadata>* meta) override;
+
+ private:
+  Status TransferFile(const DataIdPB& data_id, fs::WritableBlock* appendable) 
override;
+  Status TransferFile(const DataIdPB& data_id, WritableFile* appendable) 
override;
+
+  // Similar to RemoteTabletCopyClient::DownloadFile, but copy file from local 
filesystem.
+  template<class Appendable>
+  Status CopyFile(const DataIdPB& data_id, Appendable* appendable);
+
+  FsManager* const src_fs_manager_;
+  TabletCopySourceMetrics* src_tablet_copy_metrics_;
+  scoped_refptr<TabletCopySourceSession> session_;
+};
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_service.cc 
b/src/kudu/tserver/tablet_copy_service.cc
index 68fc2b249..540569bd2 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -116,7 +116,8 @@ TabletCopyServiceImpl::TabletCopyServiceImpl(
                           &session_expiration_thread_));
 }
 
-TabletCopyServiceImpl::SessionEntry::SessionEntry(scoped_refptr<TabletCopySourceSession>
 session_in)
+TabletCopyServiceImpl::SessionEntry::SessionEntry(
+    scoped_refptr<RemoteTabletCopySourceSession> session_in)
     : session(std::move(session_in)),
       last_accessed_time(MonoTime::Now()),
       
expire_timeout(MonoDelta::FromSeconds(FLAGS_tablet_copy_idle_timeout_sec)) {
@@ -149,7 +150,7 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
                     Substitute("Unable to find specified tablet: $0", 
tablet_id),
                     context);
 
-  scoped_refptr<TabletCopySourceSession> session;
+  scoped_refptr<RemoteTabletCopySourceSession> session;
   bool new_session;
   {
     MutexLock l(sessions_lock_);
@@ -160,9 +161,9 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
           "Beginning new tablet copy session on tablet $0 from peer $1"
           " at $2: session id = $3",
           tablet_id, requestor_uuid, context->requestor_string(), session_id);
-      session.reset(new TabletCopySourceSession(tablet_replica, session_id,
-                                                requestor_uuid, fs_manager_,
-                                                &tablet_copy_metrics_));
+      session.reset(new RemoteTabletCopySourceSession(tablet_replica, 
session_id,
+                                                      requestor_uuid, 
fs_manager_,
+                                                      &tablet_copy_metrics_));
       InsertOrDie(&sessions_, session_id, SessionEntry(session));
     } else {
       session = session_entry->session;
@@ -233,7 +234,7 @@ void TabletCopyServiceImpl::CheckSessionActive(
   const string& session_id = req->session_id();
 
   // Look up and validate tablet copy session.
-  scoped_refptr<TabletCopySourceSession> session;
+  scoped_refptr<RemoteTabletCopySourceSession> session;
   MutexLock l(sessions_lock_);
   TabletCopyErrorPB::Code app_error;
   Status status = FindSessionUnlocked(session_id, &app_error, &session);
@@ -261,10 +262,10 @@ void TabletCopyServiceImpl::FetchData(const 
FetchDataRequestPB* req,
   const string& session_id = req->session_id();
 
   // Look up and validate tablet copy session.
-  scoped_refptr<TabletCopySourceSession> session;
+  scoped_refptr<RemoteTabletCopySourceSession> session;
   {
     MutexLock l(sessions_lock_);
-    TabletCopyErrorPB::Code app_error;
+    TabletCopyErrorPB::Code app_error = TabletCopyErrorPB::UNKNOWN_ERROR;
     RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
                       app_error, "No such session", context);
     ResetSessionExpirationUnlocked(session_id);
@@ -324,7 +325,7 @@ void TabletCopyServiceImpl::EndTabletCopySession(
         rpc::RpcContext* context) {
   {
     MutexLock l(sessions_lock_);
-    TabletCopyErrorPB::Code app_error;
+    TabletCopyErrorPB::Code app_error = TabletCopyErrorPB::UNKNOWN_ERROR;
     LOG_WITH_PREFIX(INFO) << "Request end of tablet copy session " << 
req->session_id()
                           << " received from " << context->requestor_string();
     RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), 
&app_error),
@@ -356,7 +357,7 @@ void TabletCopyServiceImpl::Shutdown() {
 Status TabletCopyServiceImpl::FindSessionUnlocked(
         const string& session_id,
         TabletCopyErrorPB::Code* app_error,
-        scoped_refptr<TabletCopySourceSession>* session) const {
+        scoped_refptr<RemoteTabletCopySourceSession>* session) const {
   const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
   if (!session_entry) {
     *app_error = TabletCopyErrorPB::NO_SESSION;
@@ -408,7 +409,7 @@ Status 
TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
         const std::string& session_id,
         TabletCopyErrorPB::Code* app_error) {
   sessions_lock_.AssertAcquired();
-  scoped_refptr<TabletCopySourceSession> session;
+  scoped_refptr<RemoteTabletCopySourceSession> session;
   RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
   // Remove the session from the map.
   // It will get destroyed once there are no outstanding refs.
diff --git a/src/kudu/tserver/tablet_copy_service.h 
b/src/kudu/tserver/tablet_copy_service.h
index 4882f7926..44ec596e2 100644
--- a/src/kudu/tserver/tablet_copy_service.h
+++ b/src/kudu/tserver/tablet_copy_service.h
@@ -80,11 +80,15 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf {
 
   void Shutdown() override;
 
+  // Validate the data identifier in a FetchData request.
+  static Status ValidateFetchRequestDataId(const DataIdPB& data_id,
+                                           TabletCopyErrorPB::Code* app_error);
+
  private:
   struct SessionEntry {
-    explicit SessionEntry(scoped_refptr<TabletCopySourceSession> session_in);
+    explicit SessionEntry(scoped_refptr<RemoteTabletCopySourceSession> 
session_in);
 
-    scoped_refptr<TabletCopySourceSession> session;
+    scoped_refptr<RemoteTabletCopySourceSession> session;
     MonoTime last_accessed_time; // Time this session was last accessed.
     MonoDelta expire_timeout;
   };
@@ -94,11 +98,7 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf {
   // Look up session in session map.
   Status FindSessionUnlocked(const std::string& session_id,
                              TabletCopyErrorPB::Code* app_error,
-                             scoped_refptr<TabletCopySourceSession>* session) 
const;
-
-  // Validate the data identifier in a FetchData request.
-  static Status ValidateFetchRequestDataId(const DataIdPB& data_id,
-                                           TabletCopyErrorPB::Code* app_error);
+                             scoped_refptr<RemoteTabletCopySourceSession>* 
session) const;
 
   // Take note of session activity; Re-update the session timeout deadline.
   void ResetSessionExpirationUnlocked(const std::string& session_id);
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc 
b/src/kudu/tserver/tablet_copy_source_session-test.cc
index bf57e4549..03811dce0 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -106,7 +106,8 @@ using tablet::TabletReplica;
 using tablet::TabletSuperBlockPB;
 using tablet::WriteOpState;
 
-class TabletCopyTest : public KuduTabletTest {
+class TabletCopyTest : public KuduTabletTest,
+                       public ::testing::WithParamInterface<TabletCopyMode> {
  public:
   TabletCopyTest()
       : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
@@ -231,8 +232,14 @@ class TabletCopyTest : public KuduTabletTest {
   }
 
   void InitSession() {
-    session_.reset(new TabletCopySourceSession(tablet_replica_.get(), 
"TestSession", "FakeUUID",
-                   fs_manager(), nullptr /* no metrics */));
+    if (GetParam() == TabletCopyMode::REMOTE) {
+      session_.reset(new RemoteTabletCopySourceSession(
+          tablet_replica_.get(), "TestSession", "FakeUUID",
+          fs_manager(), nullptr /* no metrics */));
+    } else {
+      session_.reset(new LocalTabletCopySourceSession(
+          tablet_replica_->tablet_id(), fs_manager(), nullptr /* no metrics 
*/));
+    }
     ASSERT_OK(session_->Init());
   }
 
@@ -276,9 +283,13 @@ class TabletCopyTest : public KuduTabletTest {
   scoped_refptr<TabletCopySourceSession> session_;
 };
 
+INSTANTIATE_TEST_SUITE_P(TabletCopyTestModes, TabletCopyTest,
+                         testing::Values(TabletCopyMode::REMOTE,
+                                         TabletCopyMode::LOCAL));
+
 // Ensure that the serialized SuperBlock included in the 
TabletCopySourceSession is
 // equal to the serialized live superblock (on a quiesced tablet).
-TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
+TEST_P(TabletCopyTest, TestSuperBlocksEqual) {
   // Compare content of superblocks.
   faststring session_buf;
   faststring tablet_buf;
@@ -307,7 +318,7 @@ TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
 
 // Test fetching all files from tablet server, ensure the checksums for each
 // chunk and the total file sizes match.
-TEST_F(TabletCopyTest, TestBlocksEqual) {
+TEST_P(TabletCopyTest, TestBlocksEqual) {
   TabletSuperBlockPB tablet_superblock;
   ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
   for (int i = 0; i < tablet_superblock.rowsets_size(); i++) {
@@ -351,7 +362,7 @@ TEST_F(TabletCopyTest, TestBlocksEqual) {
 
 // Ensure that blocks are still readable through the open session even
 // after they've been deleted.
-TEST_F(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
+TEST_P(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
   TabletSuperBlockPB tablet_superblock;
   ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
 
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc 
b/src/kudu/tserver/tablet_copy_source_session.cc
index 999157da8..1db3069e0 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -26,8 +26,11 @@
 #include <gflags/gflags.h>
 
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log.pb.h"
+#include "kudu/consensus/log_index.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
@@ -95,12 +98,10 @@ TabletCopySourceMetrics::TabletCopySourceMetrics(const 
scoped_refptr<MetricEntit
 }
 
 TabletCopySourceSession::TabletCopySourceSession(
-    scoped_refptr<TabletReplica> tablet_replica, std::string session_id,
-    std::string requestor_uuid, FsManager* fs_manager,
+    string tablet_id,
+    FsManager* fs_manager,
     TabletCopySourceMetrics* tablet_copy_metrics)
-    : tablet_replica_(std::move(tablet_replica)),
-      session_id_(std::move(session_id)),
-      requestor_uuid_(std::move(requestor_uuid)),
+    : tablet_id_(std::move(tablet_id)),
       fs_manager_(fs_manager),
       blocks_deleter_(&blocks_),
       logs_deleter_(&logs_),
@@ -111,8 +112,6 @@ TabletCopySourceSession::TabletCopySourceSession(
 }
 
 TabletCopySourceSession::~TabletCopySourceSession() {
-  // No lock taken in the destructor, should only be 1 thread with access now.
-  CHECK_OK(UnregisterAnchorIfNeededUnlocked());
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_source_sessions->IncrementBy(-1);
   }
@@ -122,7 +121,7 @@ Status TabletCopySourceSession::Init() {
   return init_once_.Init(&TabletCopySourceSession::InitOnce, this);
 }
 
-Status TabletCopySourceSession::InitOnce() {
+Status RemoteTabletCopySourceSession::InitOnce() {
   // Inject latency during Init() for testing purposes.
   if (PREDICT_FALSE(FLAGS_tablet_copy_session_inject_latency_on_init_ms > 0)) {
     TRACE("Injecting $0ms of latency due to 
--tablet_copy_session_inject_latency_on_init_ms",
@@ -133,8 +132,6 @@ Status TabletCopySourceSession::InitOnce() {
   RETURN_NOT_OK(tablet_replica_->CheckRunning());
   RETURN_NOT_OK(CheckHealthyDirGroup());
 
-  const string& tablet_id = tablet_replica_->tablet_id();
-
   // Prevent log GC while we grab log segments and Tablet metadata.
   string anchor_owner_token = Substitute("TabletCopy-$0", session_id_);
   tablet_replica_->log_anchor_registry()->Register(
@@ -144,7 +141,7 @@ Status TabletCopySourceSession::InitOnce() {
   const scoped_refptr<TabletMetadata>& metadata = 
tablet_replica_->tablet_metadata();
   RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_),
                         Substitute("Unable to access superblock for tablet $0",
-                                   tablet_id));
+                                   tablet_id_));
 
   // Anchor the data blocks by opening them and adding them to the cache.
   //
@@ -171,7 +168,7 @@ Status TabletCopySourceSession::InitOnce() {
     return Status::IllegalState(Substitute(
         "Unable to initialize tablet copy session for tablet $0. "
         "Log reader is not available. Tablet state: $1 ($2)",
-        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
+        tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state));
   }
   reader->GetSegmentsSnapshot(&log_segments_);
   for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
@@ -188,7 +185,7 @@ Status TabletCopySourceSession::InitOnce() {
     return Status::IllegalState(Substitute(
         "Unable to initialize tablet copy session for tablet $0. "
         "Raft Consensus is not available. Tablet state: $1 ($2)",
-        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
+        tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state));
   }
   RETURN_NOT_OK_PREPEND(consensus->ConsensusState(&initial_cstate_),
                         "Consensus state not available");
@@ -203,18 +200,10 @@ Status TabletCopySourceSession::InitOnce() {
 
   LOG(INFO) << Substitute(
       "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
-      tablet_id, consensus->peer_uuid(), data_blocks.size(), 
log_segments_.size());
+      tablet_id_, consensus->peer_uuid(), data_blocks.size(), 
log_segments_.size());
   return Status::OK();
 }
 
-const std::string& TabletCopySourceSession::tablet_id() const {
-  return tablet_replica_->tablet_id();
-}
-
-const std::string& TabletCopySourceSession::requestor_uuid() const {
-  return requestor_uuid_;
-}
-
 // Determine the length of the data chunk to return to the client.
 static int64_t DetermineReadLength(int64_t bytes_remaining, int64_t 
requested_len) {
   // Overhead in the RPC for things like headers, protobuf data, etc.
@@ -323,13 +312,14 @@ Status TabletCopySourceSession::GetBlockPiece(const 
BlockId& block_id,
 
 Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                    uint64_t offset, int64_t 
client_maxlen,
-                                                   std::string* data, int64_t* 
log_file_size,
+                                                   string* data, int64_t* 
log_file_size,
                                                    TabletCopyErrorPB::Code* 
error_code) {
   DCHECK(init_once_.init_succeeded());
   RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
                         "Tablet copy source could not get log segment");
-  ImmutableRWFileInfo* file_info;
+  ImmutableRWFileInfo* file_info = nullptr;
   RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
+  CHECK(file_info);
   const auto& kHeaderSize = file_info->readable->GetEncryptionHeaderSize();
   // To make sure unencrypted tservers can copy tablets from encrypted tservers
   // and vice versa (which is a valid scenario when encrypting an existing
@@ -371,12 +361,12 @@ static Status AddImmutableFileToMap(Collection* const 
cache,
 }
 
 Status TabletCopySourceSession::CheckHealthyDirGroup(TabletCopyErrorPB::Code* 
error_code) const {
-  if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id())) {
+  if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_)) {
     if (error_code) {
       *error_code = TabletCopyErrorPB::IO_ERROR;
     }
     return Status::IOError(
-        Substitute("Tablet $0 is in a failed directory", tablet_id()));
+        Substitute("Tablet $0 is in a failed directory", tablet_id_));
   }
   return Status::OK();
 }
@@ -452,9 +442,85 @@ Status TabletCopySourceSession::FindLogSegment(uint64_t 
segment_seqno,
   return Status::OK();
 }
 
-Status TabletCopySourceSession::UnregisterAnchorIfNeededUnlocked() {
+RemoteTabletCopySourceSession::RemoteTabletCopySourceSession(
+    scoped_refptr<TabletReplica> tablet_replica,
+    string session_id,
+    string requestor_uuid,
+    FsManager* fs_manager,
+    TabletCopySourceMetrics* tablet_copy_metrics)
+    : TabletCopySourceSession(tablet_replica->tablet_id(),
+                              fs_manager,
+                              tablet_copy_metrics),
+      tablet_replica_(std::move(tablet_replica)),
+      session_id_(std::move(session_id)),
+      requestor_uuid_(std::move(requestor_uuid)) {
+}
+
+RemoteTabletCopySourceSession::~RemoteTabletCopySourceSession() {
+  // No lock taken in the destructor, should only be 1 thread with access now.
+  CHECK_OK(UnregisterAnchorIfNeededUnlocked());
+}
+
+Status RemoteTabletCopySourceSession::UnregisterAnchorIfNeededUnlocked() {
   return 
tablet_replica_->log_anchor_registry()->UnregisterIfAnchored(&log_anchor_);
 }
 
+LocalTabletCopySourceSession::LocalTabletCopySourceSession(
+    string tablet_id,
+    FsManager* fs_manager,
+    TabletCopySourceMetrics* tablet_copy_metrics)
+    : TabletCopySourceSession(std::move(tablet_id),
+                              fs_manager,
+                              tablet_copy_metrics) {
+}
+
+Status LocalTabletCopySourceSession::InitOnce() {
+  RETURN_NOT_OK(CheckHealthyDirGroup());
+
+  // Read the SuperBlock from disk.
+  string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
+  RETURN_NOT_OK_PREPEND(
+      pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, 
&tablet_superblock_,
+                                       pb_util::SENSITIVE),
+      Substitute("Unable to access superblock for tablet $0", tablet_id_));
+
+  // Open the data blocks and add them to the cache.
+  // NOTE: Since the server is stopped, it's not needed to anchor either
+  // data blocks or log segments.
+  //
+  // All subsequent requests should reuse the opened blocks.
+  vector<BlockIdPB> data_blocks =
+      TabletMetadata::CollectBlockIdPBs(tablet_superblock_);
+  for (const BlockIdPB& block_id : data_blocks) {
+    VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_id);
+    RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_id)));
+  }
+
+  // Get the current segments from the log.
+  shared_ptr<log::LogReader> reader;
+  RETURN_NOT_OK(log::LogReader::Open(fs_manager_,
+                                     /*index*/nullptr,
+                                     tablet_id_,
+                                     /*metric_entity*/nullptr,
+                                     /*file_cache*/nullptr,
+                                     &reader));
+  reader->GetSegmentsSnapshot(&log_segments_);
+  for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
+    RETURN_NOT_OK(OpenLogSegment(segment->header().sequence_number()));
+  }
+
+  // Load the current consensus state.
+  scoped_refptr<consensus::ConsensusMetadataManager> cmeta_mgr(
+      new consensus::ConsensusMetadataManager(fs_manager_));
+  scoped_refptr<consensus::ConsensusMetadata> cmeta;
+  RETURN_NOT_OK(cmeta_mgr->Load(tablet_id_, &cmeta));
+  initial_cstate_ = cmeta->ToConsensusStatePB();
+
+  LOG(INFO) << Substitute(
+      "T $0: Tablet Copy: opened $1 blocks and $2 log segments",
+      tablet_id_, data_blocks.size(), log_segments_.size());
+  return Status::OK();
+}
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_source_session.h 
b/src/kudu/tserver/tablet_copy_source_session.h
index b3e9ef392..0e8439168 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -91,18 +91,17 @@ struct ImmutableReadableBlockInfo {
   }
 };
 
+enum class TabletCopyMode {
+  REMOTE = 0,
+  LOCAL = 1
+};
+
 // A potential Learner must establish a TabletCopySourceSession with the 
leader in order
 // to fetch the needed superblock, blocks, and log segments.
 // This class is refcounted to make it easy to remove it from the session map
 // on expiration while it is in use by another thread.
 class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSession> {
  public:
-  TabletCopySourceSession(scoped_refptr<tablet::TabletReplica> tablet_replica,
-                          std::string session_id,
-                          std::string requestor_uuid,
-                          FsManager* fs_manager,
-                          TabletCopySourceMetrics* tablet_copy_metrics);
-
   // Initialize the session, including anchoring files (TODO) and fetching the
   // tablet superblock and list of WAL segments.
   //
@@ -115,13 +114,7 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
   }
 
   // Return ID of tablet corresponding to this session.
-  const std::string& tablet_id() const;
-
-  // Return ID of session.
-  const std::string& session_id() const { return session_id_; }
-
-  // Return UUID of the requestor that initiated this session.
-  const std::string& requestor_uuid() const;
+  const std::string& tablet_id() const { return tablet_id_; }
 
   // Open block for reading, if it's not already open, and read some of it.
   // If maxlen is 0, we use a system-selected length for the data piece.
@@ -162,7 +155,11 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
   // Check if a block is currently open.
   bool IsBlockOpenForTests(const BlockId& block_id) const;
 
- private:
+ protected:
+  TabletCopySourceSession(std::string tablet_id,
+                          FsManager* fs_manager,
+                          TabletCopySourceMetrics* tablet_copy_metrics);
+
   friend class RefCountedThreadSafe<TabletCopySourceSession>;
 
   typedef std::unordered_map<
@@ -172,10 +169,10 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
       BlockIdEqual> BlockMap;
   typedef std::unordered_map<uint64_t, ImmutableRWFileInfo*> LogMap;
 
-  ~TabletCopySourceSession();
+  virtual ~TabletCopySourceSession();
 
   // Internal helper method for Init().
-  Status InitOnce();
+  virtual Status InitOnce() = 0;
 
   // Returns an error if any directories in the tablet's directory group are
   // unhealthy and sets 'error_code' appropriately.
@@ -197,12 +194,7 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
                         ImmutableRWFileInfo** file_info,
                         TabletCopyErrorPB::Code* error_code);
 
-  // Unregister log anchor, if it's registered.
-  Status UnregisterAnchorIfNeededUnlocked();
-
-  const scoped_refptr<tablet::TabletReplica> tablet_replica_;
-  const std::string session_id_;
-  const std::string requestor_uuid_;
+  const std::string tablet_id_;
   FsManager* const fs_manager_;
 
   // Protects concurrent access to Init().
@@ -224,5 +216,44 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
   DISALLOW_COPY_AND_ASSIGN(TabletCopySourceSession);
 };
 
+// Copy replica from remote peer.
+class RemoteTabletCopySourceSession : public TabletCopySourceSession {
+ public:
+  RemoteTabletCopySourceSession(scoped_refptr<tablet::TabletReplica> 
tablet_replica,
+                                std::string session_id,
+                                std::string requestor_uuid,
+                                FsManager* fs_manager,
+                                TabletCopySourceMetrics* tablet_copy_metrics);
+
+  ~RemoteTabletCopySourceSession() override;
+
+  // Return ID of session.
+  const std::string& session_id() const { return session_id_; }
+
+  // Return UUID of the requestor that initiated this session.
+  const std::string& requestor_uuid() const { return requestor_uuid_; }
+
+  // Unregister log anchor, if it's registered.
+  Status UnregisterAnchorIfNeededUnlocked();
+
+ private:
+  Status InitOnce() override;
+
+  scoped_refptr<tablet::TabletReplica> tablet_replica_;
+  const std::string session_id_;
+  const std::string requestor_uuid_;
+};
+
+// Copy replica from local file system.
+class LocalTabletCopySourceSession : public TabletCopySourceSession {
+ public:
+  LocalTabletCopySourceSession(std::string tablet_id,
+                               FsManager* fs_manager,
+                               TabletCopySourceMetrics* tablet_copy_metrics);
+
+ private:
+  Status InitOnce() override;
+};
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/ts_tablet_manager.cc 
b/src/kudu/tserver/ts_tablet_manager.cc
index 829f45c1c..118373005 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -924,8 +924,8 @@ void TSTabletManager::RunTabletCopy(
   //   to the leader.
   //
   // TODO(aserbin): make this robust and more optimal than it is now.
-  TabletCopyClient tc_client(tablet_id, fs_manager_, cmeta_manager_,
-                             server_->messenger(), &tablet_copy_metrics_);
+  RemoteTabletCopyClient tc_client(tablet_id, fs_manager_, cmeta_manager_,
+                                   server_->messenger(), 
&tablet_copy_metrics_);
 
   // Download and persist the remote superblock in TABLET_DATA_COPYING state.
   if (replacing_tablet) {

Reply via email to