decster commented on a change in pull request #3637:
URL: https://github.com/apache/incubator-doris/pull/3637#discussion_r429256669



##########
File path: be/src/olap/memory/mem_sub_tablet.cpp
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// Factory for MemSubTablet.
+//
+// Builds a sub-tablet whose version list starts with a single entry
+// (`version`, 0) and which owns one Column per column of `schema`, stored at
+// the slot given by that column's cid.
+//
+// Returns NotSupported if any column type fails the supported() check; in
+// that case `*ret` is left untouched. On success the new object is swapped
+// into `*ret` and OK is returned.
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> sub_tablet(new MemSubTablet());
+
+    auto& versions = sub_tablet->_versions;
+    versions.reserve(64);
+    versions.emplace_back(version, 0);
+
+    auto& columns = sub_tablet->_columns;
+    columns.resize(schema.cid_size());
+
+    const size_t num_columns = schema.num_columns();
+    for (size_t idx = 0; idx < num_columns; ++idx) {
+        // TODO: support storage_type != c.type
+        auto& col_schema = *schema.get(idx);
+        if (!supported(col_schema.type())) {
+            return Status::NotSupported("column type not supported");
+        }
+        // The column type doubles as the storage type for now (see TODO).
+        columns[col_schema.cid()].reset(new Column(col_schema, col_schema.type(), version));
+    }
+
+    ret->swap(sub_tablet);
+    return Status::OK();
+}
+
+// Default constructor: starts with a freshly allocated HashIndex built with
+// the argument 1 << 16 (65536) -- presumably its initial capacity; confirm
+// against HashIndex's constructor.
+MemSubTablet::MemSubTablet()
+        : _index(new HashIndex(1 << 16)) {
+}
+
+// Destructor: performs no explicit cleanup of its own.
+MemSubTablet::~MemSubTablet() = default;
+
+// Looks up the size recorded for `version` and stores it in `*size`.
+//
+// - version == (uint64_t)-1 selects the most recent entry.
+// - A version older than the first recorded entry yields NotFound and leaves
+//   `*size` untouched.
+// - Otherwise the size of the last entry whose version is <= `version` is
+//   returned (the newest entry if no later one exists).
+//
+// The whole lookup runs while holding _lock.
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> guard(_lock);
+    const bool want_latest = (version == static_cast<uint64_t>(-1));
+    if (!want_latest) {
+        if (version < _versions[0].version) {
+            return Status::NotFound("get_size failed, version too old");
+        }
+        // Find the first entry newer than the request; its predecessor is
+        // the entry that covers `version`.
+        for (size_t next = 1; next < _versions.size(); ++next) {
+            if (version < _versions[next].version) {
+                *size = _versions[next - 1].size;
+                return Status::OK();
+            }
+        }
+    }
+    // Either the latest was requested or no newer entry exists.
+    *size = _versions.back().size;
+    return Status::OK();
+}
+
+// Creates a reader over column `cid` at `version`.
+//
+// The column reference is taken while holding _lock; the lock is released
+// before create_reader() is invoked. Returns NotFound when `cid` is out of
+// range or the slot holds no column; otherwise forwards the status of
+// Column::create_reader().
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    const scoped_refptr<Column> column = [&]() -> scoped_refptr<Column> {
+        std::lock_guard<std::mutex> guard(_lock);
+        if (cid >= _columns.size()) {
+            return scoped_refptr<Column>();
+        }
+        return _columns[cid];
+    }();
+    if (!column) {
+        return Status::NotFound("column not found");
+    }
+    return column->create_reader(version, reader);
+}
+
+// Hands out a reference to the current hash index through `*index`.
+// Always returns OK. Note that _index is read here without taking _lock.
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    scoped_refptr<HashIndex>& out = *index;
+    out = _index;
+    return Status::OK();
+}
+
+// Prepares the sub-tablet for a batch of writes using `*schema`.
+//
+// Captures the schema, the latest row count and the current hash index as
+// the write-side state, resets the per-column writer slots, and eagerly
+// creates writers for the key columns. If creating a key-column writer
+// fails, that error is returned immediately and the statistics below are
+// not reset. On success the write statistics are zeroed and OK is returned.
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+
+    // Start from an empty writer slot for every column.
+    _writers.clear();
+    _writers.resize(_columns.size());
+
+    // precache key columns
+    const size_t num_keys = _schema->num_key_columns();
+    for (size_t key_idx = 0; key_idx < num_keys; ++key_idx) {
+        const uint32_t cid = _schema->get(key_idx)->cid();
+        auto& writer = _writers[cid];
+        if (writer) {
+            continue;
+        }
+        RETURN_IF_ERROR(_columns[cid]->create_writer(&writer));
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row(const PartialRowReader& row) {
+    DCHECK_GE(row.cell_size(), 1);
+    const ColumnSchema* dsc;
+    const void* key;
+    // get key column and find in hash index
+    // TODO: support multi-column row key
+    row.get_cell(0, &dsc, &key);
+    ColumnWriter* keyw = _writers[1].get();
+    // find candidate rowids, and check equality
+    uint64_t hashcode = keyw->hashcode(key, 0);
+    _temp_hash_entries.clear();
+    uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+    uint32_t rid = -1;
+    for (size_t i = 0; i < _temp_hash_entries.size(); i++) {
+        uint32_t test_rid = _temp_hash_entries[i];
+        if (keyw->equals(test_rid, key, 0)) {
+            rid = test_rid;
+            break;
+        }
+    }
+    // if rowkey not found, do insertion/append
+    if (rid == -1) {
+        _num_insert++;
+        rid = _row_size;
+        // add all columns
+        //DLOG(INFO) << StringPrintf"insert rid=%u", rid);
+        for (size_t i = 0; i < row.cell_size(); i++) {
+            const void* data;
+            RETURN_IF_ERROR(row.get_cell(i, &dsc, &data));
+            uint32_t cid = dsc->cid();
+            if (!_writers[cid]) {
+                RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+            }
+            RETURN_IF_ERROR(_writers[cid]->insert(rid, data));
+        }
+        _write_index->set(newslot, hashcode, rid);
+        _row_size++;
+        if (_write_index->need_rebuild()) {
+            scoped_refptr<HashIndex> new_index;
+            // TODO: trace memory usage
+            size_t new_capacity = _row_size * 2;
+            while (true) {
+                new_index = rebuild_hash_index(new_capacity);
+                if (new_index) {
+                    break;
+                } else {
+                    new_capacity += 1 << 16;
+                }
+            }
+            _write_index = new_index;
+        }
+    } else {
+        // rowkey found, do update
+        _num_update++;

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to