imay closed pull request #388: Remove batch process code from push handler 
(#385)
URL: https://github.com/apache/incubator-doris/pull/388
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index f65a82bd..a51aeb09 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -234,6 +234,8 @@ enum OLAPStatus {
     OLAP_ERR_PUSH_TABLE_NOT_EXIST = -909,
     OLAP_ERR_PUSH_INPUT_DATA_ERROR = -910,
     OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST = -911,
+    // only support realtime push api, batch process is deprecated and is 
removed
+    OLAP_ERR_PUSH_BATCH_PROCESS_REMOVED = -912, 
 
     // SegmentGroup
     // [-1000, -1100)
diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp
index 5051c90e..b5e17a5e 100644
--- a/be/src/olap/olap_engine.cpp
+++ b/be/src/olap/olap_engine.cpp
@@ -2598,7 +2598,7 @@ OLAPStatus OLAPEngine::delete_data(
     if (request.__isset.transaction_id) {
         res = push_handler.process_realtime_push(table, request, 
PUSH_FOR_DELETE, tablet_info_vec);
     } else {
-        res = push_handler.process(table, request, PUSH_FOR_DELETE, 
tablet_info_vec);
+        res = OLAP_ERR_PUSH_BATCH_PROCESS_REMOVED;
     }
 
     if (res != OLAP_SUCCESS) {
@@ -2953,7 +2953,7 @@ OLAPStatus OLAPEngine::push(
     } else {
         {
             SCOPED_RAW_TIMER(&duration_ns);
-            res = push_handler.process(olap_table, request, type, 
tablet_info_vec);
+            res = OLAP_ERR_PUSH_BATCH_PROCESS_REMOVED;
         }
     }
 
diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp
index b63ed0f8..43d3f890 100644
--- a/be/src/olap/olap_snapshot.cpp
+++ b/be/src/olap/olap_snapshot.cpp
@@ -396,6 +396,14 @@ OLAPStatus OLAPEngine::_create_snapshot_files(
             for (const VersionEntity& entity : shortest_versions) {
                 if (entity.version.second == request.version) {
                     if (entity.version.first != request.version) {
+                        // visible version in fe is 900
+                        // A need to clone 900 from B, but B's last version is 
901, and 901 is not a visible version
+                        // and 901 will be reverted
+                        // since 900 is not the last version in B, 900 maybe 
compacted with other versions
+                        // if A only get 900, then A's last version will be a 
comulative delta
+                        // many codes in be assumes that the last version is a 
single delta
+                        // both clone and backup restore depend on this logic
+                        // TODO (yiguolei) fix it in the future
                         res = _append_single_delta(request, store);
                         if (res != OLAP_SUCCESS) {
                             OLAP_LOG_WARNING("fail to append single delta. 
[res=%d]", res);
@@ -586,7 +594,11 @@ OLAPStatus OLAPEngine::_append_single_delta(
         empty_push.version_hash = 0;
         
         PushHandler handler;
-        res = handler.process(tablet, empty_push, PUSH_NORMAL, NULL);
+        // res = handler.process(tablet, empty_push, PUSH_NORMAL, NULL);
+        // TODO (yiguolei) should create a empty version, call create new 
rowset meta and set version
+        // just return success to skip push a empty rowset into the snapshot 
since has alreay removed
+        // batch process code from push handler
+        res = OLAP_SUCCESS;
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to push empty version. [res=%d version=%d]",
                              res, empty_push.version);
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index f2fa3997..ecf1eabb 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -35,88 +35,90 @@ using std::vector;
 namespace doris {
 
 // Process push command, the main logical is as follows:
-//    a. related tables not exist:
-//        current table isn't in schemachange state, only push for current 
table
-//    b. related tables exist
-//       I.  current table is old table:
+//    a. related tablets not exist:
+//        current table isn't in schemachange state, only push for current 
tablet
+//    b. related tablets exist
+//       I.  current tablet is old table (cur.create_time < 
related.create_time):
 //           push for current table and than convert data for related tables
 //       II. current table is new table:
 //           this usually means schema change is over,
-//           clear schema change info in both current table and related tables,
-//           finally we will only push for current tables
+//           clear schema change info in both current tablet and related 
tablets,
+//           finally we will only push for current tablets. this is very 
useful in rollup action.
 OLAPStatus PushHandler::process_realtime_push(
-        TabletSharedPtr olap_table,
+        TabletSharedPtr tablet,
         const TPushReq& request,
         PushType push_type,
         vector<TTabletInfo>* tablet_info_vec) {
-    LOG(INFO) << "begin to realtime push. tablet=" << olap_table->full_name()
+    LOG(INFO) << "begin to realtime push. tablet=" << tablet->full_name()
               << ", transaction_id=" << request.transaction_id;
 
     OLAPStatus res = OLAP_SUCCESS;
     _request = request;
-    vector<TableVars> table_infoes(1);
-    table_infoes[0].olap_table = olap_table;
-    AlterTabletType alter_table_type;
+    vector<TabletVars> tablet_infos(1);
+    tablet_infos[0].tablet = tablet;
+    AlterTabletType alter_tablet_type;
 
     // add transaction in engine, then check sc status
     // lock, prevent sc handler checking transaction concurrently
-    olap_table->obtain_push_lock();
+    tablet->obtain_push_lock();
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
     res = OLAPEngine::get_instance()->add_transaction(
         request.partition_id, request.transaction_id,
-        olap_table->tablet_id(), olap_table->schema_hash(), load_id);
+        tablet->tablet_id(), tablet->schema_hash(), load_id);
 
     // if transaction exists, exit
     if (res == OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
 
         // if push finished, report success to fe
-        if (olap_table->has_pending_data(request.transaction_id)) {
+        if (tablet->has_pending_data(request.transaction_id)) {
             OLAP_LOG_WARNING("pending data exists in tablet, which means push 
finished,"
                              "return success. [table=%s transaction_id=%ld]",
-                             olap_table->full_name().c_str(), 
request.transaction_id);
+                             tablet->full_name().c_str(), 
request.transaction_id);
             res = OLAP_SUCCESS;
         }
-        olap_table->release_push_lock();
+        tablet->release_push_lock();
         goto EXIT;
     }
 
     // only when fe sends schema_change true, should consider to push related 
table
     if (_request.is_schema_changing) {
         VLOG(3) << "push req specify schema changing is true. "
-                << "tablet=" << olap_table->full_name()
+                << "tablet=" << tablet->full_name()
                 << ", transaction_id=" << request.transaction_id;
         TTabletId related_tablet_id;
         TSchemaHash related_schema_hash;
 
-        olap_table->obtain_header_rdlock();
-        bool is_schema_changing = olap_table->get_schema_change_request(
-            &related_tablet_id, &related_schema_hash, NULL, &alter_table_type);
-        olap_table->release_header_lock();
+        tablet->obtain_header_rdlock();
+        bool is_schema_changing = tablet->get_schema_change_request(
+            &related_tablet_id, &related_schema_hash, NULL, 
&alter_tablet_type);
+        tablet->release_header_lock();
 
         if (is_schema_changing) {
             LOG(INFO) << "find schema_change status when realtime push. "
-                      << "tablet=" << olap_table->full_name() 
+                      << "tablet=" << tablet->full_name() 
                       << ", related_tablet_id=" << related_tablet_id
                       << ", related_schema_hash=" << related_schema_hash
                       << ", transaction_id=" << request.transaction_id;
-            TabletSharedPtr related_olap_table = 
OLAPEngine::get_instance()->get_table(
+            TabletSharedPtr related_tablet = 
OLAPEngine::get_instance()->get_table(
                 related_tablet_id, related_schema_hash);
 
             // if related tablet not exists, only push current tablet
-            if (NULL == related_olap_table.get()) {
+            if (NULL == related_tablet.get()) {
+                // only print a warn log here not call clear schema change 
state in tablet
+                // the schema change state will be cleared when another schema 
change request comes
                 OLAP_LOG_WARNING("can't find related table, only push current 
tablet. "
                                  "[table=%s related_tablet_id=%ld 
related_schema_hash=%d]",
-                                 olap_table->full_name().c_str(),
+                                 tablet->full_name().c_str(),
                                  related_tablet_id, related_schema_hash);
 
             // if current tablet is new table, only push current tablet
-            } else if (olap_table->creation_time() > 
related_olap_table->creation_time()) {
+            } else if (tablet->creation_time() > 
related_tablet->creation_time()) {
                 OLAP_LOG_WARNING("current table is new, only push current 
tablet. "
-                                 "[table=%s related_olap_table=%s]",
-                                 olap_table->full_name().c_str(),
-                                 related_olap_table->full_name().c_str());
+                                 "[table=%s related_tablet=%s]",
+                                 tablet->full_name().c_str(),
+                                 related_tablet->full_name().c_str());
 
             // add related transaction in engine
             } else {
@@ -125,34 +127,37 @@ OLAPStatus PushHandler::process_realtime_push(
                 load_id.set_lo(0);
                 res = OLAPEngine::get_instance()->add_transaction(
                     request.partition_id, request.transaction_id,
-                    related_olap_table->tablet_id(), 
related_olap_table->schema_hash(), load_id);
+                    related_tablet->tablet_id(), 
related_tablet->schema_hash(), load_id);
 
                 // if related tablet's transaction exists, only push current 
tablet
                 if (res == OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
                     OLAP_LOG_WARNING("related tablet's transaction exists in 
engine, "
                                      "only push current tablet. "
                                      "[related_table=%s transaction_id=%ld]",
-                                     related_olap_table->full_name().c_str(),
+                                     related_tablet->full_name().c_str(),
                                      request.transaction_id);
                 } else {
-                    table_infoes.push_back(TableVars());
-                    TableVars& new_item = table_infoes.back();
-                    new_item.olap_table = related_olap_table;
+                    tablet_infos.push_back(TabletVars());
+                    TabletVars& new_item = tablet_infos.back();
+                    new_item.tablet = related_tablet;
                 }
             }
         }
     }
-    olap_table->release_push_lock();
+    tablet->release_push_lock();
 
-    if (table_infoes.size() == 1) {
-        table_infoes.resize(2);
+    if (tablet_infos.size() == 1) {
+        tablet_infos.resize(2);
     }
 
+    // not call validate request here, because realtime load does not 
+    // contain version info 
+
     // check delete condition if push for delete
     if (push_type == PUSH_FOR_DELETE) {
 
-        for (TableVars& table_var : table_infoes) {
-            if (table_var.olap_table.get() == NULL) {
+        for (TabletVars& tablet_var : tablet_infos) {
+            if (tablet_var.tablet.get() == NULL) {
                 continue;
             }
 
@@ -163,41 +168,41 @@ OLAPStatus PushHandler::process_realtime_push(
             }
 
             DeleteConditionHandler del_cond_handler;
-            table_var.olap_table->obtain_header_rdlock();
+            tablet_var.tablet->obtain_header_rdlock();
             for (const TCondition& cond : request.delete_conditions) {
-                res = 
del_cond_handler.check_condition_valid(table_var.olap_table, cond);
+                res = 
del_cond_handler.check_condition_valid(tablet_var.tablet, cond);
                 if (res != OLAP_SUCCESS) {
                     OLAP_LOG_WARNING("fail to check delete condition. 
[table=%s res=%d]",
-                                     
table_var.olap_table->full_name().c_str(), res);
-                    table_var.olap_table->release_header_lock();
+                                     tablet_var.tablet->full_name().c_str(), 
res);
+                    tablet_var.tablet->release_header_lock();
                     goto EXIT;
                 }
             }
-            table_var.olap_table->release_header_lock();
+            tablet_var.tablet->release_header_lock();
             LOG(INFO) << "success to check delete condition when realtime 
push. "
-                      << "tablet=" << table_var.olap_table->full_name()
+                      << "tablet=" << tablet_var.tablet->full_name()
                       << ", transaction_id=" << request.transaction_id;
         }
     }
 
     // write
-    res = _convert(table_infoes[0].olap_table, table_infoes[1].olap_table,
-                   &(table_infoes[0].added_indices), 
&(table_infoes[1].added_indices),
-                   alter_table_type);
+    res = _convert(tablet_infos[0].tablet, tablet_infos[1].tablet,
+                   &(tablet_infos[0].added_indices), 
&(tablet_infos[1].added_indices),
+                   alter_tablet_type);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to convert tmp file when realtime push. 
[res=%d]", res);
         goto EXIT;
     }
 
     // add pending data to tablet
-    for (TableVars& table_var : table_infoes) {
-        if (table_var.olap_table.get() == NULL) {
+    for (TabletVars& tablet_var : tablet_infos) {
+        if (tablet_var.tablet.get() == NULL) {
             continue;
         }
 
-        for (SegmentGroup* segment_group : table_var.added_indices) {
+        for (SegmentGroup* segment_group : tablet_var.added_indices) {
 
-            res = table_var.olap_table->add_pending_data(
+            res = tablet_var.tablet->add_pending_data(
                 segment_group, push_type == PUSH_FOR_DELETE ? 
&request.delete_conditions : NULL);
 
             // if pending data exists in tablet, which means push finished
@@ -207,7 +212,7 @@ OLAPStatus PushHandler::process_realtime_push(
 
             } else if (res != OLAP_SUCCESS) {
                 OLAP_LOG_WARNING("fail to add pending data to tablet. 
[table=%s transaction_id=%ld]",
-                                 table_var.olap_table->full_name().c_str(), 
request.transaction_id);
+                                 tablet_var.tablet->full_name().c_str(), 
request.transaction_id);
                 goto EXIT;
             }
         }
@@ -218,35 +223,35 @@ OLAPStatus PushHandler::process_realtime_push(
     if (res == OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
         OLAP_LOG_WARNING("find transaction existed when realtime push, not 
report. ",
                          "[table=%s partition_id=%ld transaction_id=%ld]",
-                         olap_table->full_name().c_str(),
+                         tablet->full_name().c_str(),
                          request.partition_id, request.transaction_id);
         return res;
     }
 
     if (res == OLAP_SUCCESS) {
         if (tablet_info_vec != NULL) {
-            _get_tablet_infos(table_infoes, tablet_info_vec);
+            _get_tablet_infos(tablet_infos, tablet_info_vec);
         }
         LOG(INFO) << "process realtime push successfully. "
-                  << "tablet=" << olap_table->full_name()
+                  << "tablet=" << tablet->full_name()
                   << ", partition_id=" << request.partition_id
                   << ", transaction_id=" << request.transaction_id;
     } else {
 
         // error happens, clear
         OLAP_LOG_WARNING("failed to process realtime push. [table=%s 
transaction_id=%ld]",
-                         olap_table->full_name().c_str(), 
request.transaction_id);
-        for (TableVars& table_var : table_infoes) {
-            if (table_var.olap_table.get() == NULL) {
+                         tablet->full_name().c_str(), request.transaction_id);
+        for (TabletVars& tablet_var : tablet_infos) {
+            if (tablet_var.tablet.get() == NULL) {
                 continue;
             }
 
             OLAPEngine::get_instance()->delete_transaction(
                 request.partition_id, request.transaction_id,
-                table_var.olap_table->tablet_id(), 
table_var.olap_table->schema_hash());
+                tablet_var.tablet->tablet_id(), 
tablet_var.tablet->schema_hash());
 
             // actually, olap_index may has been deleted in 
delete_transaction()
-            for (SegmentGroup* segment_group : table_var.added_indices) {
+            for (SegmentGroup* segment_group : tablet_var.added_indices) {
                 segment_group->release();
                 OLAPEngine::get_instance()->add_unused_index(segment_group);
             }
@@ -257,27 +262,27 @@ OLAPStatus PushHandler::process_realtime_push(
 }
 
 void PushHandler::_get_tablet_infos(
-        const vector<TableVars>& table_infoes,
+        const vector<TabletVars>& tablet_infos,
         vector<TTabletInfo>* tablet_info_vec) {
-    for (const TableVars& table_var : table_infoes) {
-        if (table_var.olap_table.get() == NULL) {
+    for (const TabletVars& tablet_var : tablet_infos) {
+        if (tablet_var.tablet.get() == NULL) {
             continue;
         }
 
         TTabletInfo tablet_info;
-        tablet_info.tablet_id = table_var.olap_table->tablet_id();
-        tablet_info.schema_hash = table_var.olap_table->schema_hash();
+        tablet_info.tablet_id = tablet_var.tablet->tablet_id();
+        tablet_info.schema_hash = tablet_var.tablet->schema_hash();
         OLAPEngine::get_instance()->report_tablet_info(&tablet_info);
         tablet_info_vec->push_back(tablet_info);
     }
 }
 
 OLAPStatus PushHandler::_convert(
-        TabletSharedPtr curr_olap_table,
-        TabletSharedPtr new_olap_table,
+        TabletSharedPtr curr_tablet,
+        TabletSharedPtr new_tablet,
         Indices* curr_olap_indices,
         Indices* new_olap_indices,
-        AlterTabletType alter_table_type) {
+        AlterTabletType alter_tablet_type) {
     OLAPStatus res = OLAP_SUCCESS;
     RowCursor row;
     BinaryFile raw_file;
@@ -288,10 +293,10 @@ OLAPStatus PushHandler::_convert(
 
     do {
         VLOG(3) << "start to convert delta file.";
-        std::vector<FieldInfo> tablet_schema = 
curr_olap_table->tablet_schema();
+        std::vector<FieldInfo> tablet_schema = curr_tablet->tablet_schema();
 
-        //curr_olap_table->set_tablet_schema();
-        tablet_schema = curr_olap_table->tablet_schema();
+        //curr_tablet->set_tablet_schema();
+        tablet_schema = curr_tablet->tablet_schema();
 
         // 1. Init BinaryReader to read raw file if exist,
         //    in case of empty push and delete data, this will be skipped.
@@ -311,43 +316,43 @@ OLAPStatus PushHandler::_convert(
             }
             if (NULL == (reader = IBinaryReader::create(need_decompress))) {
                 OLAP_LOG_WARNING("fail to create reader. [table='%s' 
file='%s']",
-                                 curr_olap_table->full_name().c_str(),
+                                 curr_tablet->full_name().c_str(),
                                  _request.http_file_path.c_str());
                 res = OLAP_ERR_MALLOC_ERROR;
                 break;
             }
 
             // init BinaryReader
-            if (OLAP_SUCCESS != (res = reader->init(curr_olap_table, 
&raw_file))) {
+            if (OLAP_SUCCESS != (res = reader->init(curr_tablet, &raw_file))) {
                 OLAP_LOG_WARNING("fail to init reader. [res=%d table='%s' 
file='%s']",
                                  res,
-                                 curr_olap_table->full_name().c_str(),
+                                 curr_tablet->full_name().c_str(),
                                  _request.http_file_path.c_str());
                 res = OLAP_ERR_PUSH_INIT_ERROR;
                 break;
             }
         }
 
-        // 2. New SegmentGroup of curr_olap_table for current push
+        // 2. New SegmentGroup of curr_tablet for current push
         VLOG(3) << "init SegmentGroup.";
 
         if (_request.__isset.transaction_id) {
             // create pending data dir
-            string dir_path = 
curr_olap_table->construct_pending_data_dir_path();
+            string dir_path = curr_tablet->construct_pending_data_dir_path();
             if (!check_dir_existed(dir_path) && (res = create_dirs(dir_path)) 
!= OLAP_SUCCESS) {
                 if (!check_dir_existed(dir_path)) {
                     OLAP_LOG_WARNING("fail to create pending dir. [res=%d 
table=%s]",
-                                     res, 
curr_olap_table->full_name().c_str());
+                                     res, curr_tablet->full_name().c_str());
                     break;
                 }
             }
 
             delta_segment_group = new(std::nothrow) SegmentGroup(
-                curr_olap_table.get(), (_request.push_type == 
TPushType::LOAD_DELETE),
+                curr_tablet.get(), (_request.push_type == 
TPushType::LOAD_DELETE),
                 0, 0, true, _request.partition_id, _request.transaction_id);
         } else {
             delta_segment_group = new(std::nothrow) SegmentGroup(
-                curr_olap_table.get(),
+                curr_tablet.get(),
                 Version(_request.version, _request.version),
                 _request.version_hash,
                 (_request.push_type == TPushType::LOAD_DELETE),
@@ -356,30 +361,30 @@ OLAPStatus PushHandler::_convert(
 
         if (NULL == delta_segment_group) {
             OLAP_LOG_WARNING("fail to malloc SegmentGroup. [table='%s' 
size=%ld]",
-                             curr_olap_table->full_name().c_str(), 
sizeof(SegmentGroup));
+                             curr_tablet->full_name().c_str(), 
sizeof(SegmentGroup));
             res = OLAP_ERR_MALLOC_ERROR;
             break;
         }
         curr_olap_indices->push_back(delta_segment_group);
 
         // 3. New Writer to write data into SegmentGroup
-        VLOG(3) << "init writer. tablet=" << curr_olap_table->full_name()
-                << ", block_row_size=" << 
curr_olap_table->num_rows_per_row_block();
+        VLOG(3) << "init writer. tablet=" << curr_tablet->full_name()
+                << ", block_row_size=" << 
curr_tablet->num_rows_per_row_block();
 
-        if (NULL == (writer = ColumnDataWriter::create(curr_olap_table, 
delta_segment_group, true))) {
+        if (NULL == (writer = ColumnDataWriter::create(curr_tablet, 
delta_segment_group, true))) {
             OLAP_LOG_WARNING("fail to create writer. [table='%s']",
-                             curr_olap_table->full_name().c_str());
+                             curr_tablet->full_name().c_str());
             res = OLAP_ERR_MALLOC_ERROR;
             break;
         }
 
         // 4. Init RowCursor
-        if (OLAP_SUCCESS != (res = 
row.init(curr_olap_table->tablet_schema()))) {
+        if (OLAP_SUCCESS != (res = row.init(curr_tablet->tablet_schema()))) {
             OLAP_LOG_WARNING("fail to init rowcursor. [res=%d]", res);
             break;
         }
 
-        // 5. Read data from raw file and write into SegmentGroup of 
curr_olap_table
+        // 5. Read data from raw file and write into SegmentGroup of 
curr_tablet
         if (_request.__isset.http_file_path) {
             // Convert from raw to delta
             VLOG(3) << "start to convert row file to delta.";
@@ -387,7 +392,7 @@ OLAPStatus PushHandler::_convert(
                 if (OLAP_SUCCESS != (res = writer->attached_by(&row))) {
                     OLAP_LOG_WARNING(
                             "fail to attach row to writer. [res=%d table='%s' 
read_rows=%u]",
-                            res, curr_olap_table->full_name().c_str(), 
num_rows);
+                            res, curr_tablet->full_name().c_str(), num_rows);
                     break;
                 }
 
@@ -419,7 +424,7 @@ OLAPStatus PushHandler::_convert(
         VLOG(3) << "load the index.";
         if (OLAP_SUCCESS != (res = delta_segment_group->load())) {
             OLAP_LOG_WARNING("fail to load index. [res=%d table='%s' 
version=%ld]",
-                             res, curr_olap_table->full_name().c_str(), 
_request.version);
+                             res, curr_tablet->full_name().c_str(), 
_request.version);
             break;
         }
         _write_bytes += delta_segment_group->data_size();
@@ -427,27 +432,27 @@ OLAPStatus PushHandler::_convert(
 
         // 7. Convert data for schema change tables
         VLOG(10) << "load to related tables of schema_change if possible.";
-        if (NULL != new_olap_table.get()) {
+        if (NULL != new_tablet.get()) {
             // create related tablet's pending data dir
-            string dir_path = 
new_olap_table->construct_pending_data_dir_path();
+            string dir_path = new_tablet->construct_pending_data_dir_path();
             if (!check_dir_existed(dir_path) && (res = create_dirs(dir_path)) 
!= OLAP_SUCCESS) {
                 if (!check_dir_existed(dir_path)) {
                     OLAP_LOG_WARNING("fail to create pending dir. [res=%d 
table=%s]",
-                                     res, new_olap_table->full_name().c_str());
+                                     res, new_tablet->full_name().c_str());
                     break;
                 }
             }
 
             SchemaChangeHandler schema_change;
             res = schema_change.schema_version_convert(
-                    curr_olap_table,
-                    new_olap_table,
+                    curr_tablet,
+                    new_tablet,
                     curr_olap_indices,
                     new_olap_indices);
             if (res != OLAP_SUCCESS) {
                 OLAP_LOG_WARNING("failed to change schema version for delta."
                                  "[res=%d new_table='%s']",
-                                 res, new_olap_table->full_name().c_str());
+                                 res, new_tablet->full_name().c_str());
             }
 
         }
@@ -457,216 +462,7 @@ OLAPStatus PushHandler::_convert(
     SAFE_DELETE(writer);
     OLAP_LOG_NOTICE_PUSH("processed_rows", "%d", num_rows);
     VLOG(10) << "convert delta file end. res=" << res
-             << ", tablet=" << curr_olap_table->full_name();
-    return res;
-}
-
-OLAPStatus PushHandler::_validate_request(
-        TabletSharedPtr olap_table_for_raw,
-        TabletSharedPtr olap_table_for_schema_change,
-        bool is_new_tablet_effective,
-        PushType push_type) {
-    const PDelta* latest_delta = olap_table_for_raw->lastest_delta();
-
-    if (NULL == latest_delta) {
-        const PDelta* lastest_version = olap_table_for_raw->lastest_version();
-
-        // PUSH the first version when the version is 0, or
-        // tablet is in alter table status.
-        if (NULL == lastest_version
-                && (0 == _request.version || NULL != 
olap_table_for_schema_change.get())) {
-            return OLAP_SUCCESS;
-        } else if (NULL != lastest_version
-                && (lastest_version->end_version() + 1 == _request.version)) {
-            return OLAP_SUCCESS;
-        }
-
-        OLAP_LOG_WARNING("no last pushed delta, the comming version should be 
0. [table='%s']",
-                         olap_table_for_raw->full_name().c_str());
-        return OLAP_ERR_PUSH_VERSION_INCORRECT;
-    }
-
-    if (is_new_tablet_effective) {
-        LOG(INFO) << "maybe a alter tablet has already created from base 
tablet. "
-                  << "tablet=" << olap_table_for_raw->full_name()
-                  << ", version=" << _request.version;
-        if (push_type == PUSH_FOR_DELETE
-                && _request.version == latest_delta->start_version()
-                && _request.version_hash == latest_delta->version_hash()) {
-            LOG(INFO) << "base tablet has already convert delete version for 
new tablet. "
-                      << "version=" << _request.version << ", version_hash=" 
<< _request.version_hash;
-            return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
-        }
-    } else {
-        // Never allow two push has same version and version hash,
-        // but same verson and different version hash is allowed.
-        if (_request.version < latest_delta->start_version()
-                || _request.version > latest_delta->start_version() + 1) {
-            OLAP_LOG_WARNING(
-                    "try to push a delta with incorrect version. "
-                    "[new_version=%ld lastest_version=%u "
-                    "new_version_hash=%ld lastest_version_hash=%lu]",
-                    _request.version, latest_delta->start_version(),
-                    _request.version_hash, latest_delta->version_hash());
-            return OLAP_ERR_PUSH_VERSION_INCORRECT;
-        } else if (_request.version == latest_delta->start_version()
-                && _request.version_hash == latest_delta->version_hash()) {
-            OLAP_LOG_WARNING(
-                    "try to push a already exist delta. "
-                    "[new_version=%ld lastest_version=%u "
-                    "new_version_hash=%ld lastest_version_hash=%lu]",
-                    _request.version, latest_delta->start_version(),
-                    _request.version_hash, latest_delta->version_hash());
-            return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
-        }
-    }
-
-    return OLAP_SUCCESS;
-}
-
-
-// The latest version can be reverted for following scene:
-// user submit a push job and cancel it soon, but some 
-// tablets already push success.
-OLAPStatus PushHandler::_get_versions_reverted(
-        TabletSharedPtr olap_table,
-        bool is_new_tablet,
-        PushType push_type,
-        Versions* unused_versions) {
-    const PDelta* latest_delta = olap_table->lastest_delta();
-
-    if (NULL == latest_delta) {
-        const PDelta* lastest_version = olap_table->lastest_version();
-
-        // PUSH the first version, and the version is 0
-        if ((NULL == lastest_version
-                && (0 == _request.version || is_new_tablet))) {
-            return OLAP_SUCCESS;
-        } else if (NULL != lastest_version
-                && lastest_version->end_version() + 1 == _request.version) {
-            return OLAP_SUCCESS;
-        }
-
-        OLAP_LOG_WARNING("no last pushed delta, the comming version should be 
0. [table='%s']",
-                         olap_table->full_name().c_str());
-        return OLAP_ERR_PUSH_VERSION_INCORRECT;
-    }
-
-    VLOG(3) << "latest deltas was founded. tablet=" << olap_table->full_name()
-            << ", version=" << latest_delta->start_version() << "-" << 
latest_delta->end_version();
-    // Remove the cumulative delta that end_version == request.version()
-    if (_request.version == latest_delta->start_version()) {
-        Versions all_versions;
-        olap_table->list_versions(&all_versions);
-
-        for (Versions::const_iterator v = all_versions.begin(); v != 
all_versions.end(); ++v) {
-            if (v->second == _request.version) {
-                unused_versions->push_back(*v);
-                VLOG(3) << "Add unused version. tablet=" << 
olap_table->full_name()
-                        << "version=" << v->first << "-" << v->second;
-            }
-        }
-
-        // Remove delete condition if current type is PUSH_FOR_DELETE,
-        // this occurs when user cancel delete_data soon after submit it.
-        if (push_type != PUSH_FOR_DELETE) {
-            DeleteConditionHandler del_cond_handler;
-            del_cond_handler.delete_cond(olap_table, _request.version, false);
-        }
-    }
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus PushHandler::_update_header(
-        TabletSharedPtr olap_table,
-        Versions* unused_versions,
-        Indices* new_indices,
-        Indices* unused_indices) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    res = olap_table->replace_data_sources(
-            unused_versions,
-            new_indices,
-            unused_indices);
-    if (res != OLAP_SUCCESS) {
-        LOG(FATAL) << "fail to replace data sources. res=" << res
-                   << ", tablet=" << olap_table->full_name();
-        return res;
-    }
-
-    // Avoid double update
-    new_indices->clear();
-    unused_versions->clear();
-
-    // Save header fail will not impact service for memory state
-    // has already changed, but some data may lost when OLAPEngine restart;
-    // Note we don't return fail here.
-    res = olap_table->save_header();
-    if (res != OLAP_SUCCESS) {
-        LOG(FATAL) << "fail to save header. res=" << res
-                   << ", tablet=" << olap_table->full_name();
-    }
-
-    return res;
-}
-
-void PushHandler::_delete_old_indices(Indices* unused_indices) {
-    if (!unused_indices->empty()) {
-        OLAPEngine* unused_index = OLAPEngine::get_instance();
-
-        for (Indices::iterator it = unused_indices->begin();
-                it != unused_indices->end(); ++it) {
-            unused_index->add_unused_index(*it);
-        }
-    }
-}
-
-OLAPStatus PushHandler::_clear_alter_table_info(
-        TabletSharedPtr tablet,
-        TabletSharedPtr related_tablet) {
-    OLAPStatus res = OLAP_SUCCESS;
-    _obtain_header_wrlock();
-
-    do {
-        res = SchemaChangeHandler::clear_schema_change_single_info(
-                tablet, NULL, false, false);
-        if (res != OLAP_SUCCESS) {
-            LOG(FATAL) << "fail to clear schema change info of new table. 
res=" << res
-                       << ", tablet=" << tablet->full_name();
-            break;
-        }
-        
-        res = tablet->save_header();
-        if (res != OLAP_SUCCESS) {
-            LOG(FATAL) << "fail to save header. res=" << res
-                       << ", table=" << tablet->full_name();
-            break;
-        }
-
-        TTabletId tablet_id;
-        TSchemaHash schema_hash;
-        bool is_sc = related_tablet->get_schema_change_request(
-                &tablet_id, &schema_hash, NULL, NULL);
-        if (is_sc && tablet_id == tablet->tablet_id() && schema_hash == 
tablet->schema_hash()) {
-            res = SchemaChangeHandler::clear_schema_change_single_info(
-                    related_tablet, NULL, false, false);
-            if (res != OLAP_SUCCESS) {
-                LOG(FATAL) << "fail to clear schema change info of old table. 
res=" << res
-                           << ", tablet=" << related_tablet->full_name();
-                break;
-            }
-            
-            res = related_tablet->save_header();
-            if (res != OLAP_SUCCESS) {
-                LOG(FATAL) << "fail to save header. res=" << res
-                           << "table=" << related_tablet->full_name();
-                break;
-            }
-        }
-    } while (0);
-
-    _release_header_lock();
+             << ", tablet=" << curr_tablet->full_name();
     return res;
 }
 
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index f4f69352..2d0161fc 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -41,8 +41,8 @@ class BinaryReader;
 class ColumnMapping;
 class RowCursor;
 
-struct TableVars {
-    TabletSharedPtr olap_table;
+struct TabletVars {
+    TabletSharedPtr tablet;
     Versions unused_versions;
     Indices unused_indices;
     Indices added_indices;
@@ -52,7 +52,7 @@ class PushHandler {
 public:
     typedef std::vector<ColumnMapping> SchemaMapping;
 
-    PushHandler() : _header_locked(false) {}
+    PushHandler() {}
     ~PushHandler() {}
 
     // Load local data file into specified tablet.
@@ -65,22 +65,6 @@ class PushHandler {
     int64_t write_bytes() const { return _write_bytes; }
     int64_t write_rows() const { return _write_rows; }
 private:
-    // Validate request, mainly data version check.
-    OLAPStatus _validate_request(
-            TabletSharedPtr olap_table_for_raw,
-            TabletSharedPtr olap_table_for_schema_change,
-            bool is_rollup_new_table,
-            PushType push_type);
-
-    // The latest version can be reverted for following scene:
-    // user submit a push job and cancel it soon, but some 
-    // tablets already push success.
-    OLAPStatus _get_versions_reverted(
-            TabletSharedPtr olap_table,
-            bool is_schema_change_tablet,
-            PushType push_type,
-            Versions* unused_versions);
-
     // Convert local data file to internal formatted delta,
     // return new delta's SegmentGroup
     OLAPStatus _convert(
@@ -90,73 +74,16 @@ class PushHandler {
             Indices* new_olap_indices,
             AlterTabletType alter_table_type);
 
-    // Update header info when new version add or dirty version removed.
-    OLAPStatus _update_header(
-            TabletSharedPtr olap_table,
-            Versions* unused_versions,
-            Indices* new_indices,
-            Indices* unused_indices);
-
-    // remove all old file of cumulatives versions
-    void _delete_old_indices(Indices* indices);
-
-    // Clear schema change information.
-    OLAPStatus _clear_alter_table_info(
-            TabletSharedPtr olap_table,
-            TabletSharedPtr related_olap_table);
-
     // Only for debug
     std::string _debug_version_list(const Versions& versions) const;
 
-    // Lock tablet header before read header info.
-    void _obtain_header_rdlock() {
-        for (std::list<TabletSharedPtr>::iterator it = _olap_table_arr.begin();
-                it != _olap_table_arr.end(); ++it) {
-            VLOG(3) << "obtain all header locks rd. tablet=" << 
(*it)->full_name();
-            (*it)->obtain_header_rdlock();
-        }
-
-        _header_locked = true;
-    }
-
-    // Locak tablet header before write header info.
-    void _obtain_header_wrlock() {
-        for (std::list<TabletSharedPtr>::iterator it = _olap_table_arr.begin();
-                it != _olap_table_arr.end(); ++it) {
-            VLOG(3) << "obtain all header locks wr. tablet=" << 
(*it)->full_name();
-            (*it)->obtain_header_wrlock();
-        }
-
-        _header_locked = true;
-    }
-
-    // Release tablet header lock.
-    void _release_header_lock() {
-        if (_header_locked) {
-            for (std::list<TabletSharedPtr>::reverse_iterator it = 
_olap_table_arr.rbegin();
-                    it != _olap_table_arr.rend(); ++it) {
-                VLOG(3) << "release all header locks. tablet=" << 
(*it)->full_name();
-                (*it)->release_header_lock();
-            }
-
-            _header_locked = false;
-        }
-    }
-
     void _get_tablet_infos(
-            const std::vector<TableVars>& table_infoes,
+            const std::vector<TabletVars>& tablet_infos,
             std::vector<TTabletInfo>* tablet_info_vec);
 
     // mainly tablet_id, version and delta file path
     TPushReq _request;
 
-    // maily contains specified tablet object
-    // contains related tables also if in schema change, tablet split or rollup
-    std::list<TabletSharedPtr> _olap_table_arr;
-
-    // lock tablet header before modify tabelt header
-    bool _header_locked;
-
     int64_t _write_bytes = 0;
     int64_t _write_rows = 0;
     DISALLOW_COPY_AND_ASSIGN(PushHandler);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to