This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 56c98e9900fa4898c1997ab834ca4c9ee81f3688
Author: Michael Ho <k...@cloudera.com>
AuthorDate: Tue Mar 6 18:31:18 2018 -0800

    IMPALA-5168: Codegen HASH_PARTITIONED KrpcDataStreamSender::Send()
    
    This change codegens the hash partitioning logic of
    KrpcDataStreamSender::Send() when the partitioning strategy
    is HASH_PARTITIONED. It does so by unrolling the loop which
    evaluates each row against the partitioning expressions and
    hashes the result. It also replaces the number of channels
    of that sender with a constant at runtime.
    
    With this change, we get a reasonable speedup on some benchmarks:
    
    
    +------------+-----------------------+---------+------------+------------+----------------+
    | Workload   | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +------------+-----------------------+---------+------------+------------+----------------+
    | TPCH(_300) | parquet / none / none | 20.03   | -6.44%     | 13.56      | -7.15%         |
    +------------+-----------------------+---------+------------+------------+----------------+
    
    
    +---------------------+-----------------------+---------+------------+------------+----------------+
    | Workload            | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +---------------------+-----------------------+---------+------------+------------+----------------+
    | TARGETED-PERF(_300) | parquet / none / none | 58.59   | -5.56%     | 12.28      | -5.30%         |
    +---------------------+-----------------------+---------+------------+------------+----------------+
    
    
    +-------------------------+-----------------------+---------+------------+------------+----------------+
    | Workload                | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +-------------------------+-----------------------+---------+------------+------------+----------------+
    | TPCDS-UNMODIFIED(_1000) | parquet / none / none | 15.60   | -3.10%     | 7.16       | -4.33%         |
    +-------------------------+-----------------------+---------+------------+------------+----------------+
    
    
    +-------------------+-----------------------+---------+------------+------------+----------------+
    | Workload          | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +-------------------+-----------------------+---------+------------+------------+----------------+
    | TPCH_NESTED(_300) | parquet / none / none | 30.93   | -3.02%     | 17.46      | -4.71%         |
    +-------------------+-----------------------+---------+------------+------------+----------------+
    
    Change-Id: I1c44cc9312c062cc7a5a4ac9156ceaa31fb887ff
    Reviewed-on: http://gerrit.cloudera.org:8080/10421
    Reviewed-by: Michael Ho <k...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/codegen/gen_ir_descriptions.py              |   8 +-
 be/src/codegen/impala-ir.cc                        |   1 +
 be/src/exec/data-sink.cc                           |   4 +
 be/src/exec/data-sink.h                            |   4 +
 be/src/exec/exchange-node.cc                       |   2 +-
 be/src/exec/exec-node.cc                           |   8 -
 be/src/exec/exec-node.h                            |   3 -
 be/src/exec/hdfs-scan-node-base.cc                 |   2 +-
 be/src/exec/partial-sort-node.cc                   |   2 +-
 be/src/exec/partitioned-aggregation-node.cc        |   2 +-
 be/src/exec/partitioned-hash-join-builder.cc       |   6 +-
 be/src/exec/partitioned-hash-join-builder.h        |   2 +-
 be/src/exec/partitioned-hash-join-node.cc          |   2 +-
 be/src/exec/sort-node.cc                           |   2 +-
 be/src/exec/topn-node.cc                           |   2 +-
 be/src/runtime/CMakeLists.txt                      |   1 +
 be/src/runtime/fragment-instance-state.cc          |   2 +
 be/src/runtime/krpc-data-stream-sender-ir.cc       |  49 +++++
 be/src/runtime/krpc-data-stream-sender.cc          | 237 ++++++++++++++++++---
 be/src/runtime/krpc-data-stream-sender.h           |  71 ++++--
 be/src/runtime/raw-value-ir.cc                     |  28 +++
 be/src/runtime/raw-value.cc                        |  27 ---
 be/src/runtime/runtime-state.h                     |   9 +
 be/src/util/runtime-profile.h                      |   3 +-
 .../QueryTest/datastream-sender-codegen.test       |  41 ++++
 tests/query_test/test_codegen.py                   |   8 +-
 26 files changed, 425 insertions(+), 101 deletions(-)

diff --git a/be/src/codegen/gen_ir_descriptions.py 
b/be/src/codegen/gen_ir_descriptions.py
index 26a8ad7..dd2df9e 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -196,6 +196,8 @@ ir_functions = [
    "_ZN6impala8RawValue7CompareEPKvS2_RKNS_10ColumnTypeE"],
   ["RAW_VALUE_GET_HASH_VALUE",
    "_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj"],
