This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9f5fbcd841a87b144f3410891fe6bf1f8fe4288c
Author: Zoltan Borok-Nagy <borokna...@cloudera.com>
AuthorDate: Mon Jun 10 17:25:35 2024 +0200

    IMPALA-13138: Never smallify existing StringValue objects, only new ones 
during DeepCopy
    
    Currently we have a problematic StringValue::Smallify() call in
    BufferedTupleStream (in ComputeRowSizeAndSmallifyStrings()). It modifies
    the string value of an existing tuple and can corrupt the
    BufferedTupleStream.
    
    We should only smallify string values during DeepCopy(), and only the
    target string value, never the source. To ensure this, this patch
    makes StringValue::Smallify() private and adds comments to warn
    callers. The same applies to Tuple::SmallifyStrings().
    
    The bug was reproducible by a complex query against a few large tables.
    One JOIN builder crashed Impala during spilling due to a corrupted
    buffered tuple stream. create-tables-impala-13138.test and
    query-impala-13138.test contain the repro steps.
    
    Testing:
     * updated backend tests
     * added test that crashes Impala without this fix
    
    Change-Id: I739048b37a59a81c41c85d475fad00cb520a5f99
    Reviewed-on: http://gerrit.cloudera.org:8080/21502
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Daniel Becker <daniel.bec...@cloudera.com>
---
 be/src/runtime/buffered-tuple-stream-test.cc       | 27 ++++----
 be/src/runtime/buffered-tuple-stream.cc            |  6 +-
 be/src/runtime/buffered-tuple-stream.h             |  4 +-
 be/src/runtime/spillable-row-batch-queue.cc        |  2 +-
 be/src/runtime/string-value-test.cc                | 77 +++++++++++++---------
 be/src/runtime/string-value.h                      | 27 ++++++--
 be/src/runtime/tuple.cc                            |  1 +
 be/src/runtime/tuple.h                             |  6 +-
 .../QueryTest/create-tables-impala-13138.test      | 77 ++++++++++++++++++++++
 .../queries/QueryTest/query-impala-13138.test      | 29 ++++++++
 tests/query_test/test_join_queries.py              | 23 +++++++
 11 files changed, 220 insertions(+), 59 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 46b34a5be..098fb0b81 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -1939,7 +1939,7 @@ TEST_F(MultiNullableTupleStreamTest, 
MultiNullableTupleManyBufferSpill) {
   TestIntValuesInterleaved(100, 15, true, buffer_size);
 }
 
-/// Test that ComputeRowSizeAndSmallifyStrings handles nulls
+/// Test that ComputeRowSize handles nulls
 TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   Init(BUFFER_POOL_LIMIT);
   const vector<TupleDescriptor*>& tuple_descs = 
