This is an automated email from the ASF dual-hosted git repository.
lixueclaire pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git
The following commit(s) were added to refs/heads/main by this push:
new b86304a7 feat(c++): label filtering API, benchmarks, and examples (#654)
b86304a7 is described below
commit b86304a7c1f3a5bee01910dd2b196dec54303ad6
Author: Elssky <[email protected]>
AuthorDate: Mon Nov 11 09:45:36 2024 +0800
feat(c++): label filtering API, benchmarks, and examples (#654)
---
cpp/CMakeLists.txt | 1 +
cpp/benchmarks/benchmark_util.h | 6 +
cpp/benchmarks/label_filter_benchmark.cc | 136 ++++++++++++++
cpp/examples/label_filtering_example.cc | 95 ++++++++++
cpp/src/graphar/high-level/graph_reader.cc | 292 ++++++++++++++++++++++++++++-
cpp/src/graphar/high-level/graph_reader.h | 188 +++++++++++++++++--
cpp/src/graphar/label.cc | 113 +++++++++++
cpp/src/graphar/label.h | 62 ++++++
8 files changed, 874 insertions(+), 19 deletions(-)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index cd12d486..730c46c0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -477,6 +477,7 @@ if (BUILD_BENCHMARKS)
target_link_libraries(${target} PRIVATE benchmark::benchmark_main
graphar ${CMAKE_DL_LIBS})
endmacro()
add_benchmark(arrow_chunk_reader_benchmark SRCS
benchmarks/arrow_chunk_reader_benchmark.cc)
+ add_benchmark(label_filter_benchmark SRCS
benchmarks/label_filter_benchmark.cc)
add_benchmark(graph_info_benchmark SRCS benchmarks/graph_info_benchmark.cc)
endif()
diff --git a/cpp/benchmarks/benchmark_util.h b/cpp/benchmarks/benchmark_util.h
index 19cd4737..6c0494ff 100644
--- a/cpp/benchmarks/benchmark_util.h
+++ b/cpp/benchmarks/benchmark_util.h
@@ -41,6 +41,10 @@ class BenchmarkFixture : public ::benchmark::Fixture {
path_ = std::string(c_root) + "/ldbc_sample/parquet/ldbc_sample.graph.yml";
auto maybe_graph_info = GraphInfo::Load(path_);
graph_info_ = maybe_graph_info.value();
+
+ second_path_ = std::string(c_root) + "/ldbc/parquet/ldbc.graph.yml";
+ auto second_maybe_graph_info = GraphInfo::Load(second_path_);
+ second_graph_info_ = second_maybe_graph_info.value();
}
void TearDown(const ::benchmark::State& state) override {}
@@ -48,5 +52,7 @@ class BenchmarkFixture : public ::benchmark::Fixture {
protected:
std::string path_;
std::shared_ptr<GraphInfo> graph_info_;
+ std::string second_path_;
+ std::shared_ptr<GraphInfo> second_graph_info_;
};
} // namespace graphar
diff --git a/cpp/benchmarks/label_filter_benchmark.cc b/cpp/benchmarks/label_filter_benchmark.cc
new file mode 100644
index 00000000..d575a075
--- /dev/null
+++ b/cpp/benchmarks/label_filter_benchmark.cc
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "benchmark/benchmark.h"
+
+#include "./benchmark_util.h"
+#include "graphar/api/high_level_reader.h"
+#include "graphar/api/info.h"
+
+namespace graphar {
+
+// Every helper below queries the "organisation" vertex type of the graph that
+// BenchmarkFixture loads into second_graph_info_, and returns the resulting
+// collection so the timed loop can hand it to benchmark::DoNotOptimize.
+
+/// Vertices labelled "university", via the parquet label scan.
+std::shared_ptr<graphar::VerticesCollection> SingleLabelFilter(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  return VerticesCollection::verticesWithLabel("university", graph_info,
+                                               "organisation")
+      .value();
+}
+
+/// Same query as SingleLabelFilter, evaluated through the Acero filter path.
+std::shared_ptr<graphar::VerticesCollection> SingleLabelFilterbyAcero(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  return VerticesCollection::verticesWithLabelbyAcero("university", graph_info,
+                                                      "organisation")
+      .value();
+}
+
+/// Vertices carrying both "university" and "company" (parquet label scan).
+std::shared_ptr<graphar::VerticesCollection> MultiLabelFilter(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  return VerticesCollection::verticesWithMultipleLabels(
+             {"university", "company"}, graph_info, "organisation")
+      .value();
+}
+
+/// Same query as MultiLabelFilter, evaluated through the Acero filter path.
+std::shared_ptr<graphar::VerticesCollection> MultiLabelFilterbyAcero(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  return VerticesCollection::verticesWithMultipleLabelsbyAcero(
+             {"university", "company"}, graph_info, "organisation")
+      .value();
+}
+
+/// Narrow an existing collection to vertices carrying "company" and "public".
+std::shared_ptr<graphar::VerticesCollection> LabelFilterFromSet(
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  return VerticesCollection::verticesWithMultipleLabels({"company", "public"},
+                                                        vertices_collection)
+      .value();
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilter)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    auto result = SingleLabelFilter(second_graph_info_);
+    ::benchmark::DoNotOptimize(result);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilterbyAcero)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    auto result = SingleLabelFilterbyAcero(second_graph_info_);
+    ::benchmark::DoNotOptimize(result);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilter)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    auto result = MultiLabelFilter(second_graph_info_);
+    ::benchmark::DoNotOptimize(result);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilterbyAcero)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    auto result = MultiLabelFilterbyAcero(second_graph_info_);
+    ::benchmark::DoNotOptimize(result);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, LabelFilterFromSet)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    // Build the already-filtered input set outside the timed region.
+    state.PauseTiming();
+    auto universities = SingleLabelFilter(second_graph_info_);
+    auto input = LabelFilterFromSet(universities);
+    state.ResumeTiming();
+    auto result = LabelFilterFromSet(input);
+    ::benchmark::DoNotOptimize(result);
+  }
+}
+
+BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilter)->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilterbyAcero)
+    ->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilter)->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilterbyAcero)
+    ->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, LabelFilterFromSet)->Iterations(10);
+
+}  // namespace graphar
diff --git a/cpp/examples/label_filtering_example.cc b/cpp/examples/label_filtering_example.cc
new file mode 100644
index 00000000..e519bdda
--- /dev/null
+++ b/cpp/examples/label_filtering_example.cc
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+
+#include "arrow/api.h"
+#include "arrow/filesystem/api.h"
+
+#include "./config.h"
+#include "graphar/api/arrow_reader.h"
+#include "graphar/api/high_level_reader.h"
+
+// Demonstrates the label-filtering API on the "organisation" vertex type:
+// a single-label query, a query over an already-filtered set, and a
+// multi-label query whose matches are printed with their labels and name.
+void vertices_collection(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  const std::string type = "organisation";
+
+  std::cout << "Query vertices with a specific label" << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+
+  auto maybe_filter_vertices_collection =
+      graphar::VerticesCollection::verticesWithLabel(std::string("company"),
+                                                      graph_info, type);
+  ASSERT(!maybe_filter_vertices_collection.has_error());
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+  std::cout << "valid vertices num: " << filter_vertices->size() << std::endl;
+
+  std::cout << std::endl;
+  std::cout << "Query vertices with specific label in a filtered vertices set"
+            << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+
+  auto maybe_filter_vertices_collection_2 =
+      graphar::VerticesCollection::verticesWithLabel(std::string("public"),
+                                                      filter_vertices);
+  ASSERT(!maybe_filter_vertices_collection_2.has_error());
+  auto filter_vertices_2 = maybe_filter_vertices_collection_2.value();
+  std::cout << "valid vertices num: " << filter_vertices_2->size() << std::endl;
+
+  std::cout << std::endl;
+  std::cout << "Test vertices with multi labels" << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+  auto maybe_filter_vertices_collection_3 =
+      graphar::VerticesCollection::verticesWithMultipleLabels(
+          {"company", "public"}, graph_info, type);
+  ASSERT(!maybe_filter_vertices_collection_3.has_error());
+  auto filter_vertices_3 = maybe_filter_vertices_collection_3.value();
+  std::cout << "valid vertices num: " << filter_vertices_3->size() << std::endl;
+
+  // end() constructs a new VertexIter (and its chunk readers) on each call,
+  // so build it once instead of on every loop test.
+  auto end = filter_vertices_3->end();
+  for (auto it = filter_vertices_3->begin(); it != end; ++it) {
+    // print the vertex id followed by all of its labels
+    auto label_result = it.label();
+    std::cout << "id: " << it.id() << " ";
+    if (!label_result.has_error()) {
+      for (const auto& label : label_result.value()) {
+        std::cout << label << " ";
+      }
+    }
+    std::cout << "name: ";
+    // Skip the value rather than calling value() on an error result.
+    auto maybe_name = it.property<std::string>("name");
+    if (!maybe_name.has_error()) {
+      std::cout << maybe_name.value() << " ";
+    }
+    std::cout << std::endl;
+  }
+}
+
+int main(int argc, char* argv[]) {
+  // read file and construct graph info
+  std::string path = GetTestingResourceRoot() + "/ldbc/parquet/ldbc.graph.yml";
+  auto maybe_graph_info = graphar::GraphInfo::Load(path);
+  // Check the load explicitly, like the queries in vertices_collection(),
+  // instead of calling value() on a possibly failed result.
+  ASSERT(!maybe_graph_info.has_error());
+  auto graph_info = maybe_graph_info.value();
+
+  // vertices collection
+  std::cout << "Vertices collection" << std::endl;
+  std::cout << "-------------------" << std::endl;
+  vertices_collection(graph_info);
+  std::cout << std::endl;
+}
diff --git a/cpp/src/graphar/high-level/graph_reader.cc b/cpp/src/graphar/high-level/graph_reader.cc
index 9d9857d3..2cfe5b36 100644
--- a/cpp/src/graphar/high-level/graph_reader.cc
+++ b/cpp/src/graphar/high-level/graph_reader.cc
@@ -17,8 +17,14 @@
* under the License.
*/
-#include "graphar/high-level/graph_reader.h"
+#include <algorithm>
+#include <unordered_set>
+
+#include "arrow/array.h"
+#include "graphar/api/arrow_reader.h"
#include "graphar/convert_to_arrow_type.h"
+#include "graphar/high-level/graph_reader.h"
+#include "graphar/label.h"
#include "graphar/types.h"
namespace graphar {
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
}
}
+/**
+ * Report whether the current vertex carries `label`.
+ *
+ * For a filtered iterator cur_offset_ is a position in filtered_ids_, so it is
+ * translated to the vertex id before seeking (as label() does).
+ *
+ * @return true/false from the label's boolean column; KeyError if the vertex
+ * type has no such label column; TypeError if the column is not boolean.
+ */
+Result<bool> VertexIter::hasLabel(const std::string& label) noexcept {
+  GAR_RETURN_NOT_OK(
+      label_reader_.seek(is_filtered_ ? filtered_ids_[cur_offset_]
+                                      : cur_offset_));
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  auto column = util::GetArrowColumnByName(chunk_table, label);
+  if (column == nullptr) {
+    return Status::KeyError("label with name ", label,
+                            " does not exist in the vertex.");
+  }
+  auto array = util::GetArrowArrayByChunkIndex(column, 0);
+  auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+  if (bool_array == nullptr) {
+    return Status::TypeError("label column ", label, " is not boolean.");
+  }
+  return bool_array->Value(0);
+}
+
+/**
+ * Collect every label of the current vertex.
+ *
+ * Seeks the label reader to the current vertex (the filtered id when the
+ * iterator is filtered) and returns, in labels_ order, each label whose
+ * boolean column is true. Labels without a column in the chunk are skipped.
+ *
+ * @return The labels set on the vertex, or an error if seeking/reading fails
+ * or a label column is not boolean.
+ */
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  GAR_RETURN_NOT_OK(
+      label_reader_.seek(is_filtered_ ? filtered_ids_[cur_offset_]
+                                      : cur_offset_));
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  std::vector<std::string> vertex_label;
+  for (const auto& label : labels_) {
+    auto column = util::GetArrowColumnByName(chunk_table, label);
+    if (column == nullptr) {
+      continue;
+    }
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    if (bool_array == nullptr) {
+      return Status::TypeError("label column ", label, " is not boolean.");
+    }
+    if (bool_array->Value(0)) {
+      vertex_label.push_back(label);
+    }
+  }
+  return vertex_label;
+}
+
+/// Label predicate handed to the parquet scan: a row qualifies only when all
+/// of the first `column_number` tested-label flags in `state` are set (AND).
+/// An OR variant would instead succeed on the first set flag.
+static inline bool IsValid(bool* state, int column_number) {
+  if (column_number <= 0) {
+    return true;
+  }
+  return std::all_of(state, state + column_number,
+                     [](bool has_label) { return has_label; });
+}
+
+/**
+ * Scan the label chunks and return the ids of vertices carrying every label
+ * in `filter_labels` (AND semantics, see IsValid).
+ *
+ * If this collection is filtered, only the chunks in valid_chunk_ are scanned,
+ * and chunks with a match are appended to `new_valid_chunk` (when non-null).
+ * Otherwise all chunks are scanned, and chunks with a match are appended to
+ * this collection's own valid_chunk_.
+ *
+ * @return The matching vertex ids, or KeyError when a requested label is not
+ * one of this vertex type's labels (or no label was requested).
+ */
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+
+  // Resolve each requested label to its column position in labels_. A label
+  // this vertex type lacks can never be satisfied. Also, the parquet helper
+  // receives the id list together with a count that must match it. So reject
+  // unknown labels instead of silently dropping them.
+  std::vector<int> tested_label_ids;
+  tested_label_ids.reserve(filter_labels.size());
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it == labels_.end()) {
+      return Status::KeyError("query label ", filter_label,
+                              " does not exist in the vertex.");
+    }
+    tested_label_ids.push_back(
+        static_cast<int>(std::distance(labels_.begin(), it)));
+  }
+  if (tested_label_ids.empty()) {
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+  }
+  const int TESTED_LABEL_NUM = static_cast<int>(tested_label_ids.size());
+
+  // Scratch bitmap for the parquet helper (one bit per vertex); owned by a
+  // vector so it is released on every return path.
+  std::vector<uint64_t> bitmap(TOT_ROWS_NUM / 64 + 1, uint64_t{0});
+  const std::string label_chunk_prefix =
+      prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+  std::vector<int> indices;
+
+  // Scan one label chunk, appending matching vertex ids to `indices`;
+  // returns the helper's match count for that chunk.
+  auto scan_chunk = [&](int chunk_idx) {
+    const int row_num =
+        std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+    return read_parquet_file_and_get_valid_indices(
+        label_chunk_prefix.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+        tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices,
+        bitmap.data(), QUERY_TYPE::INDEX);
+  };
+
+  if (is_filtered_) {
+    // Only chunks that matched an earlier filter can hold candidates.
+    for (int chunk_idx : valid_chunk_) {
+      if (scan_chunk(chunk_idx) != 0 && new_valid_chunk != nullptr) {
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+      }
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      if (scan_chunk(chunk_idx) != 0) {
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+      }
+    }
+  }
+
+  return std::vector<IdType>(indices.begin(), indices.end());
+}
+
+/**
+ * Evaluate the label filter through the Acero expression path: build an AND
+ * of `label == true` predicates and read every filtered label chunk.
+ *
+ * NOTE(review): matching vertex ids are never extracted from the filtered
+ * tables, so this always returns an empty list. Collections built from it
+ * therefore report size() == 0. TODO: materialize the ids before relying on
+ * this path.
+ *
+ * @return An (empty) id list, or an error if a label is unknown or a chunk
+ * cannot be read.
+ */
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  for (const auto& filter_label : filter_labels) {
+    if (std::find(labels_.begin(), labels_.end(), filter_label) ==
+        labels_.end()) {
+      return Status::KeyError("query label ", filter_label,
+                              " does not exist in the vertex.");
+    }
+  }
+  if (filter_labels.empty()) {
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+  }
+
+  // AND together one `label == true` predicate per requested label; the
+  // first predicate seeds the chain instead of being ANDed with itself.
+  std::shared_ptr<Expression> combined_filter;
+  for (const auto& label : filter_labels) {
+    std::shared_ptr<Expression> predicate =
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true));
+    if (combined_filter == nullptr) {
+      combined_filter = std::move(predicate);
+    } else {
+      combined_filter = graphar::_And(combined_filter, predicate);
+    }
+  }
+
+  GAR_ASSIGN_OR_RAISE(auto filter_reader,
+                      graphar::VertexPropertyArrowChunkReader::Make(
+                          vertex_info_, labels_, prefix_, {}));
+  filter_reader->Filter(combined_filter);
+
+  const IdType chunk_size = vertex_info_->GetChunkSize();
+  for (IdType chunk_idx = 0; chunk_idx * chunk_size < vertex_num_;
+       ++chunk_idx) {
+    GAR_ASSIGN_OR_RAISE(auto filter_table, filter_reader->GetLabelChunk());
+    static_cast<void>(filter_table);
+    // Status ignored: stepping past the last chunk presumably reports an
+    // error, and the loop bound already stops the iteration.
+    static_cast<void>(filter_reader->next_chunk());
+  }
+
+  return std::vector<IdType>{};
+}
+
+/**
+ * Build a collection of the `type` vertices that carry `filter_label`.
+ *
+ * @return The filtered collection, KeyError if `type` is unknown, or the
+ * error produced by filter().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  // The constructor dereferences vertex_info, so validate it first.
+  if (!vertex_info) {
+    return Status::KeyError("The vertex ", type, " doesn't exist.");
+  }
+  auto vertices_collection = std::make_shared<VerticesCollection>(
+      vertex_info, graph_info->GetPrefix());
+  // filter() runs while the collection is still unfiltered, so it scans every
+  // chunk; its failure is propagated instead of aborting via value().
+  GAR_ASSIGN_OR_RAISE(vertices_collection->filtered_ids_,
+                      vertices_collection->filter({filter_label}));
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+/**
+ * Build a collection of the `type` vertices carrying `filter_label`, using
+ * filter_by_acero().
+ *
+ * @return The filtered collection, KeyError if `type` is unknown, or the
+ * error produced by filter_by_acero().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  // The constructor dereferences vertex_info, so validate it first.
+  if (!vertex_info) {
+    return Status::KeyError("The vertex ", type, " doesn't exist.");
+  }
+  auto vertices_collection = std::make_shared<VerticesCollection>(
+      vertex_info, graph_info->GetPrefix());
+  GAR_ASSIGN_OR_RAISE(vertices_collection->filtered_ids_,
+                      vertices_collection->filter_by_acero({filter_label}));
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+/**
+ * Narrow `vertices_collection` to the vertices that also carry
+ * `filter_label`.
+ *
+ * A filtered source is rescanned only over its valid chunks, and the result
+ * is intersected with its ids. An unfiltered source is scanned in full. In
+ * both cases the returned collection is marked filtered, so size() and
+ * iteration honour the new id list.
+ *
+ * @return The narrowed collection, or the error produced by filter().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  std::vector<IdType> filtered_ids;
+  if (vertices_collection->is_filtered_) {
+    // Scan only the source's surviving chunks; the chunks that match are
+    // recorded on the new collection so it can be narrowed again later.
+    GAR_ASSIGN_OR_RAISE(
+        filtered_ids,
+        vertices_collection->filter({filter_label},
+                                    &new_vertices_collection->valid_chunk_));
+    const std::unordered_set<IdType> source_ids(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    // Keep the order filter() produced rather than hash-set order, and keep
+    // ids as IdType (no narrowing to int).
+    filtered_ids.erase(
+        std::remove_if(filtered_ids.begin(), filtered_ids.end(),
+                       [&source_ids](IdType id) {
+                         return source_ids.count(id) == 0;
+                       }),
+        filtered_ids.end());
+  } else {
+    GAR_ASSIGN_OR_RAISE(filtered_ids,
+                        new_vertices_collection->filter({filter_label}));
+  }
+  new_vertices_collection->filtered_ids_ = std::move(filtered_ids);
+  new_vertices_collection->is_filtered_ = true;
+  return new_vertices_collection;
+}
+
+/**
+ * Build a collection of the `type` vertices carrying all of `filter_labels`.
+ *
+ * @return The filtered collection, KeyError if `type` is unknown, or the
+ * error produced by filter().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  // The constructor dereferences vertex_info, so validate it first.
+  if (!vertex_info) {
+    return Status::KeyError("The vertex ", type, " doesn't exist.");
+  }
+  auto vertices_collection = std::make_shared<VerticesCollection>(
+      vertex_info, graph_info->GetPrefix());
+  GAR_ASSIGN_OR_RAISE(vertices_collection->filtered_ids_,
+                      vertices_collection->filter(filter_labels));
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+/**
+ * Build a collection of the `type` vertices carrying all of `filter_labels`,
+ * using filter_by_acero().
+ *
+ * @return The filtered collection, KeyError if `type` is unknown, or the
+ * error produced by filter_by_acero().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabelsbyAcero(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  // The constructor dereferences vertex_info, so validate it first.
+  if (!vertex_info) {
+    return Status::KeyError("The vertex ", type, " doesn't exist.");
+  }
+  auto vertices_collection = std::make_shared<VerticesCollection>(
+      vertex_info, graph_info->GetPrefix());
+  GAR_ASSIGN_OR_RAISE(vertices_collection->filtered_ids_,
+                      vertices_collection->filter_by_acero(filter_labels));
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+/**
+ * Narrow `vertices_collection` to the vertices that also carry all of
+ * `filter_labels`.
+ *
+ * A filtered source is rescanned only over its valid chunks, and the result
+ * is intersected with its ids. An unfiltered source is scanned through the
+ * new collection, so the source's chunk list is never modified. The result is
+ * always marked filtered, so size() and iteration honour the new id list.
+ *
+ * @return The narrowed collection, or the error produced by filter().
+ */
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  std::vector<IdType> filtered_ids;
+  if (vertices_collection->is_filtered_) {
+    // Scan only the source's surviving chunks; the chunks that match are
+    // recorded on the new collection so it can be narrowed again later.
+    GAR_ASSIGN_OR_RAISE(
+        filtered_ids,
+        vertices_collection->filter(filter_labels,
+                                    &new_vertices_collection->valid_chunk_));
+    const std::unordered_set<IdType> source_ids(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    // Keep the order filter() produced rather than hash-set order, and keep
+    // ids as IdType (no narrowing to int).
+    filtered_ids.erase(
+        std::remove_if(filtered_ids.begin(), filtered_ids.end(),
+                       [&source_ids](IdType id) {
+                         return source_ids.count(id) == 0;
+                       }),
+        filtered_ids.end());
+  } else {
+    GAR_ASSIGN_OR_RAISE(filtered_ids,
+                        new_vertices_collection->filter(filter_labels));
+  }
+  new_vertices_collection->filtered_ids_ = std::move(filtered_ids);
+  new_vertices_collection->is_filtered_ = true;
+  return new_vertices_collection;
+}
+
template <typename T>
Result<T> Vertex::property(const std::string& property) const {
if constexpr (std::is_final<T>::value) {
diff --git a/cpp/src/graphar/high-level/graph_reader.h b/cpp/src/graphar/high-level/graph_reader.h
index 20e7d8f8..19c8f716 100644
--- a/cpp/src/graphar/high-level/graph_reader.h
+++ b/cpp/src/graphar/high-level/graph_reader.h
@@ -74,6 +74,13 @@ class Vertex {
template <typename T>
Result<T> property(const std::string& property) const;
+ /**
+ * @brief Get the label of the vertex.
+ * @return Result: The label of the vertex.
+ *
+ * NOTE(review): this patch adds no definition of Vertex::label<T>(), so a
+ * call will presumably fail to link; VertexIter::label() is the accessor that
+ * is implemented. TODO confirm before relying on it.
+ */
+ template <typename T>
+ Result<T> label() const;
+
/**
* @brief Return true if value at the property is valid (not null).
*
@@ -172,40 +179,80 @@ class VertexIter {
* @param offset The current offset of the readers.
*/
explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
- const std::string& prefix, IdType offset) noexcept {
+                      const std::string& prefix, IdType offset,
+                      const std::vector<std::string>& labels = {},
+                      bool is_filtered = false,
+                      const std::vector<IdType>& filtered_ids = {}) noexcept
+      // `labels` defaults to empty so existing 3-argument callers still
+      // compile; members are initialized in declaration order.
+      : labels_(labels),
+        cur_offset_(offset),
+        is_filtered_(is_filtered),
+        filtered_ids_(filtered_ids) {
+    // Only open a label reader when the vertex type declares labels.
+    if (!labels.empty()) {
+      label_reader_ =
+          VertexPropertyArrowChunkReader(vertex_info, labels, prefix);
+    }
for (const auto& pg : vertex_info->GetPropertyGroups()) {
readers_.emplace_back(vertex_info, pg, prefix);
}
-cur_offset_ = offset;
}
/** Copy constructor. */
-VertexIter(const VertexIter& other)
- : readers_(other.readers_), cur_offset_(other.cur_offset_) {}
+  // Defaulted member-wise copy: every member (readers, label reader, labels,
+  // offset, filter state) is copied in declaration order, so none can be
+  // missed or initialized out of order.
+  VertexIter(const VertexIter& other) = default;
/** Construct and return the vertex of the current offset. */
Vertex operator*() noexcept {
- for (auto& reader : readers_) {
- reader.seek(cur_offset_);
+    // For a filtered iterator cur_offset_ is a position in filtered_ids_;
+    // translate it once so both the readers and the Vertex use the real id.
+    const IdType vertex_id = id();
+    for (auto& reader : readers_) {
+      reader.seek(vertex_id);
}
-return Vertex(cur_offset_, readers_);
+    return Vertex(vertex_id, readers_);
}
/** Get the vertex id of the current offset. */
- IdType id() { return cur_offset_; }
+  IdType id() {
+    // Filtered iterators walk positions in filtered_ids_; map back to the id.
+    return is_filtered_ ? filtered_ids_[cur_offset_] : cur_offset_;
+  }
/** Get the value for a property of the current vertex. */
template <typename T>
Result<T> property(const std::string& property) noexcept {
std::shared_ptr<arrow::ChunkedArray> column(nullptr);
- for (auto& reader : readers_) {
- reader.seek(cur_offset_);
- GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
- column = util::GetArrowColumnByName(chunk_table, property);
- if (column != nullptr) {
- break;
+ if (is_filtered_) {
+ for (auto& reader : readers_) {
+ reader.seek(filtered_ids_[cur_offset_]);
+ GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
+ column = util::GetArrowColumnByName(chunk_table, property);
+ if (column != nullptr) {
+ break;
+ }
+ }
+ } else {
+ for (auto& reader : readers_) {
+ reader.seek(cur_offset_);
+ GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
+ column = util::GetArrowColumnByName(chunk_table, property);
+ if (column != nullptr) {
+ break;
+ }
}
}
+
if (column != nullptr) {
auto array = util::GetArrowArrayByChunkIndex(column, 0);
GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
@@ -215,6 +262,12 @@ class VertexIter {
" does not exist in the vertex.");
}
+ /** Determine whether a vertex has the input label. */
+ Result<bool> hasLabel(const std::string& label) noexcept;
+
+ /** Get the labels of the current vertex. */
+ Result<std::vector<std::string>> label() noexcept;
+
/** The prefix increment operator. */
VertexIter& operator++() noexcept {
++cur_offset_;
@@ -253,7 +306,11 @@ class VertexIter {
private:
std::vector<VertexPropertyArrowChunkReader> readers_;
+ VertexPropertyArrowChunkReader label_reader_;
+ std::vector<std::string> labels_;
IdType cur_offset_;
+ bool is_filtered_;
+ std::vector<IdType> filtered_ids_;
};
/**
@@ -266,11 +323,18 @@ class VerticesCollection {
* @brief Initialize the VerticesCollection.
*
* @param vertex_info The vertex info that describes the vertex type.
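* @param prefix The absolute prefix.
+ * @param is_filtered Whether size() and iteration are restricted to
+ * filtered_ids.
+ * @param filtered_ids The vertex ids retained by a label filter.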
*/
explicit VerticesCollection(const std::shared_ptr<VertexInfo>& vertex_info,
- const std::string& prefix)
- : vertex_info_(std::move(vertex_info)), prefix_(prefix) {
+ const std::string& prefix,
+ const bool is_filtered = false,
+ const std::vector<IdType> filtered_ids = {})
+ : vertex_info_(std::move(vertex_info)),
+ prefix_(prefix),
+ labels_(vertex_info->GetLabels()),
+ is_filtered_(is_filtered),
+ filtered_ids_(filtered_ids) {
// get the vertex num
std::string base_dir;
GAR_ASSIGN_OR_RAISE_ERROR(auto fs,
@@ -283,21 +347,104 @@ class VerticesCollection {
}
/** The iterator pointing to the first vertex. */
- VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); }
+  VertexIter begin() noexcept {
+    return VertexIter(vertex_info_, prefix_, 0, labels_, is_filtered_,
+                      filtered_ids_);
+  }
/** The iterator pointing to the past-the-end element. */
VertexIter end() noexcept {
- return VertexIter(vertex_info_, prefix_, vertex_num_);
+    // size() is the number of positions the iterator walks: the filtered id
+    // count for a filtered collection, otherwise the vertex count.
+    return VertexIter(vertex_info_, prefix_, static_cast<IdType>(size()),
+                      labels_, is_filtered_, filtered_ids_);
}
/** The iterator pointing to the vertex with specific id. */
- VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); }
+  VertexIter find(IdType id) {
+    // Addresses the raw vertex id space; no label filter is applied.
+    return VertexIter(vertex_info_, prefix_, id, labels_);
+  }
/** Get the number of vertices in the collection. */
- size_t size() const noexcept { return vertex_num_; }
+  size_t size() const noexcept {
+    return is_filtered_ ? filtered_ids_.size()
+                        : static_cast<size_t>(vertex_num_);
+  }
+
+  /**
+   * @brief Compute the ids of the vertices that carry every label in
+   * filter_labels (AND semantics).
+   *
+   * @param filter_labels Labels each returned vertex must have.
+   * @param new_valid_chunk Used only when this collection is already
+   * filtered: receives the indices of scanned chunks that contained a match.
+   * May be null. For an unfiltered collection the matching chunk indices are
+   * appended to this collection's own valid-chunk list instead.
+   * @return The matching vertex ids, or an error status when the labels
+   * cannot be resolved for this vertex type.
+   */
+  Result<std::vector<IdType>> filter(
+      std::vector<std::string> filter_labels,
+      std::vector<IdType>* new_valid_chunk = nullptr);
+
+  /**
+   * @brief Acero-expression variant of filter(), used by the label-filter
+   * benchmarks.
+   *
+   * NOTE(review): the implementation does not collect the matching ids and
+   * returns an empty list, so collections built from it report size() == 0.
+   *
+   * @param filter_labels Labels each vertex must have.
+   */
+  Result<std::vector<IdType>> filter_by_acero(
+      std::vector<std::string> filter_labels) const;
+
+  /**
+   * @brief Query vertices with a specific label
+   *
+   * @param filter_label The label to query vertices by
+   * @param graph_info A smart pointer to GraphInfo that contains details about
+   * the graph
+   * @param type The type of vertices to query
+   * @return A VerticesCollection containing all vertices that have the
+   * specified label
+   */
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
+      const std::string& filter_label,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Same query as verticesWithLabel(filter_label, graph_info, type),
+   * evaluated with filter_by_acero().
+   */
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabelbyAcero(
+      const std::string& filter_label,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Query vertices with a specific label within a given collection
+   *
+   * @param filter_label The label to query vertices by
+   * @param vertices_collection The collection of vertices to search within
+   * @return A VerticesCollection containing all vertices from the specified
+   * collection that have the specified label
+   */
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
+      const std::string& filter_label,
+      const std::shared_ptr<VerticesCollection>& vertices_collection);
+
+  /**
+   * @brief Query vertices with multiple labels
+   *
+   * @param filter_labels A vector of labels to query vertices by
+   * @param graph_info A smart pointer to GraphInfo that contains details about
+   * the graph
+   * @param type The type of vertices to query
+   * @return A VerticesCollection containing all vertices that have all of the
+   * specified labels
+   */
+  static Result<std::shared_ptr<VerticesCollection>>
+  verticesWithMultipleLabels(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Same query as verticesWithMultipleLabels(filter_labels,
+   * graph_info, type), evaluated with filter_by_acero().
+   */
+  static Result<std::shared_ptr<VerticesCollection>>
+  verticesWithMultipleLabelsbyAcero(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Query vertices with multiple labels within a given collection
+   *
+   * @param filter_labels A vector of labels to query vertices by
+   * @param vertices_collection The collection of vertices to search within
+   * @return A VerticesCollection containing all vertices from the specified
+   * collection that have all of the specified labels
+   */
+  static Result<std::shared_ptr<VerticesCollection>>
+  verticesWithMultipleLabels(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<VerticesCollection>& vertices_collection);
/**
* @brief Construct a VerticesCollection from graph info and vertex type.
*
* @param graph_info The graph info.
* @param type The vertex type.
@@ -305,6 +452,7 @@ class VerticesCollection {
static Result<std::shared_ptr<VerticesCollection>> Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
auto vertex_info = graph_info->GetVertexInfo(type);
+    // Validate before any dereference of vertex_info; the labels are read by
+    // the constructor, so no local copy is needed here.
if (!vertex_info) {
return Status::KeyError("The vertex ", type, " doesn't exist.");
}
@@ -315,6 +463,10 @@ class VerticesCollection {
private:
std::shared_ptr<VertexInfo> vertex_info_;
std::string prefix_;
+ std::vector<std::string> labels_;
+ bool is_filtered_;
+ std::vector<IdType> filtered_ids_;
+ std::vector<IdType> valid_chunk_;
IdType vertex_num_;
};
diff --git a/cpp/src/graphar/label.cc b/cpp/src/graphar/label.cc
new file mode 100644
index 00000000..e8b23797
--- /dev/null
+++ b/cpp/src/graphar/label.cc
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "graphar/label.h"
+
+#include <cassert>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <set>
+
+/// Read a parquet file by ParquetReader & get valid indices
+/// The first column_num labels are concerned.
+/// Read one chunk of a boolean label table stored as parquet and evaluate a
+/// label predicate on each row.
+///
+/// The file opened is `parquet_filename` immediately followed by `chunk_idx`
+/// (e.g. "prefix" + "3"). For each k in [0, tested_label_num) the boolean
+/// column `tested_label_ids[k]` is read into its own buffer; then, for each
+/// row i in [0, row_num), those values are copied into a state array and
+/// passed to `IsValid(state, tested_label_num)`. Rows for which it returns
+/// true are counted and, depending on `query_type`, reported.
+///
+/// @param parquet_filename Path prefix of the chunk files.
+/// @param row_num Number of rows to evaluate. Values beyond row_num are never
+///        read; rows missing from the file are treated as false for every
+///        label.
+/// @param tot_label_num Minimum length of the state array given to IsValid;
+///        entries at index >= tested_label_num are always false.
+/// @param tested_label_num Number of label columns to test.
+/// @param tested_label_ids Column indices of the tested labels; ids beyond
+///        the vector's size are ignored (their state stays false).
+/// @param IsValid Predicate called as IsValid(state, tested_label_num).
+/// @param chunk_idx Chunk index; also appended to the file name.
+/// @param chunk_size Rows per chunk, used to turn the chunk-local row i into
+///        the global index chunk_idx * chunk_size + i.
+/// @param indices QUERY_TYPE::INDEX output: global indices of matching rows
+///        are appended. Ignored when null.
+/// @param bitmap QUERY_TYPE::BITMAP output: chunk-local bit i is set for each
+///        matching row; must hold at least (row_num + 63) / 64 words. Ignored
+///        when null.
+/// @param query_type What to report besides the count (COUNT and ADAPTIVE
+///        only count).
+/// @return Number of rows in this chunk for which IsValid returned true.
+int read_parquet_file_and_get_valid_indices(
+    const char* parquet_filename, const int row_num, const int tot_label_num,
+    const int tested_label_num, std::vector<int> tested_label_ids,
+    const std::function<bool(bool*, int)>& IsValid, int chunk_idx,
+    int chunk_size, std::vector<int>* indices, uint64_t* bitmap,
+    const QUERY_TYPE query_type) {
+  // Clamp counts so negative values or a too-short id list can never index
+  // out of bounds.
+  const int64_t rows = row_num > 0 ? row_num : 0;
+  int label_num = tested_label_num > 0 ? tested_label_num : 0;
+  if (static_cast<size_t>(label_num) > tested_label_ids.size()) {
+    label_num = static_cast<int>(tested_label_ids.size());
+  }
+
+  // OpenFile throws on failure. Every buffer below is RAII-owned, so nothing
+  // leaks if it, or any later read, throws.
+  std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+      parquet::ParquetFileReader::OpenFile(
+          std::string(parquet_filename) + std::to_string(chunk_idx), false);
+  const int row_group_count = parquet_reader->metadata()->num_row_groups();
+
+  // One value-initialized (all false) buffer per *tested* label, with its own
+  // fill counter. Counting per label rather than per column id keeps
+  // duplicate ids in tested_label_ids from sharing a write offset.
+  std::vector<std::unique_ptr<bool[]>> values;
+  values.reserve(label_num);
+  for (int k = 0; k < label_num; ++k) {
+    values.push_back(std::make_unique<bool[]>(rows));
+  }
+  std::vector<int64_t> filled(static_cast<size_t>(label_num), 0);
+
+  for (int rg = 0; rg < row_group_count; ++rg) {
+    std::shared_ptr<parquet::RowGroupReader> row_group_reader =
+        parquet_reader->RowGroup(rg);
+    for (int k = 0; k < label_num; ++k) {
+      std::shared_ptr<parquet::ColumnReader> column_reader =
+          row_group_reader->Column(tested_label_ids[k]);
+      auto* bool_reader =
+          static_cast<parquet::BoolReader*>(column_reader.get());
+      while (bool_reader->HasNext() && filled[k] < rows) {
+        // Never request more values than the buffer still has room for.
+        const int64_t remaining = rows - filled[k];
+        const int64_t batch = remaining < BATCH_SIZE ? remaining : BATCH_SIZE;
+        // values_read is a required out-param; the label columns are assumed
+        // to hold no NULLs, so it equals rows_read.
+        int64_t values_read = 0;
+        const int64_t rows_read =
+            bool_reader->ReadBatch(batch, nullptr, nullptr,
+                                   values[k].get() + filled[k], &values_read);
+        if (rows_read <= 0) break;  // no progress: do not spin forever
+        filled[k] += rows_read;
+      }
+    }
+  }
+
+  // Heap-allocated state array (no VLA). It is at least as long as both
+  // tot_label_num and tested_label_num; only the first label_num entries are
+  // ever set, the rest stay false.
+  int state_size =
+      tot_label_num > tested_label_num ? tot_label_num : tested_label_num;
+  if (state_size < 0) state_size = 0;
+  std::unique_ptr<bool[]> state = std::make_unique<bool[]>(state_size);
+
+  int count = 0;
+  const int offset = chunk_idx * chunk_size;
+  for (int64_t i = 0; i < rows; ++i) {
+    for (int j = 0; j < label_num; ++j) {
+      state[j] = values[j][i];
+    }
+    if (!IsValid(state.get(), tested_label_num)) continue;
+    ++count;
+    if (query_type == QUERY_TYPE::INDEX && indices != nullptr) {
+      indices->push_back(static_cast<int>(i) + offset);
+    } else if (query_type == QUERY_TYPE::BITMAP && bitmap != nullptr) {
+      SetBitmap(bitmap, static_cast<int>(i));
+    }
+  }
+  return count;
+}
diff --git a/cpp/src/graphar/label.h b/cpp/src/graphar/label.h
new file mode 100644
index 00000000..145312e4
--- /dev/null
+++ b/cpp/src/graphar/label.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CPP_SRC_GRAPHAR_LABEL_H_
+#define CPP_SRC_GRAPHAR_LABEL_H_
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/properties.h>
+
+#include <iostream>
+#include <set>
+#include <vector>
+/*
+ * Aliases that make these parquet schema/type names usable unqualified.
+ * NOTE(review): using-declarations at global scope in a header leak into
+ * every file that includes it; a bare `Type` in particular may collide with
+ * other `Type` names in including code. Consider moving them into label.cc
+ * (which, as far as this patch shows, does not use them).
+ */
+using parquet::ConvertedType;
+using parquet::Encoding;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+/*
+ * Maximum number of values requested per ReadBatch call when
+ * read_parquet_file_and_get_valid_indices reads a label column.
+ */
+constexpr int BATCH_SIZE = 1024; // values requested per ReadBatch call
+
+/// Selects what is reported, in addition to the returned count of valid
+/// vertices, by a label query such as read_parquet_file_and_get_valid_indices.
+/// NOTE(review): ADAPTIVE is not handled by the label.cc implementation,
+/// which then only counts.
+enum QUERY_TYPE {
+  COUNT = 0,     ///< only the number of valid vertices
+  INDEX = 1,     ///< also the indices of valid vertices
+  BITMAP = 2,    ///< also a bitmap of valid vertices
+  ADAPTIVE = 3,  ///< adaptively report indices or a bitmap
+};
+
+/// Turn on bit `index` of a bitmap packed into 64-bit words, where position
+/// p lives in word p / 64 at bit p % 64. Other bits are left unchanged; the
+/// caller must pass a non-null bitmap with a word for `index`.
+static inline void SetBitmap(uint64_t* bitmap, const int index) {
+  const int word = index >> 6;                 // which 64-bit word
+  const uint64_t mask = 1ULL << (index & 63);  // which bit inside that word
+  bitmap[word] = bitmap[word] | mask;
+}
+/**
+ * Read chunk `chunk_idx` of a boolean label table stored as parquet (file
+ * path: parquet_filename immediately followed by chunk_idx) and count the
+ * rows whose tested label columns satisfy IsValid.
+ *
+ * For each row, the values of columns tested_label_ids[0..tested_label_num)
+ * are copied into a bool array that is passed as IsValid(state,
+ * tested_label_num). Depending on query_type, matching rows are also
+ * reported: INDEX appends chunk_idx * chunk_size + row to *indices, BITMAP
+ * sets the chunk-local row bit in bitmap (see SetBitmap); COUNT (the default)
+ * and ADAPTIVE only count.
+ *
+ * `indices` must be non-null for INDEX and `bitmap` for BITMAP. Returns the
+ * number of matching rows among the first row_num rows.
+ *
+ * NOTE(review): std::function is used here but <functional> is not included
+ * by this header; it presumably arrives transitively via the parquet headers.
+ */
+int read_parquet_file_and_get_valid_indices(
+ const char* parquet_filename, const int row_num, const int tot_label_num,
+ const int tested_label_num, std::vector<int> tested_label_ids,
+ const std::function<bool(bool*, int)>& IsValid, int chunk_idx,
+ int chunk_size, std::vector<int>* indices = nullptr,
+ uint64_t* bitmap = nullptr, const QUERY_TYPE query_type = COUNT);
+
+#endif // CPP_SRC_GRAPHAR_LABEL_H_
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]