+  ["RAW_VALUE_GET_HASH_VALUE_FAST_HASH",
+   "_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm"],
   ["TOPN_NODE_INSERT_BATCH",
    "_ZN6impala8TopNNode11InsertBatchEPNS_8RowBatchE"],
   ["MEMPOOL_ALLOCATE",
@@ -219,7 +221,11 @@ ir_functions = [
   ["FLOAT_MIN_MAX_FILTER_INSERT", "_ZN6impala17FloatMinMaxFilter6InsertEPv"],
   ["DOUBLE_MIN_MAX_FILTER_INSERT", "_ZN6impala18DoubleMinMaxFilter6InsertEPv"],
   ["STRING_MIN_MAX_FILTER_INSERT", "_ZN6impala18StringMinMaxFilter6InsertEPv"],
-  ["TIMESTAMP_MIN_MAX_FILTER_INSERT", 
"_ZN6impala21TimestampMinMaxFilter6InsertEPv"]
+  ["TIMESTAMP_MIN_MAX_FILTER_INSERT", 
"_ZN6impala21TimestampMinMaxFilter6InsertEPv"],
+  ["KRPC_DSS_GET_PART_EXPR_EVAL",
+  "_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi"],
+  ["KRPC_DSS_HASH_AND_ADD_ROWS",
+  "_ZN6impala20KrpcDataStreamSender14HashAndAddRowsEPNS_8RowBatchE"]
 ]
 
 enums_preamble = '\
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 4b79e8b..9c5b3eb 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -54,6 +54,7 @@
 #include "exprs/timestamp-functions-ir.cc"
 #include "exprs/udf-builtins-ir.cc"
 #include "exprs/utility-functions-ir.cc"
+#include "runtime/krpc-data-stream-sender-ir.cc"
 #include "runtime/mem-pool.h"
 #include "runtime/raw-value-ir.cc"
 #include "runtime/runtime-filter-ir.cc"
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 9140b3e..2ca0019 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -134,6 +134,10 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
   return Status::OK();
 }
 
+void DataSink::Codegen(LlvmCodeGen* codegen) {
+  return;
+}
+
 Status DataSink::Open(RuntimeState* state) {
   DCHECK_EQ(output_exprs_.size(), output_expr_evals_.size());
   return ScalarExprEvaluator::Open(output_expr_evals_, state);
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 605b46d..d4f2040 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -63,6 +63,10 @@ class DataSink {
   /// initializes their evaluators. Subclasses must call DataSink::Prepare().
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
+  /// Codegen expressions in the sink. Overridden by sink type which supports 
codegen.
+  /// No-op by default.
+  virtual void Codegen(LlvmCodeGen* codegen);
+
   /// Call before Send() to open the sink and initialize output expression 
evaluators.
   virtual Status Open(RuntimeState* state);
 
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 297a805..8884f30 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -94,7 +94,7 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
-    AddCodegenDisabledMessage(state);
+    state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   }
   return Status::OK();
 }
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 3a99f18..85d965b 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -501,14 +501,6 @@ Status ExecNode::QueryMaintenance(RuntimeState* state) {
   return state->CheckQueryState();
 }
 
-void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) {
-  if (state->CodegenDisabledByQueryOption()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option 
DISABLE_CODEGEN");
-  } else if (state->CodegenDisabledByHint()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled due to optimization 
hints");
-  }
-}
-
 bool ExecNode::IsNodeCodegenDisabled() const {
   return disable_codegen_;
 }
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 80241b1..fac8312 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -216,9 +216,6 @@ class ExecNode {
   /// check to see if codegen was enabled for the enclosing fragment.
   bool IsNodeCodegenDisabled() const;
 
-  /// Add codegen disabled message if codegen is disabled for this ExecNode.
-  void AddCodegenDisabledMessage(RuntimeState* state);
-
   /// Extract node id from p->name().
   static int GetNodeIdFromProfile(RuntimeProfile* p);
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index a163e63..67f07cd 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -264,7 +264,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
   PrintHdfsSplitStats(per_volume_stats, &str);
   runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 911f014..9fd653c 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -62,7 +62,7 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
       resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), 
false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, 
sorter_->ComputeMinReservation());
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   input_batch_.reset(
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
   return Status::OK();
diff --git a/be/src/exec/partitioned-aggregation-node.cc 
b/be/src/exec/partitioned-aggregation-node.cc
index d7b8c0a..3a90f14 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -225,7 +225,7 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* 
state) {
         state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_perm_pool(),
         expr_results_pool(), expr_results_pool(), &ht_ctx_));
   }
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index 194bb92..c627b98 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -137,11 +137,7 @@ Status PhjBuilder::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tracker)
   partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
   build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
   repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