string_desc_->tuple_descriptors();
@@ -1968,28 +1968,27 @@ TEST_F(MultiNullableTupleStreamTest, 
TestComputeRowSize) {
   row->SetTuple(1, nullptr);
   row->SetTuple(2, nullptr);
   EXPECT_EQ(tuple_null_indicator_bytes + tuple_descs[0]->byte_size(),
-      stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+      stream.ComputeRowSize(row.get()));
 
   // Tuples are initialized to empty and have no var-len data.
   row->SetTuple(1, tuple1.get());
   row->SetTuple(2, tuple2.get());
   EXPECT_EQ(tuple_null_indicator_bytes + string_desc_->GetRowSize(),
-      stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+      stream.ComputeRowSize(row.get()));
 
   // Tuple 0 has some data.
   const SlotDescriptor* string_slot = tuple_descs[0]->slots()[0];
   StringValue* sv = tuple0->GetStringSlot(string_slot->tuple_offset());
-  *sv = STRINGS[0];
-  sv->Smallify();
+  sv->Assign(StringValue::MakeSmallStringFrom(STRINGS[0]));
   int64_t expected_len =
       tuple_null_indicator_bytes + string_desc_->GetRowSize() +
       (sv->IsSmall() ? 0 : sv->Len());
-  EXPECT_EQ(expected_len, stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
 
   // Check that external slots aren't included in count.
   sv = tuple1->GetStringSlot(external_string_slot->tuple_offset());
   sv->Assign(reinterpret_cast<char*>(1234), 1234);
-  EXPECT_EQ(expected_len, stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
@@ -2055,7 +2054,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
     builder.CommitTuples(array_len);
 
     // Check that internal row size computation gives correct result.
-    EXPECT_EQ(expected_row_size, 
stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+    EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
     bool b = stream.AddRow(row.get(), &status);
     ASSERT_TRUE(b);
     ASSERT_OK(status);
@@ -2104,7 +2103,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
-/// Test that ComputeRowSizeAndSmallifyStrings handles nulls
+/// Test that ComputeRowSize handles nulls
 TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   Init(BUFFER_POOL_LIMIT);
   const vector<TupleDescriptor*>& tuple_descs = 
array_desc_->tuple_descriptors();
@@ -2130,13 +2129,13 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   row->SetTuple(0, nullptr);
   row->SetTuple(1, nullptr);
   EXPECT_EQ(tuple_null_indicator_bytes,
-      stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+      stream.ComputeRowSize(row.get()));
 
   // Tuples are initialized to empty and have no var-len data.
   row->SetTuple(0, tuple0.get());
   row->SetTuple(1, tuple1.get());
   EXPECT_EQ(tuple_null_indicator_bytes + array_desc_->GetRowSize(),
-      stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+      stream.ComputeRowSize(row.get()));
 
   // Tuple 0 has an array.
   int expected_row_size = tuple_null_indicator_bytes + 
array_desc_->GetRowSize();
@@ -2160,19 +2159,19 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
     expected_row_size += str->Len();
   }
   builder.CommitTuples(array_len);
-  EXPECT_EQ(expected_row_size, 
stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
 
   // Check that the external slot isn't included in size.
   cv = tuple0->GetCollectionSlot(external_array_slot->tuple_offset());
   // ptr of external slot shouldn't be dereferenced when computing size.
   cv->ptr = reinterpret_cast<uint8_t*>(1234);
   cv->num_tuples = 1234;
-  EXPECT_EQ(expected_row_size, 
stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
 
   // Check that the array is excluded if tuple 0's array has its null 
indicator set.
   tuple0->SetNull(array_slot->null_indicator_offset());
   EXPECT_EQ(tuple_null_indicator_bytes + array_desc_->GetRowSize(),
-      stream.ComputeRowSizeAndSmallifyStrings(row.get()));
+      stream.ComputeRowSize(row.get()));
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index c1451d83a..cff554c21 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -945,7 +945,7 @@ void BufferedTupleStream::FixUpCollectionsForRead(
   }
 }
 
-int64_t BufferedTupleStream::ComputeRowSizeAndSmallifyStrings(TupleRow* row)
+int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row)
     const noexcept {
   int64_t size = 0;
   if (has_nullable_tuple_) {
@@ -965,7 +965,7 @@ int64_t 
BufferedTupleStream::ComputeRowSizeAndSmallifyStrings(TupleRow* row)
     for (auto it = slots.begin(); it != slots.end(); ++it) {
       if (tuple->IsNull((*it)->null_indicator_offset())) continue;
       StringValue* sv = tuple->GetStringSlot((*it)->tuple_offset());
-      if (sv->Smallify()) continue;
+      if (sv->IsSmall()) continue;
       size += sv->Len();
     }
   }
@@ -992,7 +992,7 @@ int64_t 
BufferedTupleStream::ComputeRowSizeAndSmallifyStrings(TupleRow* row)
 
 bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept {
   // Use AddRowCustom*() to do the work of advancing the page.
-  int64_t row_size = ComputeRowSizeAndSmallifyStrings(row);
+  int64_t row_size = ComputeRowSize(row);
   uint8_t* data = AddRowCustomBeginSlow(row_size, status);
   if (data == nullptr) return false;
   bool success = DeepCopy(row, &data, data + row_size);
diff --git a/be/src/runtime/buffered-tuple-stream.h 
b/be/src/runtime/buffered-tuple-stream.h
index 37a34ab30..6002d112d 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -557,9 +557,7 @@ class BufferedTupleStream {
   /// Returns the total additional bytes that this row will consume in 
write_page_ if
   /// appended to the page. This includes the row's null indicators, the fixed 
length
   /// part of the row and the data for inlined_string_slots_ and 
inlined_coll_slots_.
-  /// It will also try to smallify string values, so small string data won't 
require
-  /// additional space.
-  int64_t ComputeRowSizeAndSmallifyStrings(TupleRow* row) const noexcept;
+  int64_t ComputeRowSize(TupleRow* row) const noexcept;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
diff --git a/be/src/runtime/spillable-row-batch-queue.cc 
b/be/src/runtime/spillable-row-batch-queue.cc
index 38888aa90..601a66aeb 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -102,7 +102,7 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
                                     "in unpinned mode unless an error 
occurred. "
                                     "batch_queue_: $1",
             PrettyPrinter::PrintBytes(
-                
batch_queue_->ComputeRowSizeAndSmallifyStrings(batch_itr.Get())),
+                batch_queue_->ComputeRowSize(batch_itr.Get())),
             batch_queue_->DebugString());
       }
     }
diff --git a/be/src/runtime/string-value-test.cc 
b/be/src/runtime/string-value-test.cc
index a68dc2052..4b9c926a7 100644
--- a/be/src/runtime/string-value-test.cc
+++ b/be/src/runtime/string-value-test.cc
@@ -66,7 +66,32 @@ void TestCompareImpl(StringValue* svs, int NUM_STRINGS) {
   }
 }
 
