chaoyli commented on a change in pull request #3637:
URL: https://github.com/apache/incubator-doris/pull/3637#discussion_r429084494



##########
File path: be/src/olap/memory/CMakeLists.txt
##########
@@ -29,5 +29,8 @@ add_library(Memory STATIC
     delta_index.cpp
     hash_index.cpp
     mem_tablet.cpp
+    mem_sub_tablet.cpp
+    partial_row_batch.cpp
     schema.cpp
+    write_txn.cpp

Review comment:
       write_txn uses the "txn" abbreviation, so I think you should unify the
naming in this pull request.

##########
File path: be/src/olap/memory/mem_sub_tablet.cpp
##########
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> tmp(new MemSubTablet());
+    tmp->_versions.reserve(64);
+    tmp->_versions.emplace_back(version, 0);
+    tmp->_columns.resize(schema.cid_size());
+    for (size_t i = 0; i < schema.num_columns(); i++) {
+        // TODO: support storage_type != c.type
+        auto& c = *schema.get(i);
+        if (!supported(c.type())) {
+            return Status::NotSupported("column type not supported");
+        }
+        tmp->_columns[c.cid()].reset(new Column(c, c.type(), version));
+    }
+    tmp.swap(*ret);
+    return Status::OK();
+}
+
+MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {}
+
+MemSubTablet::~MemSubTablet() {}
+
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> lg(_lock);
+    if (version == static_cast<uint64_t>(-1)) {
+        // get latest
+        *size = _versions.back().size;
+        return Status::OK();
+    }
+    if (_versions[0].version > version) {
+        return Status::NotFound("get_size failed, version too old");
+    }
+    for (size_t i = 1; i < _versions.size(); i++) {
+        if (_versions[i].version > version) {
+            *size = _versions[i - 1].size;
+            return Status::OK();
+        }
+    }
+    *size = _versions.back().size;
+    return Status::OK();
+}
+
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    scoped_refptr<Column> cl;
+    {
+        std::lock_guard<std::mutex> lg(_lock);
+        if (cid < _columns.size()) {
+            cl = _columns[cid];
+        }
+    }
+    if (!cl) {
+        return Status::NotFound("column not found");
+    }
+    return cl->create_reader(version, reader);
+}
+
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    *index = _index;
+    return Status::OK();
+}
+
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+    _writers.clear();
+    _writers.resize(_columns.size());
+    // precache key columns
+    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+        uint32_t cid = _schema->get(i)->cid();
+        if (!_writers[cid]) {
+            RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+        }
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row(const PartialRowReader& row) {
+    DCHECK_GE(row.cell_size(), 1);
+    const ColumnSchema* dsc;
+    const void* key;
+    // get key column and find in hash index
+    // TODO: support multi-column row key
+    row.get_cell(0, &dsc, &key);
+    ColumnWriter* keyw = _writers[1].get();
+    // find candidate rowids, and check equality
+    uint64_t hashcode = keyw->hashcode(key, 0);
+    _temp_hash_entries.clear();
+    uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+    uint32_t rid = -1;
+    for (size_t i = 0; i < _temp_hash_entries.size(); i++) {
+        uint32_t test_rid = _temp_hash_entries[i];
+        if (keyw->equals(test_rid, key, 0)) {
+            rid = test_rid;
+            break;
+        }
+    }
+    // if rowkey not found, do insertion/append
+    if (rid == -1) {
+        _num_insert++;
+        rid = _row_size;
+        // add all columns
+        //DLOG(INFO) << StringPrintf"insert rid=%u", rid);
+        for (size_t i = 0; i < row.cell_size(); i++) {
+            const void* data;
+            RETURN_IF_ERROR(row.get_cell(i, &dsc, &data));
+            uint32_t cid = dsc->cid();
+            if (!_writers[cid]) {
+                RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+            }
+            RETURN_IF_ERROR(_writers[cid]->insert(rid, data));
+        }
+        _write_index->set(newslot, hashcode, rid);
+        _row_size++;
+        if (_write_index->need_rebuild()) {
+            scoped_refptr<HashIndex> new_index;
+            // TODO: trace memory usage
+            size_t new_capacity = _row_size * 2;
+            while (true) {
+                new_index = rebuild_hash_index(new_capacity);
+                if (new_index) {
+                    break;
+                } else {
+                    new_capacity += 1 << 16;
+                }
+            }
+            _write_index = new_index;
+        }
+    } else {
+        // rowkey found, do update
+        _num_update++;

Review comment:
       If the update fails, incrementing _num_update here makes no sense.
   I think the increment should be placed after the operation succeeds.

##########
File path: be/src/olap/memory/mem_sub_tablet.cpp
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> tmp(new MemSubTablet());
+    tmp->_versions.reserve(64);
+    tmp->_versions.emplace_back(version, 0);
+    tmp->_columns.resize(schema.cid_size());
+    for (size_t i = 0; i < schema.num_columns(); i++) {
+        // TODO: support storage_type != c.type
+        auto& c = *schema.get(i);
+        if (!supported(c.type())) {
+            return Status::NotSupported("column type not supported");
+        }
+        tmp->_columns[c.cid()].reset(new Column(c, c.type(), version));
+    }
+    tmp.swap(*ret);
+    return Status::OK();
+}
+
+MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {}
+
+MemSubTablet::~MemSubTablet() {}
+
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> lg(_lock);
+    if (version == static_cast<uint64_t>(-1)) {
+        // get latest
+        *size = _versions.back().size;
+        return Status::OK();
+    }
+    if (_versions[0].version > version) {
+        return Status::NotFound("get_size failed, version too old");
+    }
+    for (size_t i = 1; i < _versions.size(); i++) {
+        if (_versions[i].version > version) {
+            *size = _versions[i - 1].size;
+            return Status::OK();
+        }
+    }
+    *size = _versions.back().size;
+    return Status::OK();
+}
+
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    scoped_refptr<Column> cl;
+    {
+        std::lock_guard<std::mutex> lg(_lock);
+        if (cid < _columns.size()) {
+            cl = _columns[cid];
+        }
+    }
+    if (!cl) {
+        return Status::NotFound("column not found");
+    }
+    return cl->create_reader(version, reader);
+}
+
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    *index = _index;
+    return Status::OK();
+}
+
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+    _writers.clear();
+    _writers.resize(_columns.size());
+    // precache key columns
+    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+        uint32_t cid = _schema->get(i)->cid();
+        if (!_writers[cid]) {
+            RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+        }
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row_batch(PartialRowBatch* batch) {
+    while (true) {
+        bool has_row = false;
+        RETURN_IF_ERROR(batch->next_row(&has_row));
+        if (!has_row) {
+            break;
+        }
+        RETURN_IF_ERROR(apply_partial_row(*batch));
+    }
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row(const PartialRowBatch& row) {
+    DCHECK_GE(row.cur_row_cell_size(), 1);
+    const ColumnSchema* dsc;
+    const void* key;
+    // get key column and find in hash index
+    // TODO: support multi-column row key
+    row.cur_row_get_cell(0, &dsc, &key);
+    ColumnWriter* keyw = _writers[1].get();
+    // find candidate rowids, and check equality
+    uint64_t hashcode = keyw->hashcode(key, 0);
+    _temp_hash_entries.clear();
+    uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+    uint32_t rid = -1;
+    for (size_t i = 0; i < _temp_hash_entries.size(); i++) {
+        uint32_t test_rid = _temp_hash_entries[i];
+        if (keyw->equals(test_rid, key, 0)) {
+            rid = test_rid;
+            break;
+        }
+    }
+    // if rowkey not found, do insertion/append
+    if (rid == -1) {
+        _num_insert++;

Review comment:
       If the insert fails, incrementing _num_insert here makes no sense.
   I think the increment should be placed after the operation succeeds.

##########
File path: be/src/olap/memory/partial_row_batch.h
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// A chunk of memory that stores a batch of serialized partial rows
+// User can iterate through all the partial rows, get each partial row's cells.
+//
+// Serialization format for a batch:
+// 4 byte len | serialized partial row
+// 4 byte len | serialized partial row
+// ...
+// 4 byte len | serialized partial row
+//
+// Serialization format for a partial row
+// bit vector(se + null) byte size (2 byte) |
+// bit vector mark set cells |
+// bit vector mark nullable cells' null value |
+// 8bit padding
+// serialized not null cells
+//
+// Example usage:
+// PartialRowBatch rb(&schema);
+// rb.load(buffer);
+// while (true) {
+//     bool has;
+//     rb.next(&has);
+//     if (!has) break;
+//     for (size_t j=0; j < reader.cell_size(); j++) {
+//         const ColumnSchema* cs = nullptr;
+//         const void* data = nullptr;
+//         // get column cell type and data
+//         rb.get_cell(j, &cs, &data);
+//     }
+// }
+//
+// Note: currently only fixed length column types are supported. All length and scalar types store
+// in native byte order(little endian in x86-64).
+//
+// Note: The serialization format is simple, it only provides basic functionalities
+// so we can quickly complete the whole create/read/write pipeline. The format may change
+// as the project evolves.
+class PartialRowBatch {
+public:
+    explicit PartialRowBatch(scoped_refptr<Schema>* schema);
+    ~PartialRowBatch();
+
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Load from a serialized buffer
+    Status load(std::vector<uint8_t>&& buffer);
+
+    // Return row count in this batch
+    size_t row_size() const { return _row_size; }
+
+    // Iterate to next row, mark has_row to false if there is no more rows
+    Status next_row(bool* has_row);
+
+    // Get row operation cell count
+    size_t cur_row_cell_size() const { return _cells.size(); }
+    // Get row operation cell by index idx, return ColumnSchema and data pointer
+    Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** data) const;
+
+private:
+    scoped_refptr<Schema> _schema;
+
+    bool _delete = false;
+    size_t _bit_set_size = 0;
+    struct CellInfo {
+        CellInfo(uint32_t cid, const void* data)
+                : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {}
+        uint32_t cid = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _cells;
+
+    size_t _next_row = 0;
+    size_t _row_size = 0;
+    const uint8_t* _pos = nullptr;
+    std::vector<uint8_t> _buffer;
+};
+
+// Writer for PartialRowBatch
+//
+// Example usage:
+// scoped_refptr<Schema> sc;
+// Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+// PartialRowWriter writer(*sc.get());
+// writer.start_batch();
+// for (auto& row : rows) {
+//     writer.start_row();
+//     writer.set("column_name", value);
+//     ...
+//     writer.set(column_id, value);
+//     writer.end_row();
+// }
+// vector<uint8_t> buffer;
+// writer.end_batch(&buffer);
+class PartialRowWriter {
+public:
+    static const size_t DEFAULT_BYTE_CAPACITY = 1 << 20;
+    static const size_t DEFAULT_ROW_CAPACIT = 1 << 16;
+
+    explicit PartialRowWriter(scoped_refptr<Schema>* schema);
+    ~PartialRowWriter();
+
+    Status start_batch(size_t row_capacity = DEFAULT_ROW_CAPACIT,
+                       size_t byte_capacity = DEFAULT_BYTE_CAPACITY);
+
+    // Start writing a new row
+    Status start_row();
+
+    // Set cell value by column name
+    // param data's memory must remain valid before calling build
+    Status set(const string& col, const void* data);
+
+    // Set cell value by column id
+    // param data's memory must remain valid before calling build
+    Status set(uint32_t cid, const void* data);
+
+    // set this row is delete operation
+    Status set_delete();
+
+    // Finish writing a row
+    Status end_row();
+
+    // Finish writing the whole ParitialRowBatch, return a serialized buffer
+    Status finish_batch(vector<uint8_t>* buffer);
+
+private:
+    Status set(const ColumnSchema* cs, uint32_t cid, const void* data);
+    size_t byte_size() const;
+    Status write(uint8_t** ppos);
+
+    scoped_refptr<Schema> _schema;
+    struct CellInfo {
+        CellInfo() = default;
+        uint32_t isset = 0;
+        uint32_t isnullable = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _temp_cells;
+    size_t _bit_set_size = 0;
+    size_t _bit_null_size = 0;

Review comment:
       _bit_nullable_size

##########
File path: be/src/olap/memory/partial_row_batch.h
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// A chunk of memory that stores a batch of serialized partial rows
+// User can iterate through all the partial rows, get each partial row's cells.
+//
+// Serialization format for a batch:
+// 4 byte len | serialized partial row
+// 4 byte len | serialized partial row
+// ...
+// 4 byte len | serialized partial row
+//
+// Serialization format for a partial row
+// bit vector(se + null) byte size (2 byte) |
+// bit vector mark set cells |
+// bit vector mark nullable cells' null value |
+// 8bit padding
+// serialized not null cells
+//
+// Example usage:
+// PartialRowBatch rb(&schema);
+// rb.load(buffer);
+// while (true) {
+//     bool has;
+//     rb.next(&has);
+//     if (!has) break;
+//     for (size_t j=0; j < reader.cell_size(); j++) {
+//         const ColumnSchema* cs = nullptr;
+//         const void* data = nullptr;
+//         // get column cell type and data
+//         rb.get_cell(j, &cs, &data);
+//     }
+// }
+//
+// Note: currently only fixed length column types are supported. All length and scalar types store
+// in native byte order(little endian in x86-64).
+//
+// Note: The serialization format is simple, it only provides basic functionalities
+// so we can quickly complete the whole create/read/write pipeline. The format may change
+// as the project evolves.
+class PartialRowBatch {
+public:
+    explicit PartialRowBatch(scoped_refptr<Schema>* schema);
+    ~PartialRowBatch();
+
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Load from a serialized buffer
+    Status load(std::vector<uint8_t>&& buffer);
+
+    // Return row count in this batch
+    size_t row_size() const { return _row_size; }
+
+    // Iterate to next row, mark has_row to false if there is no more rows
+    Status next_row(bool* has_row);
+
+    // Get row operation cell count
+    size_t cur_row_cell_size() const { return _cells.size(); }
+    // Get row operation cell by index idx, return ColumnSchema and data pointer
+    Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** data) const;
+
+private:
+    scoped_refptr<Schema> _schema;
+
+    bool _delete = false;
+    size_t _bit_set_size = 0;
+    struct CellInfo {
+        CellInfo(uint32_t cid, const void* data)
+                : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {}
+        uint32_t cid = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _cells;
+
+    size_t _next_row = 0;
+    size_t _row_size = 0;
+    const uint8_t* _pos = nullptr;
+    std::vector<uint8_t> _buffer;
+};
+
+// Writer for PartialRowBatch
+//
+// Example usage:
+// scoped_refptr<Schema> sc;
+// Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+// PartialRowWriter writer(*sc.get());
+// writer.start_batch();
+// for (auto& row : rows) {
+//     writer.start_row();
+//     writer.set("column_name", value);
+//     ...
+//     writer.set(column_id, value);
+//     writer.end_row();
+// }
+// vector<uint8_t> buffer;
+// writer.end_batch(&buffer);

Review comment:
       The function is named finish_batch, not end_batch, so this example should call finish_batch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to