-  if (state->CodegenDisabledByQueryOption()) {
-    profile()->AddCodegenMsg(false, "disabled by query option 
DISABLE_CODEGEN");
-  } else if (state->CodegenDisabledByHint()) {
-    profile()->AddCodegenMsg(false, "disabled due to optimization hints");
-  }
+  state->CheckAndAddCodegenDisabledMessage(profile());
   return Status::OK();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index e075beb..f6105fc 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -91,7 +91,7 @@ class PhjBuilder : public DataSink {
   /// Does all codegen for the builder (if codegen is enabled).
   /// Updates the the builder's runtime profile with info about whether any 
errors
   /// occured during codegen.
-  void Codegen(LlvmCodeGen* codegen);
+  virtual void Codegen(LlvmCodeGen* codegen) override;
 
   /////////////////////////////////////////
   // The following functions are used only by PartitionedHashJoinNode.
diff --git a/be/src/exec/partitioned-hash-join-node.cc 
b/be/src/exec/partitioned-hash-join-node.cc
index 99e07eb..909a427 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -136,7 +136,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* 
state) {
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
   num_hash_table_builds_skipped_ =
       ADD_COUNTER(runtime_profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 9ab1435..546f3ba 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -58,7 +58,7 @@ Status SortNode::Prepare(RuntimeState* state) {
           resource_profile_.spillable_buffer_size, runtime_profile(), state, 
id(), true));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, 
sorter_->ComputeMinReservation());
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 31bf58b..97c05f9 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -79,7 +79,7 @@ Status TopNNode::Prepare(RuntimeState* state) {
       new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
-  AddCodegenDisabledMessage(state);
+  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   tuple_pool_reclaim_counter_ = ADD_COUNTER(runtime_profile(), 
"TuplePoolReclamations",
       TUnit::UNIT);
   return Status::OK();
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 89cbbb9..7dcf45c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -47,6 +47,7 @@ add_library(Runtime
   krpc-data-stream-mgr.cc
   krpc-data-stream-recvr.cc
   krpc-data-stream-sender.cc
+  krpc-data-stream-sender-ir.cc
   lib-cache.cc
   mem-tracker.cc
   mem-pool.cc
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 629fb96..e6fa489 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -247,6 +247,8 @@ Status FragmentInstanceState::Open() {
       SCOPED_THREAD_COUNTER_MEASUREMENT(
           runtime_state_->codegen()->llvm_thread_counters());
       exec_tree_->Codegen(runtime_state_);
+      sink_->Codegen(runtime_state_->codegen());
+
       // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is 
fixed,
       // ScalarFnCall has no fall back to interpretation when codegen fails so 
propagates
       // the error status for now.
diff --git a/be/src/runtime/krpc-data-stream-sender-ir.cc 
b/be/src/runtime/krpc-data-stream-sender-ir.cc
new file mode 100644
index 0000000..335e16f
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender-ir.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/scalar-expr-evaluator.h"
+#include "runtime/krpc-data-stream-sender.h"
+#include "runtime/raw-value.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+ScalarExprEvaluator* KrpcDataStreamSender::GetPartitionExprEvaluator(int i) {
+  return partition_expr_evals_[i];
+}
+
+Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) {
+  const int num_rows = batch->num_rows();
+  const int num_channels = GetNumChannels();
+  int channel_ids[RowBatch::HASH_BATCH_SIZE];
+  int row_idx = 0;
+  while (row_idx < num_rows) {
+    int row_count = 0;
+    FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, 
row_batch_iter) {
+      TupleRow* row = row_batch_iter.Get();
+      channel_ids[row_count++] = HashRow(row) % num_channels;
+    }
+    row_count = 0;
+    FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, 
row_batch_iter) {
+      RETURN_IF_ERROR(AddRowToChannel(channel_ids[row_count++], 
row_batch_iter.Get()));
+    }
+    row_idx += row_count;
+  }
+  return Status::OK();
+}
+
+}
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index cd30f06..74eb129 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -25,6 +25,8 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "common/logging.h"
+#include "codegen/codegen-anyval.h"
+#include "codegen/llvm-codegen.h"
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
@@ -61,6 +63,10 @@ DECLARE_int32(rpc_retry_interval_ms);
 
 namespace impala {
 
+const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
+    "KrpcDataStreamSender7HashRowEPNS_8TupleRowE";
+const char* KrpcDataStreamSender::LLVM_CLASS_NAME = 
"class.impala::KrpcDataStreamSender";
+
 // A datastream sender may send row batches to multiple destinations. There is 
one
 // channel for each destination.
 //
@@ -645,6 +651,7 @@ Status KrpcDataStreamSender::Prepare(
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state));
   }
+  state->CheckAndAddCodegenDisabledMessage(profile());
   return Status::OK();
 }
 
@@ -653,6 +660,198 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) {
   return ScalarExprEvaluator::Open(partition_expr_evals_, state);
 }
 