-TEST(StringValueTest, TestCompare) {
+class StringValueTest : public ::testing::Test {
+protected:
+    void SmallifySV(StringValue* sv) { sv->Smallify(); }
+
+    void SmallifySVExpect(StringValue* sv, bool expect_to_succeed) {
+      if (expect_to_succeed) {
+        EXPECT_TRUE(sv->Smallify());
+      } else {
+        EXPECT_FALSE(sv->Smallify());
+      }
+    }
+
+    void TestLargestSmallerString(StringValue& sv, const string& expected) {
+      EXPECT_EQ(sv.LargestSmallerString(), expected);
+      sv.Smallify();
+      EXPECT_EQ(sv.LargestSmallerString(), expected);
+    }
+
+    void TestLeastLargerString(StringValue& sv, const string& expected) {
+      EXPECT_EQ(sv.LeastLargerString(), expected);
+      sv.Smallify();
+      EXPECT_EQ(sv.LeastLargerString(), expected);
+    }
+};
+
+TEST_F(StringValueTest, TestCompare) {
   string empty_str = "";
   string str1_str("\0", 1);
   string str2_str("\0xy", 3);
@@ -96,7 +121,7 @@ TEST(StringValueTest, TestCompare) {
 
   TestCompareImpl(svs, NUM_STRINGS);
   for (int i = 0; i < NUM_STRINGS; ++i) {
-    svs[i].Smallify();
+    SmallifySV(&svs[i]);
   }
   TestCompareImpl(svs, NUM_STRINGS);
 }
@@ -111,7 +136,7 @@ void TestUnpaddedCharLength(StringValue* chars) {
   EXPECT_EQ(StringValue::UnpaddedCharLength(chars[6].Ptr(), 20), 17);
 }
 
-TEST(StringValueTest, TestCharFunctions) {
+TEST_F(StringValueTest, TestCharFunctions) {
   string char0_str("hi", 2);
   string char1_str("hi  ", 4);
   string char2_str(" hi  ", 5);
@@ -132,7 +157,7 @@ TEST(StringValueTest, TestCharFunctions) {
 
   TestUnpaddedCharLength(chars);
   for (int i = 0; i < NUM_CHARS; ++i) {
-    chars[i].Smallify();
+    SmallifySV(&chars[i]);
   }
   TestUnpaddedCharLength(chars);
 
@@ -157,7 +182,7 @@ void TestConvertToUInt64Impl(StringValue* svs) {
   EXPECT_EQ(svs[6].ToUInt64(), 0x102030405060707);
 }
 
-TEST(StringValueTest, TestConvertToUInt64) {
+TEST_F(StringValueTest, TestConvertToUInt64) {
   // Test converting StringValues to uint64_t which utilizes up to first 8 
bytes.
   const int NUM_STRINGS = 7;
   string strings[NUM_STRINGS];
@@ -177,46 +202,34 @@ TEST(StringValueTest, TestConvertToUInt64) {
 
   TestConvertToUInt64Impl(svs);
   for (int i = 0; i < NUM_STRINGS; ++i) {
-    svs[i].Smallify();
+    SmallifySV(&svs[i]);
   }
   TestConvertToUInt64Impl(svs);
 }
 
-void TestLargestSmallString(StringValue& sv, string expected) {
-  EXPECT_EQ(sv.LargestSmallerString(), expected);
-  sv.Smallify();
-  EXPECT_EQ(sv.LargestSmallerString(), expected);
-}
-
 // Test finding the largest smaller strings.
-TEST(StringValueTest, TestLargestSmallerString) {
+TEST_F(StringValueTest, TestLargestSmallerString) {
   string oneKbNullStr(1024, 0x00);
   string a1023NullStr(1023, 0x00);
   EXPECT_EQ(StringValue(oneKbNullStr).LargestSmallerString(), a1023NullStr);
 
   StringValue asv(const_cast<char*>("\x12\xef"), 2);
-  TestLargestSmallString(asv, "\x12\xee");
+  TestLargestSmallerString(asv, "\x12\xee");
   StringValue bsv(const_cast<char*>("\x12\x00"), 2);
-  TestLargestSmallString(bsv, "\x12");
+  TestLargestSmallerString(bsv, "\x12");
 
   // "0x00" is the smallest non-empty string.
   string oneNullStr("\00", 1);
   StringValue oneNullStrSv(oneNullStr);
-  TestLargestSmallString(oneNullStrSv, "");
+  TestLargestSmallerString(oneNullStrSv, "");
 
   // The empty string is the absolute smallest string.
   StringValue emptySv(const_cast<char*>(""));
-  TestLargestSmallString(emptySv, "");
-}
-
-void TestLeastLargerString(StringValue& sv, const string& expected) {
-  EXPECT_EQ(sv.LeastLargerString(), expected);
-  sv.Smallify();
-  EXPECT_EQ(sv.LeastLargerString(), expected);
+  TestLargestSmallerString(emptySv, "");
 }
 
 // Test finding the least larger strings.
-TEST(StringValueTest, TestLeastLargerString) {
+TEST_F(StringValueTest, TestLeastLargerString) {
   string nullStr(const_cast<char*>("\x00"), 1);
   StringValue nullStrSv(nullStr);
   TestLeastLargerString(nullStrSv, string("\x01", 1));
@@ -241,7 +254,7 @@ TEST(StringValueTest, TestLeastLargerString) {
   TestLeastLargerString(emptySv, string("\00", 1));
 }
 
-TEST(StringValueTest, TestConstructors) {
+TEST_F(StringValueTest, TestConstructors) {
   // Test that all strings are non-small initially.
   StringValue def_ctor;
   EXPECT_FALSE(def_ctor.IsSmall());
@@ -249,7 +262,7 @@ TEST(StringValueTest, TestConstructors) {
   StringValue copy_ctor(def_ctor);
   EXPECT_FALSE(copy_ctor.IsSmall());
   // Modify 'copy_ctor' to make Clang Tidy happy.
-  EXPECT_TRUE(copy_ctor.Smallify());
+  SmallifySVExpect(&copy_ctor, true);
 
   StringValue char_ctor(const_cast<char*>("small"));
   EXPECT_FALSE(char_ctor.IsSmall());
@@ -262,7 +275,7 @@ TEST(StringValueTest, TestConstructors) {
   EXPECT_FALSE(string_ctor.IsSmall());
 }
 
-TEST(StringValueTest, TestSmallify) {
+TEST_F(StringValueTest, TestSmallify) {
   StringValue nullstr;
   StringValue empty(const_cast<char*>(""), 0);
   StringValue one_char(const_cast<char*>("a"), 1);
@@ -275,11 +288,11 @@ TEST(StringValueTest, TestSmallify) {
   StringValue limit_clone(limit);
   StringValue over_the_limit_clone(over_the_limit);
 
-  EXPECT_TRUE(nullstr.Smallify());
-  EXPECT_TRUE(empty.Smallify());
-  EXPECT_TRUE(one_char.Smallify());
-  EXPECT_TRUE(limit.Smallify());
-  EXPECT_FALSE(over_the_limit.Smallify());
+  SmallifySVExpect(&nullstr, true);
+  SmallifySVExpect(&empty, true);
+  SmallifySVExpect(&one_char, true);
+  SmallifySVExpect(&limit, true);
+  SmallifySVExpect(&over_the_limit, false);
 
   EXPECT_EQ(nullstr, nullstr_clone);
   EXPECT_NE(nullstr.Ptr(), nullstr_clone.Ptr());
diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h
index ab9c840a9..f3d1730c9 100644
--- a/be/src/runtime/string-value.h
+++ b/be/src/runtime/string-value.h
@@ -30,6 +30,9 @@
 
 namespace impala {
 
+class StringValueTest;
+class Tuple;
+
 /// The format of a string-typed slot.
 /// The returned StringValue of all functions that return StringValue
 /// shares its buffer with the parent.
@@ -64,6 +67,16 @@ public:
   explicit StringValue(const char* s)
     : string_impl_(s) {}
 
+  /// Only valid to call if source's length is small enough. Returns a 
StringValue object
+  /// that is smallified.
+  static StringValue MakeSmallStringFrom(const StringValue& source) {
+    DCHECK_LE(source.Len(), SmallableString::SMALL_LIMIT);
+    StringValue sv(source);
+    sv.Smallify();
+    DCHECK(sv.IsSmall());
+    return sv;
+  }
+
   void Assign(const StringValue& other) { 
string_impl_.Assign(other.string_impl_); }
 
   void Assign(char* ptr, int len) {
@@ -78,10 +91,6 @@ public:
 
   bool IsSmall() const { return string_impl_.IsSmall(); }
 
-  /// Tries to apply Small String Optimization if possible. Returns 'true' on 
success,
-  /// 'false' otherwise. In the latter case the object remains unmodified.
-  bool Smallify() { return string_impl_.Smallify(); }
-
   int Len() const { return string_impl_.Len(); }
 
   /// Sets the length of this String object. Length can only be decreased.
@@ -174,6 +183,16 @@ public:
   static const char* LLVM_CLASS_NAME;
 
 private:
+  friend Tuple;
+  friend StringValueTest;
+  /// !!! THIS IS UNSAFE TO CALL ON EXISTING STRINGVALUE OBJECTS !!!
+  /// Please make sure you only invoke it for newly created StringValues, e.g. 
on the
+  /// target StringValue object of a deep copy operation.
+  /// Tries to apply Small String Optimization if possible. Returns 'true' on 
success,
+  /// 'false' otherwise. In the latter case the object remains unmodified.
+  /// !!! THIS IS UNSAFE TO CALL ON EXISTING STRINGVALUE OBJECTS !!!
+  bool Smallify() { return string_impl_.Smallify(); }
+
   SmallableString string_impl_;
 };
 
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index a17f1c1e9..3ddb6f19a 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -109,6 +109,7 @@ Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, 
MemPool* pool) {
 void Tuple::DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) {
   if (desc.HasVarlenSlots()) {
     memcpy(dst, this, desc.byte_size());
+    // 'dst' is a new tuple, so it is safe to smallify its string values.
     dst->SmallifyStrings(desc);
     dst->DeepCopyVarlenData(desc, pool);
   } else {
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 1682e5d9b..0c5334149 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -328,8 +328,6 @@ class Tuple {
         reinterpret_cast<const char*>(this) + offset);
   }
 
-  void SmallifyStrings(const TupleDescriptor& desc);
-
   /// For C++/IR interop, we need to be able to look up types by name.
   static const char* LLVM_CLASS_NAME;
 
@@ -385,6 +383,10 @@ class Tuple {
   char* AllocateStrings(const char* err_ctx, RuntimeState* state, int64_t 
bytes,
       MemPool* pool, Status* status) noexcept;
 
+  /// Smallify string values of the tuple. It should only be called for newly 
created
+  /// tuples, e.g. in DeepCopy().
+  void SmallifyStrings(const TupleDescriptor& desc);
+
   // Defined in tuple-ir.cc to force the compilation of the CodegenTypes 
struct.
   void dummy(Tuple::CodegenTypes*);
 };
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/create-tables-impala-13138.test
 
b/testdata/workloads/functional-query/queries/QueryTest/create-tables-impala-13138.test
new file mode 100644
index 000000000..2defc1eb1
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/create-tables-impala-13138.test
@@ -0,0 +1,77 @@
+====
+---- QUERY
+CREATE TABLE letterwargroups (
+   war_group STRING, war STRING
+);
+
+CREATE TABLE letter_missions_combined (
+   letter_mission INT,
+   letter_mission_name STRING
+ )
+STORED AS PARQUET;
+
+CREATE TABLE dim_letter (
+   letter_hash STRING)
+PARTITIONED BY (`date` STRING)
+STORED AS PARQUET;
+
+CREATE TABLE subscriptions (
+   id BIGINT,
+   war STRING,
+   letter_hash STRING,
+   is_affiliate BOOLEAN
+ )
+PARTITIONED BY (`date` STRING)
+STORED AS PARQUET;
+
+CREATE TABLE letter_marketing_response_events (
+   letter_mission INT,
+   send_account_sk INT,
+   letter_hash STRING
+ )
+PARTITIONED BY (`date` STRING)
+STORED AS PARQUET;
+
+-- INSERT VALUES ('0', '0'), ('1', '1'), ... , ('50', '50')
+with v   as (values (0 as x), (1), (2), (3), (4), (5), (6), (7), (8), (9)),
+     v10 as (select 10*x as x from v),
+    range_i as (select cast(v10.x + v.x as STRING) as stri from v10, v where 
v10.x + v.x <= 50)
+insert into letterwargroups select stri, stri from range_i;
+
+insert into subscriptions (`date`, id, war, letter_hash, is_affiliate)
+values ('2024-06-07', 100, '100', 'letter_hash_100', TRUE);
+
+with v        as (values (0 as x), (1), (2), (3), (4), (5), (6), (7), (8), 
(9)),
+     v10      as (select 10*x as x from v),
+     v100     as (select 10*x as x from v10),
+     v1000    as (select 10*x as x from v100),
+     v10000   as (select 10*x as x from v1000),
+     v100000  as (select 10*x as x from v1000),
+     range_i  as (select v100000.x + v10000.x + v1000.x + v100.x + v10.x + v.x 
+ 1 as i
+                  from   v100000,    v10000,    v1000,    v100,    v10,    v)
+insert into subscriptions partition (`date`)
+select cast(random(1)*100000000 as bigint), cast(cast(random(2)*100 as int) as 
string), base64encode(cast(random(3) as string)), false, `date` from 
subscriptions, range_i;
+
+insert into subscriptions partition (`date`) select cast(random(1)*100000000 
as bigint),
+cast(cast(random(2)*100 as int) as string), base64encode(cast(random(3) as 
string)), false, `date` from subscriptions;
+insert into subscriptions partition (`date`) select cast(random(4)*100000000 
as bigint),
+cast(cast(random(5)*100 as int) as string), base64encode(cast(random(6) as 
string)), false, `date` from subscriptions;
+insert into subscriptions partition (`date`) select cast(random(7)*100000000 
as bigint),
+cast(cast(random(8)*100 as int) as string), base64encode(cast(random(9) as 
string)), false, `date` from subscriptions;
+insert into subscriptions partition (`date`) select cast(random(10)*100000000 
as bigint),
+cast(cast(random(11)*100 as int) as string), base64encode(cast(random(12) as 
string)), false, `date` from subscriptions;
+insert into subscriptions partition (`date`) select cast(random(13)*100000000 
as bigint),
+cast(cast(random(14)*100 as int) as string), base64encode(cast(random(15) as 
string)), false, `date` from subscriptions;
+insert into subscriptions partition (`date`) select cast(random(16)*100000000 
as bigint),
+cast(cast(random(17)*100 as int) as string), base64encode(cast(random(18) as 
string)), false, `date` from subscriptions;
+
+insert into letter_marketing_response_events (`date`, letter_hash, 
letter_mission, send_account_sk)
+select `date`, letter_hash, cast(random(19)*100 as int), cast(random(20)*100 
as int) from subscriptions;
+
+insert into letterwargroups select 'war_group',war from subscriptions group by 
war;
+
+insert into dim_letter(letter_hash, `date`) select letter_hash, `date` from 
subscriptions;
+
+insert into letter_missions_combined(letter_mission, letter_mission_name)
+values (cast(random(21)*100 as int), "name");
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/query-impala-13138.test 
b/testdata/workloads/functional-query/queries/QueryTest/query-impala-13138.test
new file mode 100644
index 000000000..d3f655ba7
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/query-impala-13138.test
@@ -0,0 +1,29 @@
+====
+---- QUERY
+set RUNTIME_FILTER_WAIT_TIME_MS=10000;
+set MT_DOP=12;
+set RUNTIME_FILTER_MIN_SIZE=8192;
+set RUNTIME_FILTER_MAX_SIZE=2097152;
+set MINMAX_FILTER_THRESHOLD=0.5;
+set MINMAX_FILTERING_LEVEL=PAGE;
+SELECT ecc.letter_mission_name, re.`date` as date_, c.war_group
+FROM letter_marketing_response_events re
+    LEFT JOIN letter_missions_combined ecc ON 
ecc.letter_mission=re.letter_mission
+    LEFT JOIN (SELECT b.letter_hash, b.war
+               FROM (SELECT letter_hash, war, id,
+                            row_number() over(partition by letter_hash order 
by id desc) as latest
+                     FROM subscriptions
+                     WHERE is_affiliate=0
+                     GROUP BY letter_hash, war, id) b
+                WHERE latest=1
+                GROUP BY b.letter_hash, b.war) su ON 
re.letter_hash=su.letter_hash
+    LEFT JOIN letterwargroups c ON upper(c.war)=upper(su.war)
+    LEFT JOIN dim_letter em ON em.letter_hash=re.letter_hash
+    WHERE re.`date`>='2024-06-01'
+      AND re.send_account_sk not in (43)
+    GROUP BY ecc.letter_mission_name, re.`date`, c.war_group;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.*,'2024-06-07',.*
+---- TYPES
+STRING,STRING,STRING
+====
diff --git a/tests/query_test/test_join_queries.py 
b/tests/query_test/test_join_queries.py
index 48abae1ad..322617696 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -194,3 +194,26 @@ class TestSemiJoinQueries(ImpalaTestSuite):
     """Expensive and memory-intensive semi-join tests."""
     if self.exploration_strategy() != 'exhaustive': pytest.skip()
     self.run_test_case('QueryTest/semi-joins-exhaustive', vector)
+
+
+class TestSpillingHashJoin(ImpalaTestSuite):
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestSpillingHashJoin, cls).add_test_dimensions()
+    # To cut down on test execution time, only run in exhaustive.
+    if cls.exploration_strategy() != 'exhaustive':
+      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_spilling_hash_join(self, vector, unique_database):
+    """Regression test for IMPALA-13138. It loads a few large tables and runs 
a complex
+    query that spills during JOIN build that crashed Impala before 
IMPALA-13138."""
+    self.run_test_case('QueryTest/create-tables-impala-13138', vector, 
unique_database)
+    for i in range(0, 5):
+      self.run_test_case('QueryTest/query-impala-13138', vector, 
unique_database)

Reply via email to