github-actions[bot] commented on code in PR #15670:
URL: https://github.com/apache/doris/pull/15670#discussion_r1063067165
##########
be/src/vec/exec/format/table/iceberg_reader.cpp:
##########
@@ -163,161 +179,158 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
}
eof = false;
size_t read_rows = 0;
- RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+ create_status = delete_reader.get_next_block(&block, &read_rows, &eof);
+ if (!create_status.ok()) {
+ return nullptr;
+ }
if (read_rows > 0) {
ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
DCHECK_EQ(path_column->size(), read_rows);
- std::pair<int, int> path_range;
+ ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+ using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+ const int64_t* src_data =
+ assert_cast<const ColumnType&>(*pos_column).get_data().data();
+ IcebergTableReader::PositionDeleteRange range;
if (dictionary_coded) {
- path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column),
- data_file_path);
+ range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
} else {
- path_range = _binary_search(assert_cast<const ColumnString&>(*path_column),
- data_file_path);
+ range = _get_range(assert_cast<const ColumnString&>(*path_column));
}
-
- int skip_count = path_range.first;
- int valid_count = path_range.second;
- if (valid_count > 0) {
- // delete position
- ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
- CHECK_EQ(pos_column->size(), read_rows);
- using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
- const int64_t* src_data =
- assert_cast<const ColumnType&>(*pos_column).get_data().data() +
- skip_count;
- const int64_t* src_data_end = src_data + valid_count;
- const int64_t* cpy_start =
- std::lower_bound(src_data, src_data_end, whole_range.first_row);
- const int64_t* cpy_end =
- std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
- int64_t cpy_count = cpy_end - cpy_start;
-
- if (cpy_count > 0) {
- int64_t origin_size = delete_rows.size();
- delete_rows.resize(origin_size + cpy_count);
- int64_t* dest_position = &delete_rows[origin_size];
- memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
- num_delete_rows += cpy_count;
+ for (int i = 0; i < range.range.size(); ++i) {
+ std::string key = range.data_file_path[i];
+ auto iter = position_delete->find(key);
+ DeleteRows* delete_rows;
+ if (iter == position_delete->end()) {
+ delete_rows = new DeleteRows;
+ std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
+ (*position_delete)[key] = std::move(delete_rows_ptr);
+ } else {
+ delete_rows = iter->second.get();
}
+ const int64_t* cpy_start = src_data + range.range[i].first;
+ const int64_t cpy_count = range.range[i].second - range.range[i].first;
+ int64_t origin_size = delete_rows->size();
+ delete_rows->resize(origin_size + cpy_count);
+ int64_t* dest_position = &(*delete_rows)[origin_size];
+ memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
}
}
}
- delete_rows_iter++;
+ return position_delete;
+ });
+ if (create_status.is<ErrorCode::END_OF_FILE>()) {
+ continue;
+ } else if (!create_status.ok()) {
+ return create_status;
}
- if (num_delete_rows > 0) {
- for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) {
- if (iter->empty()) {
- delete_rows_list.erase(iter++);
- } else {
- iter++;
- }
+ DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
+ auto iter = delete_file_map.find(data_file_path);
+ if (iter != delete_file_map.end() && iter->second->size() > 0) {
+ num_delete_rows += iter->second->size();
+ DeleteRows* row_ids = iter->second.get();
+ delete_rows_array.emplace_back(row_ids);
+ if (row_ids->front() >= whole_range.first_row &&
+ row_ids->back() < whole_range.last_row) {
+ // TODO(gaoxin): how to safely erase data in multithreading.
+ erase_data.emplace_back(delete_file_cache, iter);
}
- SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
- _merge_sort(delete_rows_list, num_delete_rows);
- parquet_reader->set_delete_rows(&_delete_rows);
- COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
}
}
- // todo: equality delete
- COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+ if (num_delete_rows > 0) {
+ SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
+ _sort_delete_rows(delete_rows_array, num_delete_rows);
+ parquet_reader->set_delete_rows(&_delete_rows);
+ COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
+ }
return Status::OK();
Review Comment:
warning: non-void function does not return a value in all control paths [clang-diagnostic-return-type]
```cpp
}
^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]