+//
+// An example of generated code with int type.
+//
+// define i64 
@KrpcDataStreamSenderHashRow(%"class.impala::KrpcDataStreamSender"* %this,
+//                                         %"class.impala::TupleRow"* %row) 
#46 {
+// entry:
+//   %0 = alloca i32
+//   %1 = call %"class.impala::ScalarExprEvaluator"*
+//       @_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi(
+//           %"class.impala::KrpcDataStreamSender"* %this, i32 0)
+//   %partition_val = call i64 @GetSlotRef(
+//       %"class.impala::ScalarExprEvaluator"* %1, %"class.impala::TupleRow"* 
%row)
+//   %is_null = trunc i64 %partition_val to i1
+//   br i1 %is_null, label %is_null_block, label %not_null_block
+//
+// is_null_block:                                ; preds = %entry
+//   br label %hash_val_block
+//
+// not_null_block:                               ; preds = %entry
+//   %2 = ashr i64 %partition_val, 32
+//   %3 = trunc i64 %2 to i32
+//   store i32 %3, i32* %0
+//   %native_ptr = bitcast i32* %0 to i8*
+//   br label %hash_val_block
+//
+// hash_val_block:                               ; preds = %not_null_block, 
%is_null_block
+//   %val_ptr_phi = phi i8* [ %native_ptr, %not_null_block ], [ null, 
%is_null_block ]
+//   %hash_val = call i64
+//       @_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm(
+//           i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg,
+//               i64 7403188670037225271)
+//   ret i64 %hash_val
+// }
+Status KrpcDataStreamSender::CodegenHashRow(LlvmCodeGen* codegen, 
llvm::Function** fn) {
+  llvm::LLVMContext& context = codegen->context();
+  LlvmBuilder builder(context);
+
+  LlvmCodeGen::FnPrototype prototype(
+      codegen, "KrpcDataStreamSenderHashRow", codegen->i64_type());
+  prototype.AddArgument(
+      LlvmCodeGen::NamedVariable("this", 
codegen->GetNamedPtrType(LLVM_CLASS_NAME)));
+  prototype.AddArgument(
+      LlvmCodeGen::NamedVariable("row", 
codegen->GetStructPtrType<TupleRow>()));
+
+  llvm::Value* args[2];
+  llvm::Function* hash_row_fn = prototype.GeneratePrototype(&builder, args);
+  llvm::Value* this_arg = args[0];
+  llvm::Value* row_arg = args[1];
+
+  // Store the initial seed to hash_val
+  llvm::Value* hash_val = codegen->GetI64Constant(EXCHANGE_HASH_SEED);
+
+  // Unroll the loop and codegen each of the partition expressions
+  for (int i = 0; i < partition_exprs_.size(); ++i) {
+    llvm::Function* compute_fn;
+    RETURN_IF_ERROR(partition_exprs_[i]->GetCodegendComputeFn(codegen, 
&compute_fn));
+
+    // Load the expression evaluator for the i-th partition expression
+    llvm::Function* get_expr_eval_fn =
+        codegen->GetFunction(IRFunction::KRPC_DSS_GET_PART_EXPR_EVAL, false);
+    DCHECK(get_expr_eval_fn != nullptr);
+    llvm::Value* expr_eval_arg =
+        builder.CreateCall(get_expr_eval_fn, {this_arg, 
codegen->GetI32Constant(i)});
+
+    // Compute the value against the i-th partition expression
+    llvm::Value* compute_fn_args[] = {expr_eval_arg, row_arg};
+    CodegenAnyVal partition_val = CodegenAnyVal::CreateCallWrapped(codegen, 
&builder,
+        partition_exprs_[i]->type(), compute_fn, compute_fn_args, 
"partition_val");
+
+    llvm::BasicBlock* is_null_block =
+        llvm::BasicBlock::Create(context, "is_null_block", hash_row_fn);
+    llvm::BasicBlock* not_null_block =
+        llvm::BasicBlock::Create(context, "not_null_block", hash_row_fn);
+    llvm::BasicBlock* hash_val_block =
+        llvm::BasicBlock::Create(context, "hash_val_block", hash_row_fn);
+
+    // Check if 'partition_val' is NULL
+    llvm::Value* val_is_null = partition_val.GetIsNull();
+    builder.CreateCondBr(val_is_null, is_null_block, not_null_block);
+
+    // Set the pointer to NULL in case 'partition_val' evaluates to NULL
+    builder.SetInsertPoint(is_null_block);
+    llvm::Value* null_ptr = codegen->null_ptr_value();
+    builder.CreateBr(hash_val_block);
+
+    // Saves 'partition_val' on the stack and passes a pointer to it to the 
hash function
+    builder.SetInsertPoint(not_null_block);
+    llvm::Value* native_ptr = partition_val.ToNativePtr();
+    native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), 
"native_ptr");
+    builder.CreateBr(hash_val_block);
+
+    // Picks the input value to hash function
+    builder.SetInsertPoint(hash_val_block);
+    llvm::PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, 
"val_ptr_phi");
+    val_ptr_phi->addIncoming(native_ptr, not_null_block);
+    val_ptr_phi->addIncoming(null_ptr, is_null_block);
+
+    // Creates a global constant of the partition expression's ColumnType. It 
has to be a
+    // constant for constant propagation and dead code elimination in 
'get_hash_value_fn'
+    llvm::Type* col_type = codegen->GetStructType<ColumnType>();
+    llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
+        col_type, partition_exprs_[i]->type().ToIR(codegen), "expr_type_arg");
+
+    // Update 'hash_val' with the new 'partition-val'
+    llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, 
hash_val};
+    llvm::Function* get_hash_value_fn =
+        codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE_FAST_HASH, 
false);
+    DCHECK(get_hash_value_fn != nullptr);
+    hash_val = builder.CreateCall(get_hash_value_fn, get_hash_value_args, 
"hash_val");
+  }
+
+  builder.CreateRet(hash_val);
+  *fn = codegen->FinalizeFunction(hash_row_fn);
+  if (*fn == nullptr) {
+    return Status("Codegen'd KrpcDataStreamSenderHashRow() fails verification. 
See log");
+  }
+
+  return Status::OK();
+}
+
+string KrpcDataStreamSender::PartitionTypeName() const {
+  switch (partition_type_) {
+  case TPartitionType::UNPARTITIONED:
+    return "Unpartitioned";
+  case TPartitionType::HASH_PARTITIONED:
+    return "Hash Partitioned";
+  case TPartitionType::RANDOM:
+    return "Random Partitioned";
+  case TPartitionType::KUDU:
+    return "Kudu Partitioned";
+  default:
+    DCHECK(false) << partition_type_;
+    return "";
+  }
+}
+
+void KrpcDataStreamSender::Codegen(LlvmCodeGen* codegen) {
+  const string sender_name = PartitionTypeName() + " Sender";
+  if (partition_type_ != TPartitionType::HASH_PARTITIONED) {
+    const string& msg = Substitute("not $0",
+        partition_type_ == TPartitionType::KUDU ? "supported" : "needed");
+    profile()->AddCodegenMsg(false, msg, sender_name);
+    return;
+  }
+
+  llvm::Function* hash_row_fn;
+  Status codegen_status = CodegenHashRow(codegen, &hash_row_fn);
+  if (codegen_status.ok()) {
+    llvm::Function* hash_and_add_rows_fn =
+        codegen->GetFunction(IRFunction::KRPC_DSS_HASH_AND_ADD_ROWS, true);
+    DCHECK(hash_and_add_rows_fn != nullptr);
+
+    int num_replaced;
+    // Replace GetNumChannels() with a constant.
+    num_replaced = codegen->ReplaceCallSitesWithValue(hash_and_add_rows_fn,
+        codegen->GetI32Constant(GetNumChannels()), "GetNumChannels");
+    DCHECK_EQ(num_replaced, 1);
+
+    // Replace HashRow() with the handcrafted IR function.
+    num_replaced = codegen->ReplaceCallSites(hash_and_add_rows_fn,
+        hash_row_fn, HASH_ROW_SYMBOL);
+    DCHECK_EQ(num_replaced, 1);
+
+    hash_and_add_rows_fn = codegen->FinalizeFunction(hash_and_add_rows_fn);
+    if (hash_and_add_rows_fn == nullptr) {
+      codegen_status =
+          Status("Codegen'd HashAndAddRows() failed verification. See log");
+    } else {
+      codegen->AddFunctionToJit(hash_and_add_rows_fn,
+          reinterpret_cast<void**>(&hash_and_add_rows_fn_));
+    }
+  }
+  profile()->AddCodegenMsg(codegen_status.ok(), codegen_status, sender_name);
+}
+
+Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* 
row) {
+  return channels_[channel_id]->AddRow(row);
+}
+
+uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) {
+  uint64_t hash_val = EXCHANGE_HASH_SEED;
+  for (ScalarExprEvaluator* eval : partition_expr_evals_) {
+    void* partition_val = eval->GetValue(row);
+    // We can't use the crc hash function here because it does not result in
+    // uncorrelated hashes with different seeds. Instead we use FastHash.
+    // TODO: fix crc hash/GetHashValue()
+    hash_val = RawValue::GetHashValueFastHash(
+        partition_val, eval->root().type(), hash_val);
+  }
+  return hash_val;
+}
+
 Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   DCHECK(!closed_);
