This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new b607633fd KUDU-3568 Fix compaction budgeting test by setting memory hard limit b607633fd is described below commit b607633fd3c2b676fbd2cbe57c44bddf818dc457 Author: Ashwani Raina <ara...@cloudera.com> AuthorDate: Thu May 9 22:23:14 2024 +0530 KUDU-3568 Fix compaction budgeting test by setting memory hard limit TestRowSetCompactionSkipWithBudgetingConstraints can fail if the node running the test has a large amount of memory. This happens because the test generates a few MB worth of deltas, whose size is multiplied by a preset factor so that the result (i.e. the memory required to complete rowset compaction) is very high, on the order of 200 GB per rowset. Even though nodes running the test generally don't have that much physical memory, it is still possible to end up on a high-memory node. On such nodes, the test might fail. The patch fixes that problem by deterministically ensuring that the compaction memory requirement is always higher than the memory hard limit. It does that by doing the following: 1. Move the budgeting compaction tests out into a separate binary. 2. This gives the flexibility to set the memory hard limit as per test needs. It is important to note that once a memory hard limit is set, it remains the same for all tests executed throughout the lifetime of the binary. 3. Set the memory hard limit to 1 GB, which is enough to handle the compaction requirements of TestRowSetCompactionProceedWithNoBudgetingConstraints. For TestRowSetCompactionSkipWithBudgetingConstraints, it is not enough because we set the delta memory factor high enough to exceed 1 GB. Both tests are now expected to succeed deterministically. 
Change-Id: I85d104e1d066507ce8e72a00cc5165cc4b85e48d Reviewed-on: http://gerrit.cloudera.org:8080/21416 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/tablet/CMakeLists.txt | 1 + src/kudu/tablet/compaction-highmem-test.cc | 220 +++++++++++++++++++++++++++++ src/kudu/tablet/compaction-test.cc | 143 ------------------- 3 files changed, 221 insertions(+), 143 deletions(-) diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt index c48089ed8..71af1dab0 100644 --- a/src/kudu/tablet/CMakeLists.txt +++ b/src/kudu/tablet/CMakeLists.txt @@ -105,6 +105,7 @@ SET_KUDU_TEST_LINK_LIBS(tablet tablet_test_util) ADD_KUDU_TEST(all_types-scan-correctness-test NUM_SHARDS 8 PROCESSORS 2) ADD_KUDU_TEST(cfile_set-test) ADD_KUDU_TEST(compaction-test) +ADD_KUDU_TEST(compaction-highmem-test) ADD_KUDU_TEST(compaction_policy-test DATA_FILES ycsb-test-rowsets.tsv) ADD_KUDU_TEST(composite-pushdown-test) ADD_KUDU_TEST(delta_compaction-test) diff --git a/src/kudu/tablet/compaction-highmem-test.cc b/src/kudu/tablet/compaction-highmem-test.cc new file mode 100644 index 000000000..43dbcb63d --- /dev/null +++ b/src/kudu/tablet/compaction-highmem-test.cc @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <memory> +#include <ostream> +#include <string> +#include <type_traits> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/common/common.pb.h" +#include "kudu/common/partial_row.h" +#include "kudu/common/row_operations.pb.h" +#include "kudu/common/schema.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/tablet/local_tablet_writer.h" +#include "kudu/tablet/tablet-test-util.h" +#include "kudu/tablet/tablet.h" +#include "kudu/util/logging_test_util.h" +#include "kudu/util/status.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/trace.h" + +DECLARE_bool(rowset_compaction_enforce_preset_factor); +DECLARE_bool(rowset_compaction_memory_estimate_enabled); +DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled); +DECLARE_double(rowset_compaction_delta_memory_factor); +DECLARE_int64(memory_limit_hard_bytes); +DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb); + +namespace kudu { +namespace tablet { + +class TestHighMemCompaction : public KuduRowSetTest { + public: + TestHighMemCompaction() + : KuduRowSetTest(CreateSchema()) { + } + + static Schema CreateSchema() { + SchemaBuilder builder; + CHECK_OK(builder.AddKeyColumn("key", STRING)); + CHECK_OK(builder.AddColumn("val", INT64)); + CHECK_OK(builder.AddNullableColumn("nullable_val", INT32)); + return builder.BuildWithoutIds(); + } + + Status InsertOrUpsertTestRows(RowOperationsPB::Type type, + int64_t first_row, + int64_t count, + int32_t val) { + LocalTabletWriter writer(tablet().get(), &client_schema()); + KuduPartialRow row(&client_schema()); + + for (int64_t i = first_row; i < first_row + count; i++) { + 
RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i))); + RETURN_NOT_OK(row.SetInt64("val", val)); + if (type == RowOperationsPB::INSERT) { + RETURN_NOT_OK(writer.Insert(row)); + } else if (type == RowOperationsPB::UPSERT) { + RETURN_NOT_OK(writer.Upsert(row)); + } else { + return Status::InvalidArgument( + Substitute("unknown row operation type: $0", type)); + } + } + return Status::OK(); + } + + void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) { + for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { + ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT, + rowset_id * rows_per_rowset, + rows_per_rowset, + /*val*/0)); + ASSERT_OK(tablet()->Flush()); + } + ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); + } + + void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset, + int32_t val) { + for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { + ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT, + rowset_id * rows_per_rowset, + rows_per_rowset, + val)); + } + ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); + } + + // Workload to generate large sized deltas for compaction. + // The method generates 1 MB size worth of deltas with size_factor as 1. + // Callers can adjust the size_factor. + // For example, to generate 5MB, set size_factor as 5. + // Similarly, to generate 35MB, set size_factor as 35. + void GenHighMemConsumptionDeltas(const uint32_t size_factor); + + // Enables compaction memory budgeting and then runs rowset compaction. + // Caller can set constraints on budget and expect the results accordingly. + // If constraints are applied, compaction may be skipped. + void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied); + + static void SetUpTestSuite() { + // Keep the memory hard limit as 1GB for deterministic results. 
+ // The tests in this file have a requirement of memory hard limit to be of + // lower value in order to ensure that test expectations are met. + // Since we have initialized memory hard limit here to 1 GB, it is going to + // remain the same throughout the lifecyle of this binary. + // It is important that no test in this file is expecting memory hard limit + // set to physical memory on the node (running the test) i.e. all the tests + // are working with the assumption that memory hard limit is limited to 1 GB. + FLAGS_memory_limit_hard_bytes = 1024 * 1024 * 1024; + + FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true; + FLAGS_rowset_compaction_enforce_preset_factor = true; + FLAGS_rowset_compaction_memory_estimate_enabled = true; + + // Ensure memory budgeting applies + FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0; + } +}; + +void TestHighMemCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints( + bool budgeting_constraints_applied) { + // size factor as 2 generates ~2MB memory size worth of deltas + GenHighMemConsumptionDeltas(2); + + // Run rowset compaction. + StringVectorSink sink; + ScopedRegisterSink reg(&sink); + scoped_refptr<Trace> trace(new Trace); + Stopwatch sw; + sw.start(); + { + ADOPT_TRACE(trace.get()); + ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS)); + } + sw.stop(); + LOG(INFO) << Substitute("CompactRowSetsOp complete. 
Timing: $0 Metrics: $1", + sw.elapsed().ToString(), + trace->MetricsAsJSON()); + + if (budgeting_constraints_applied) { + ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"), + "removed from compaction input due to memory constraints"); + } else { + ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"), + "removed from compaction input due to memory constraints"); + } +} + +void TestHighMemCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) { + constexpr const uint32_t num_rowsets = 10; + constexpr const uint32_t num_rows_per_rowset = 2; + const uint32_t num_updates = 5000 * size_factor; + + NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset)); + + // Mutate all of the rows. + for (int i = 1; i <= num_updates; i++) { + UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i); + } + ASSERT_OK(tablet()->FlushAllDMSForTests()); +} + +// This test adds workload of rowsets updates in order to +// generate some number of REDO deltas. Along with that, memory +// budgeting constraints denoted by flags are enabled in order +// to make sure that when rowset compaction is invoked, it takes +// into consideration the amount of free memory left and based on +// that proceed with the compaction because of availability of memory. +TEST_F(TestHighMemCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + // 1 as mem factor implies ~(2*1)MB memory requirements for all rowsets, + // ok for compaction to proceed + FLAGS_rowset_compaction_delta_memory_factor = 1; + TestRowSetCompactionWithOrWithoutBudgetingConstraints(false); +} + +// This test adds workload of rowsets updates in order to +// generate huge number of REDO deltas. 
Along with that, memory +// budgeting constraints denoted by flags are enabled in order +// to make sure that when rowset compaction is invoked, it takes +// into consideration the amount of free memory left and based on +// that skip the compaction because of lack of memory. +TEST_F(TestHighMemCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + // 1024000 mem factor implies ~(2*1024000)MB memory requirements for all rowsets, + // forces to skip compaction + FLAGS_rowset_compaction_delta_memory_factor = 1024000; + TestRowSetCompactionWithOrWithoutBudgetingConstraints(true); +} + +} // namespace tablet +} // namespace kudu diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc index a0d2c35d5..92d81a98b 100644 --- a/src/kudu/tablet/compaction-test.cc +++ b/src/kudu/tablet/compaction-test.cc @@ -41,7 +41,6 @@ #include "kudu/common/partial_row.h" #include "kudu/common/row.h" #include "kudu/common/row_changelist.h" -#include "kudu/common/row_operations.pb.h" #include "kudu/common/rowblock.h" #include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" @@ -57,7 +56,6 @@ #include "kudu/gutil/casts.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/local_tablet_writer.h" @@ -73,7 +71,6 @@ #include "kudu/tablet/tablet_metadata.h" #include "kudu/util/env.h" #include "kudu/util/faststring.h" -#include "kudu/util/logging_test_util.h" #include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" #include "kudu/util/random.h" @@ -82,7 +79,6 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/trace.h" DEFINE_string(merge_benchmark_input_dir, "", "Directory to benchmark merge. 
The benchmark will merge " @@ -95,12 +91,7 @@ DEFINE_uint32(merge_benchmark_num_rowsets, 3, DEFINE_uint32(merge_benchmark_num_rows_per_rowset, 500000, "Number of rowsets as input to the merge"); -DECLARE_bool(rowset_compaction_enforce_preset_factor); -DECLARE_bool(rowset_compaction_memory_estimate_enabled); -DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled); -DECLARE_double(rowset_compaction_delta_memory_factor); DECLARE_string(block_manager); -DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb); using kudu::consensus::OpId; using kudu::log::LogAnchorRegistry; @@ -140,50 +131,6 @@ class TestCompaction : public KuduRowSetTest { return builder.BuildWithoutIds(); } - Status InsertOrUpsertTestRows(RowOperationsPB::Type type, - int64_t first_row, - int64_t count, - int32_t val) { - LocalTabletWriter writer(tablet().get(), &client_schema()); - KuduPartialRow row(&client_schema()); - - for (int64_t i = first_row; i < first_row + count; i++) { - RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i))); - RETURN_NOT_OK(row.SetInt64("val", val)); - if (type == RowOperationsPB::INSERT) { - RETURN_NOT_OK(writer.Insert(row)); - } else if (type == RowOperationsPB::UPSERT) { - RETURN_NOT_OK(writer.Upsert(row)); - } else { - return Status::InvalidArgument( - Substitute("unknown row operation type: $0", type)); - } - } - return Status::OK(); - } - - void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) { - for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { - ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT, - rowset_id * rows_per_rowset, - rows_per_rowset, - /*val*/0)); - ASSERT_OK(tablet()->Flush()); - } - ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); - } - - void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset, - int32_t val) { - for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { - ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT, - rowset_id * 
rows_per_rowset, - rows_per_rowset, - val)); - } - ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); - } - // Insert n_rows rows of data. // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>) void InsertRows(MemRowSet* mrs, int n_rows, int delta) { @@ -607,18 +554,6 @@ class TestCompaction : public KuduRowSetTest { void AddExpectedReinsert(Mutation** current_head, int64_t val); void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int64_t row_id, int64_t val); - // Workload to generate large sized deltas for compaction. - // The method generates 1 MB size worth of deltas with size_factor as 1. - // Callers can adjust the size_factor. - // For example, to generate 5MB, set size_factor as 5. - // Similarly, to generate 35MB, set size_factor as 35. - void GenHighMemConsumptionDeltas(uint32_t size_factor); - - // Enables compaction memory budgeting and then runs rowset compaction. - // Caller can set constraints on budget and expect the results accordingly. - // If constraints are applied, compaction may be skipped. - void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied); - protected: OpId op_id_; @@ -633,34 +568,6 @@ class TestCompaction : public KuduRowSetTest { TabletMemTrackers mem_trackers_; }; -// This test adds workload of rowsets updates in order to -// generate some number of REDO deltas. Along with that, memory -// budgeting constraints denoted by flags are enabled in order -// to make sure that when rowset compaction is invoked, it takes -// into consideration the amount of free memory left and based on -// that proceed with the compaction because of availability of memory. 
-TEST_F(TestCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) { - SKIP_IF_SLOW_NOT_ALLOWED(); - - // 1 as mem factor implies ~(2*1)MB memory requirements, ok for compaction to proceed - FLAGS_rowset_compaction_delta_memory_factor = 1; - TestRowSetCompactionWithOrWithoutBudgetingConstraints(false); -} - -// This test adds workload of rowsets updates in order to -// generate huge number of REDO deltas. Along with that, memory -// budgeting constraints denoted by flags are enabled in order -// to make sure that when rowset compaction is invoked, it takes -// into consideration the amount of free memory left and based on -// that skip the compaction because of lack of memory. -TEST_F(TestCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) { - SKIP_IF_SLOW_NOT_ALLOWED(); - - // 1024000 mem factor implies ~(2*1024000)MB memory requirements forces to skip compaction - FLAGS_rowset_compaction_delta_memory_factor = 1024000; - TestRowSetCompactionWithOrWithoutBudgetingConstraints(true); -} - TEST_F(TestCompaction, TestMemRowSetInput) { // Create a memrowset with 10 rows and several updates. shared_ptr<MemRowSet> mrs; @@ -878,56 +785,6 @@ void TestCompaction::AddUpdateAndDelete( AddExpectedReinsert(&row->undo_head, val); } -void TestCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints( - bool budgeting_constraints_applied) { - FLAGS_rowset_compaction_memory_estimate_enabled = true; - FLAGS_rowset_compaction_enforce_preset_factor = true; - FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true; - - // Ensure memory budgeting applies - FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0; - - // size factor as 2 generates ~2MB memory size worth of deltas - GenHighMemConsumptionDeltas(2); - - // Run rowset compaction. 
- StringVectorSink sink; - ScopedRegisterSink reg(&sink); - scoped_refptr<Trace> trace(new Trace); - Stopwatch sw; - sw.start(); - { - ADOPT_TRACE(trace.get()); - ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS)); - } - sw.stop(); - LOG(INFO) << Substitute("CompactRowSetsOp complete. Timing: $0 Metrics: $1", - sw.elapsed().ToString(), - trace->MetricsAsJSON()); - - if (budgeting_constraints_applied) { - ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"), - "removed from compaction input due to memory constraints"); - } else { - ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"), - "removed from compaction input due to memory constraints"); - } -} - -void TestCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) { - const uint32_t num_rowsets = 10; - const uint32_t num_rows_per_rowset = 2; - const uint32_t num_updates = 5000 * size_factor; - - NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset)); - - // Mutate all of the rows. - for (int i = 1; i <= num_updates; i++) { - UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i); - } - ASSERT_OK(tablet()->FlushAllDMSForTests()); -} - // Build several layers of overlapping rowsets with many ghost rows. // Repeatedly merge all the generated RowSets until we are left with a single RowSet, then make // sure that its history matches our expected history.