@@ -703,40 +902,10 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     }
   } else {
     DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
-    // hash-partition batch's rows across channels
-    // TODO: encapsulate this in an Expr as we've done for Kudu above and 
remove this case
-    // once we have codegen here.
-    int num_channels = channels_.size();
-    const int num_partition_exprs = partition_exprs_.size();
-    const int num_rows = batch->num_rows();
-    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
-    int channel_ids[hash_batch_size];
-
-    // Break the loop into two parts break the data dependency between 
computing
-    // the hash and calling AddRow()
-    // To keep stack allocation small a RowBatch::HASH_BATCH is used
-    for (int batch_start = 0; batch_start < num_rows; batch_start += 
hash_batch_size) {
-      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        uint64_t hash_val = EXCHANGE_HASH_SEED;
-        for (int j = 0; j < num_partition_exprs; ++j) {
-          ScalarExprEvaluator* eval = partition_expr_evals_[j];
-          void* partition_val = eval->GetValue(row);
-          // We can't use the crc hash function here because it does not 
result in
-          // uncorrelated hashes with different seeds. Instead we use FastHash.
-          // TODO: fix crc hash/GetHashValue()
-          DCHECK(&(eval->root()) == partition_exprs_[j]);
-          hash_val = RawValue::GetHashValueFastHash(
-              partition_val, partition_exprs_[j]->type(), hash_val);
-        }
-        channel_ids[i] = hash_val % num_channels;
-      }
-
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
-      }
+    if (hash_and_add_rows_fn_ != nullptr) {
+      RETURN_IF_ERROR(hash_and_add_rows_fn_(this, batch));
+    } else {
+      RETURN_IF_ERROR(HashAndAddRows(batch));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
diff --git a/be/src/runtime/krpc-data-stream-sender.h 
b/be/src/runtime/krpc-data-stream-sender.h
index e6c6ccf..6757c2a 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -23,9 +23,11 @@
 #include <string>
 
 #include "exec/data-sink.h"
+#include "codegen/impala-ir.h"
 #include "common/global-types.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/row-batch.h"
 #include "util/runtime-profile.h"
 
@@ -48,7 +50,7 @@ class TPlanFragmentDestination;
 /// TODO: create a PlanNode equivalent class for DataSink.
 class KrpcDataStreamSender : public DataSink {
  public:
-  /// Construct a sender according to the output specification (tsink), 
sending to the
+  /// Constructs a sender according to the output specification (tsink), 
sending to the
   /// given destinations:
   /// 'sender_id' identifies this sender instance, and is unique within a 
fragment.
   /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
@@ -65,40 +67,44 @@ class KrpcDataStreamSender : public DataSink {
 
   virtual ~KrpcDataStreamSender();
 
-  /// Initialize the sender by initializing all the channels and allocates all
+  /// Initializes the sender by initializing all the channels and allocating all
   /// the stat counters. Return error status if any channels failed to 
initialize.
-  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) 
override;
 
-  /// Initialize the evaluator of the partitioning expressions. Return error 
status
+  /// Codegen HashAndAddRows() if partitioning type is HASH_PARTITIONED.
+  /// Replaces HashRow() and GetNumChannels() based on runtime information.
+  virtual void Codegen(LlvmCodeGen* codegen) override;
+
+  /// Initializes the evaluator of the partitioning expressions. Return error 
status
   /// if initialization failed.
-  virtual Status Open(RuntimeState* state);
+  virtual Status Open(RuntimeState* state) override;
 
-  /// Flush all buffered data and close all existing channels to destination 
hosts.
+  /// Flushes all buffered data and closes all existing channels to destination 
hosts.
   /// Further Send() calls are illegal after calling FlushFinal(). It is legal 
to call
   /// FlushFinal() no more than once. Return error status if Send() failed or 
the end
   /// of stream call failed.
-  virtual Status FlushFinal(RuntimeState* state);
+  virtual Status FlushFinal(RuntimeState* state) override;
 
-  /// Send data in 'batch' to destination nodes according to partitioning
+  /// Sends data in 'batch' to destination nodes according to partitioning
   /// specification provided in c'tor.
   /// Blocks until all rows in batch are placed in their appropriate outgoing
   /// buffers (ie, blocks if there are still in-flight rpcs from the last
   /// Send() call).
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
+  virtual Status Send(RuntimeState* state, RowBatch* batch) override;
 
   /// Shutdown all existing channels to destination hosts. Further 
FlushFinal() calls are
   /// illegal after calling Close().
-  virtual void Close(RuntimeState* state);
+  virtual void Close(RuntimeState* state) override;
 
  protected:
   friend class DataStreamTest;
 
-  /// Initialize any partitioning expressions based on 'thrift_output_exprs' 
and stores
+  /// Initializes any partitioning expressions based on 'thrift_output_exprs' 
and stores
   /// them in 'partition_exprs_'. Returns error status if the initialization 
failed.
   virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state);
+      const TDataSink& tsink, RuntimeState* state) override;
 
-  /// Return total number of bytes sent. If batches are broadcast to multiple 
receivers,
+  /// Returns total number of bytes sent. If batches are broadcast to multiple 
receivers,
   /// they are counted once per receiver.
   int64_t GetNumDataBytesSent() const;
 
@@ -111,6 +117,35 @@ class KrpcDataStreamSender : public DataSink {
   /// updating the stat counters.
   Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int 
num_receivers = 1);
 
+  /// Returns 'partition_expr_evals_[i]'. Used by the codegen'd HashRow() IR 
function.
+  ScalarExprEvaluator* GetPartitionExprEvaluator(int i);
+
+  /// Returns the number of channels in this data stream sender. Not inlined 
for the
+  /// cross-compiled code as it's to be replaced with a constant during 
codegen.
+  int IR_NO_INLINE GetNumChannels() const { return channels_.size(); }
+
+  /// Evaluates the input row against partition expressions and hashes the 
expression
+  /// values. Returns the final hash value.
+  uint64_t HashRow(TupleRow* row);
+
+  /// Used when 'partition_type_' is HASH_PARTITIONED. Calls HashRow() against 
each row
+  /// in the input batch and adds it to the corresponding channel based on the 
hash value.
+  /// Cross-compiled to be patched by Codegen() at runtime. Returns error 
status if
+  /// insertion into the channel fails. Returns OK status otherwise.
+  Status HashAndAddRows(RowBatch* batch);
+
+  /// Adds the given row to 'channels_[channel_id]'.
+  Status AddRowToChannel(const int channel_id, TupleRow* row);
+
+  /// Codegens the HashRow() function and returns the codegen'd function in 
'fn'.
+  /// This involves unrolling the loop in HashRow(), codegen'ing each of the 
partition
+  /// expressions and replacing the column type argument to the hash function 
with
+  /// constants to eliminate some branches. Returns error status on failure.
+  Status CodegenHashRow(LlvmCodeGen* codegen, llvm::Function** fn);
+
+  /// Returns the name of the partitioning type of this data stream sender.
+  string PartitionTypeName() const;
+
   /// Sender instance id, unique within a fragment.
   const int sender_id_;
 
@@ -187,8 +222,18 @@ class KrpcDataStreamSender : public DataSink {
   /// or when errors are encountered.
   int next_unknown_partition_;
 
+  /// Types and pointers for the codegen'd HashAndAddRows() functions.
+  /// NULL if codegen is disabled or failed.
+  typedef Status (*HashAndAddRowsFn)(KrpcDataStreamSender*, RowBatch* row);
+  HashAndAddRowsFn hash_and_add_rows_fn_ = nullptr;
+
+  /// KrpcDataStreamSender::HashRow() symbol. Used for call-site replacement.
+  static const char* HASH_ROW_SYMBOL;
+
   /// An arbitrary hash seed used for exchanges.
   static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
+
+  static const char* LLVM_CLASS_NAME;
 };
 
 } // namespace impala
diff --git a/be/src/runtime/raw-value-ir.cc b/be/src/runtime/raw-value-ir.cc
index 93b77f2..2e44d6a 100644
--- a/be/src/runtime/raw-value-ir.cc
+++ b/be/src/runtime/raw-value-ir.cc
@@ -23,6 +23,7 @@
 #include "runtime/raw-value.inline.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
+#include "util/hash-util.h"
 
 using namespace impala;
 
@@ -162,3 +163,30 @@ uint32_t IR_ALWAYS_INLINE RawValue::GetHashValue(
       return 0;
   }
 }
+
+uint64_t IR_ALWAYS_INLINE RawValue::GetHashValueFastHash(const void* v,
+    const ColumnType& type, uint64_t seed) {
+  // Hash with an arbitrary constant to ensure we don't return seed.
+  if (v == nullptr) {
+    return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed);
+  }
+  switch (type.type) {
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+      const StringValue* string_value = reinterpret_cast<const 
StringValue*>(v);
+      return HashUtil::FastHash64(string_value->ptr,
+          static_cast<size_t>(string_value->len), seed);
+    }
+    case TYPE_BOOLEAN: return HashUtil::FastHash64(v, 1, seed);
+    case TYPE_TINYINT: return HashUtil::FastHash64(v, 1, seed);
+    case TYPE_SMALLINT: return HashUtil::FastHash64(v, 2, seed);
+    case TYPE_INT: return HashUtil::FastHash64(v, 4, seed);
+    case TYPE_BIGINT: return HashUtil::FastHash64(v, 8, seed);
+    case TYPE_FLOAT: return HashUtil::FastHash64(v, 4, seed);
+    case TYPE_DOUBLE: return HashUtil::FastHash64(v, 8, seed);
+    case TYPE_TIMESTAMP: return HashUtil::FastHash64(v, 12, seed);
+    case TYPE_CHAR: return HashUtil::FastHash64(v, type.len, seed);
+    case TYPE_DECIMAL: return HashUtil::FastHash64(v, type.GetByteSize(), 
seed);
+    default: DCHECK(false); return 0;
+  }
+}
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 1db9741..d030460 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -193,33 +193,6 @@ void RawValue::Write(const void* value, Tuple* tuple, 
const SlotDescriptor* slot
   }
 }
 
-uint64_t RawValue::GetHashValueFastHash(const void* v, const ColumnType& type,
-    uint64_t seed) {
-  // Hash with an arbitrary constant to ensure we don't return seed.
-  if (v == nullptr) {
-    return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed);
-  }
-  switch (type.type) {
-    case TYPE_STRING:
-    case TYPE_VARCHAR: {
-      const StringValue* string_value = reinterpret_cast<const 
StringValue*>(v);
-      return HashUtil::FastHash64(string_value->ptr,
-          static_cast<size_t>(string_value->len), seed);
-    }
-    case TYPE_BOOLEAN: return HashUtil::FastHash64(v, 1, seed);
-    case TYPE_TINYINT: return HashUtil::FastHash64(v, 1, seed);
-    case TYPE_SMALLINT: return HashUtil::FastHash64(v, 2, seed);
-    case TYPE_INT: return HashUtil::FastHash64(v, 4, seed);
-    case TYPE_BIGINT: return HashUtil::FastHash64(v, 8, seed);
-    case TYPE_FLOAT: return HashUtil::FastHash64(v, 4, seed);
-    case TYPE_DOUBLE: return HashUtil::FastHash64(v, 8, seed);
-    case TYPE_TIMESTAMP: return HashUtil::FastHash64(v, 12, seed);
-    case TYPE_CHAR: return HashUtil::FastHash64(v, type.len, seed);
-    case TYPE_DECIMAL: return HashUtil::FastHash64(v, type.GetByteSize(), 
seed);
-    default: DCHECK(false); return 0;
-  }
-}
-
 void RawValue::PrintValue(
     const void* value, const ColumnType& type, int scale, std::stringstream* 
stream) {
   if (value == NULL) {
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 47eaf7a..8901d7e 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -152,6 +152,15 @@ class RuntimeState {
   /// expressions' Prepare() are invoked.
   bool ScalarFnNeedsCodegen() const { return !scalar_fns_to_codegen_.empty(); }
 
+  /// Check if codegen was disabled and if so, add a message to the runtime 
profile.
+  void CheckAndAddCodegenDisabledMessage(RuntimeProfile* profile) {
+    if (CodegenDisabledByQueryOption()) {
+      profile->AddCodegenMsg(false, "disabled by query option 
DISABLE_CODEGEN");
+    } else if (CodegenDisabledByHint()) {
+      profile->AddCodegenMsg(false, "disabled due to optimization hints");
+    }
+  }
+
   /// Returns true if there is a hint to disable codegen. This can be true for 
single node
   /// optimization or expression evaluation request from FE to BE (see 
fe-support.cc).
   /// Note that this internal flag is advisory and it may be ignored if the 
fragment has
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index d0e296f..cae2462 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -234,7 +234,8 @@ class RuntimeProfile { // NOLINT: This struct is not 
packed, but there are not s
   /// then no error occurred).
   void AddCodegenMsg(bool codegen_enabled, const Status& codegen_status,
       const std::string& extra_label = "") {
-    AddCodegenMsg(codegen_enabled, codegen_status.GetDetail(), extra_label);
+    const string& err_msg = codegen_status.ok() ? "" : 
codegen_status.msg().msg();
+    AddCodegenMsg(codegen_enabled, err_msg, extra_label);
   }
 
   /// Creates and returns a new EventSequence (owned by the runtime
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test
 
b/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test
new file mode 100644
index 0000000..ad396ad
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test
@@ -0,0 +1,41 @@
+====
+---- QUERY
+set disable_codegen_rows_threshold=0;
+select count(*) from alltypes t1
+  join /* +SHUFFLE */ alltypes t2
+    on t1.int_col= t2.int_col and
+       t1.string_col = t2.string_col
+---- RESULTS
+5329000
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+# Verify that codegen was enabled
+row_regex: .*Hash Partitioned Sender Codegen Enabled.*
+====
+---- QUERY
+set disable_codegen_rows_threshold=0;
+select count(*) from alltypes t1
+  join /* +BROADCAST */ alltypes t2
+    on t1.int_col= t2.int_col and
+       t1.string_col = t2.string_col
+---- RESULTS
+5329000
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+# Verify that codegen was disabled because it is not needed
+row_regex: .*Unpartitioned Sender Codegen Disabled: not needed.*
+====
+---- QUERY
+set disable_codegen_rows_threshold=0;
+select count(*) from chars_tiny t1
+  join /* +SHUFFLE */ chars_tiny t2 on t1.cs=t2.cs;
+---- RESULTS
+10
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+# Verify that codegen was disabled
+row_regex: .*Hash Partitioned Sender Codegen Disabled: Codegen for Char not 
supported.*
+====
diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py
index ccd85e1..a4c02f5 100644
--- a/tests/query_test/test_codegen.py
+++ b/tests/query_test/test_codegen.py
@@ -18,6 +18,7 @@
 # Tests end-to-end codegen behaviour.
 
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIf
 from tests.common.test_dimensions import create_exec_option_dimension_from_dict
 from tests.common.test_result_verifier import get_node_exec_options,\
     assert_codegen_enabled
@@ -49,4 +50,9 @@ class TestCodegen(ImpalaTestSuite):
     exec_options = get_node_exec_options(result.runtime_profile, 1)
     # Make sure test fails if there are no exec options in the profile for the 
node
     assert len(exec_options) > 0
-    assert_codegen_enabled(result.runtime_profile, [1])
\ No newline at end of file
+    assert_codegen_enabled(result.runtime_profile, [1])
+
+  @SkipIf.not_krpc
+  def test_datastream_sender_codegen(self, vector):
+    """Test the KrpcDataStreamSender's codegen logic"""
+    self.run_test_case('QueryTest/datastream-sender-codegen', vector)

Reply via email to