This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/impala.git
commit 56c98e9900fa4898c1997ab834ca4c9ee81f3688
Author: Michael Ho <k...@cloudera.com>
AuthorDate: Tue Mar 6 18:31:18 2018 -0800

IMPALA-5168: Codegen HASH_PARTITIONED KrpcDataStreamSender::Send()

This change codegens the hash partitioning logic of
KrpcDataStreamSender::Send() when the partitioning strategy is
HASH_PARTITIONED. It does so by unrolling the loop that evaluates each
row against the partitioning expressions and hashes the result. It also
replaces the call that returns the sender's number of channels with a
constant when the code is generated at runtime.

With this change, several benchmarks show a reasonable speedup:

+------------+-----------------------+---------+------------+------------+----------------+
| Workload   | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+------------+-----------------------+---------+------------+------------+----------------+
| TPCH(_300) | parquet / none / none | 20.03   | -6.44%     | 13.56      | -7.15%         |
+------------+-----------------------+---------+------------+------------+----------------+

+---------------------+-----------------------+---------+------------+------------+----------------+
| Workload            | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+---------------------+-----------------------+---------+------------+------------+----------------+
| TARGETED-PERF(_300) | parquet / none / none | 58.59   | -5.56%     | 12.28      | -5.30%         |
+---------------------+-----------------------+---------+------------+------------+----------------+

+-------------------------+-----------------------+---------+------------+------------+----------------+
| Workload                | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+-------------------------+-----------------------+---------+------------+------------+----------------+
| TPCDS-UNMODIFIED(_1000) | parquet / none / none | 15.60   | -3.10%     | 7.16       | -4.33%         |
+-------------------------+-----------------------+---------+------------+------------+----------------+

+-------------------+-----------------------+---------+------------+------------+----------------+
| Workload          | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+-------------------+-----------------------+---------+------------+------------+----------------+
| TPCH_NESTED(_300) | parquet / none / none | 30.93   | -3.02%     | 17.46      | -4.71%         |
+-------------------+-----------------------+---------+------------+------------+----------------+

Change-Id: I1c44cc9312c062cc7a5a4ac9156ceaa31fb887ff
Reviewed-on: http://gerrit.cloudera.org:8080/10421
Reviewed-by: Michael Ho <k...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/codegen/gen_ir_descriptions.py | 8 +-
 be/src/codegen/impala-ir.cc | 1 +
 be/src/exec/data-sink.cc | 4 +
 be/src/exec/data-sink.h | 4 +
 be/src/exec/exchange-node.cc | 2 +-
 be/src/exec/exec-node.cc | 8 -
 be/src/exec/exec-node.h | 3 -
 be/src/exec/hdfs-scan-node-base.cc | 2 +-
 be/src/exec/partial-sort-node.cc | 2 +-
 be/src/exec/partitioned-aggregation-node.cc | 2 +-
 be/src/exec/partitioned-hash-join-builder.cc | 6 +-
 be/src/exec/partitioned-hash-join-builder.h | 2 +-
 be/src/exec/partitioned-hash-join-node.cc | 2 +-
 be/src/exec/sort-node.cc | 2 +-
 be/src/exec/topn-node.cc | 2 +-
 be/src/runtime/CMakeLists.txt | 1 +
 be/src/runtime/fragment-instance-state.cc | 2 +
 be/src/runtime/krpc-data-stream-sender-ir.cc | 49 +++++
 be/src/runtime/krpc-data-stream-sender.cc | 237 ++++++++++++++++++---
 be/src/runtime/krpc-data-stream-sender.h | 71 ++++--
 be/src/runtime/raw-value-ir.cc | 28 +++
 be/src/runtime/raw-value.cc | 27 ---
 be/src/runtime/runtime-state.h | 9 +
 be/src/util/runtime-profile.h | 3 +-
 .../QueryTest/datastream-sender-codegen.test | 41 ++++
 tests/query_test/test_codegen.py | 8 +-
 26 files changed, 425 insertions(+), 101 deletions(-)

diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 26a8ad7..dd2df9e 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++
b/be/src/codegen/gen_ir_descriptions.py @@ -196,6 +196,8 @@ ir_functions = [ "_ZN6impala8RawValue7CompareEPKvS2_RKNS_10ColumnTypeE"], ["RAW_VALUE_GET_HASH_VALUE", "_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj"], + ["RAW_VALUE_GET_HASH_VALUE_FAST_HASH", + "_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm"], ["TOPN_NODE_INSERT_BATCH", "_ZN6impala8TopNNode11InsertBatchEPNS_8RowBatchE"], ["MEMPOOL_ALLOCATE", @@ -219,7 +221,11 @@ ir_functions = [ ["FLOAT_MIN_MAX_FILTER_INSERT", "_ZN6impala17FloatMinMaxFilter6InsertEPv"], ["DOUBLE_MIN_MAX_FILTER_INSERT", "_ZN6impala18DoubleMinMaxFilter6InsertEPv"], ["STRING_MIN_MAX_FILTER_INSERT", "_ZN6impala18StringMinMaxFilter6InsertEPv"], - ["TIMESTAMP_MIN_MAX_FILTER_INSERT", "_ZN6impala21TimestampMinMaxFilter6InsertEPv"] + ["TIMESTAMP_MIN_MAX_FILTER_INSERT", "_ZN6impala21TimestampMinMaxFilter6InsertEPv"], + ["KRPC_DSS_GET_PART_EXPR_EVAL", + "_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi"], + ["KRPC_DSS_HASH_AND_ADD_ROWS", + "_ZN6impala20KrpcDataStreamSender14HashAndAddRowsEPNS_8RowBatchE"] ] enums_preamble = '\ diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc index 4b79e8b..9c5b3eb 100644 --- a/be/src/codegen/impala-ir.cc +++ b/be/src/codegen/impala-ir.cc @@ -54,6 +54,7 @@ #include "exprs/timestamp-functions-ir.cc" #include "exprs/udf-builtins-ir.cc" #include "exprs/utility-functions-ir.cc" +#include "runtime/krpc-data-stream-sender-ir.cc" #include "runtime/mem-pool.h" #include "runtime/raw-value-ir.cc" #include "runtime/runtime-filter-ir.cc" diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index 9140b3e..2ca0019 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -134,6 +134,10 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { return Status::OK(); } +void DataSink::Codegen(LlvmCodeGen* codegen) { + return; +} + Status DataSink::Open(RuntimeState* state) { DCHECK_EQ(output_exprs_.size(), 
output_expr_evals_.size()); return ScalarExprEvaluator::Open(output_expr_evals_, state); diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index 605b46d..d4f2040 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -63,6 +63,10 @@ class DataSink { /// initializes their evaluators. Subclasses must call DataSink::Prepare(). virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); + /// Codegen expressions in the sink. Overridden by sink type which supports codegen. + /// No-op by default. + virtual void Codegen(LlvmCodeGen* codegen); + /// Call before Send() to open the sink and initialize output expression evaluators. virtual Status Open(RuntimeState* state); diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc index 297a805..8884f30 100644 --- a/be/src/exec/exchange-node.cc +++ b/be/src/exec/exchange-node.cc @@ -94,7 +94,7 @@ Status ExchangeNode::Prepare(RuntimeState* state) { if (is_merging_) { less_than_.reset( new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); } return Status::OK(); } diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 3a99f18..85d965b 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -501,14 +501,6 @@ Status ExecNode::QueryMaintenance(RuntimeState* state) { return state->CheckQueryState(); } -void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) { - if (state->CodegenDisabledByQueryOption()) { - runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"); - } else if (state->CodegenDisabledByHint()) { - runtime_profile()->AddCodegenMsg(false, "disabled due to optimization hints"); - } -} - bool ExecNode::IsNodeCodegenDisabled() const { return disable_codegen_; } diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 80241b1..fac8312 100644 --- a/be/src/exec/exec-node.h +++ 
b/be/src/exec/exec-node.h @@ -216,9 +216,6 @@ class ExecNode { /// check to see if codegen was enabled for the enclosing fragment. bool IsNodeCodegenDisabled() const; - /// Add codegen disabled message if codegen is disabled for this ExecNode. - void AddCodegenDisabledMessage(RuntimeState* state); - /// Extract node id from p->name(). static int GetNodeIdFromProfile(RuntimeProfile* p); diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index a163e63..67f07cd 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -264,7 +264,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats); PrintHdfsSplitStats(per_volume_stats, &str); runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str()); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); return Status::OK(); } diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc index 911f014..9fd653c 100644 --- a/be/src/exec/partial-sort-node.cc +++ b/be/src/exec/partial-sort-node.cc @@ -62,7 +62,7 @@ Status PartialSortNode::Prepare(RuntimeState* state) { resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), false)); RETURN_IF_ERROR(sorter_->Prepare(pool_)); DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation()); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); input_batch_.reset( new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); return Status::OK(); diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index d7b8c0a..3a90f14 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -225,7 +225,7 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) { state->fragment_hash_seed(), 
MAX_PARTITION_DEPTH, 1, expr_perm_pool(), expr_results_pool(), expr_results_pool(), &ht_ctx_)); } - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); return Status::OK(); } diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 194bb92..c627b98 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -137,11 +137,7 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime"); build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime"); repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime"); - if (state->CodegenDisabledByQueryOption()) { - profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"); - } else if (state->CodegenDisabledByHint()) { - profile()->AddCodegenMsg(false, "disabled due to optimization hints"); - } + state->CheckAndAddCodegenDisabledMessage(profile()); return Status::OK(); } diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index e075beb..f6105fc 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -91,7 +91,7 @@ class PhjBuilder : public DataSink { /// Does all codegen for the builder (if codegen is enabled). /// Updates the the builder's runtime profile with info about whether any errors /// occured during codegen. - void Codegen(LlvmCodeGen* codegen); + virtual void Codegen(LlvmCodeGen* codegen) override; ///////////////////////////////////////// // The following functions are used only by PartitionedHashJoinNode. 
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 99e07eb..909a427 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -136,7 +136,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) { ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT); num_hash_table_builds_skipped_ = ADD_COUNTER(runtime_profile(), "NumHashTableBuildsSkipped", TUnit::UNIT); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); return Status::OK(); } diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index 9ab1435..546f3ba 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -58,7 +58,7 @@ Status SortNode::Prepare(RuntimeState* state) { resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true)); RETURN_IF_ERROR(sorter_->Prepare(pool_)); DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation()); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); return Status::OK(); } diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc index 31bf58b..97c05f9 100644 --- a/be/src/exec/topn-node.cc +++ b/be/src/exec/topn-node.cc @@ -79,7 +79,7 @@ Status TopNNode::Prepare(RuntimeState* state) { new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0]; insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime"); - AddCodegenDisabledMessage(state); + state->CheckAndAddCodegenDisabledMessage(runtime_profile()); tuple_pool_reclaim_counter_ = ADD_COUNTER(runtime_profile(), "TuplePoolReclamations", TUnit::UNIT); return Status::OK(); diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 89cbbb9..7dcf45c 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -47,6 +47,7 @@ 
add_library(Runtime krpc-data-stream-mgr.cc krpc-data-stream-recvr.cc krpc-data-stream-sender.cc + krpc-data-stream-sender-ir.cc lib-cache.cc mem-tracker.cc mem-pool.cc diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 629fb96..e6fa489 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -247,6 +247,8 @@ Status FragmentInstanceState::Open() { SCOPED_THREAD_COUNTER_MEASUREMENT( runtime_state_->codegen()->llvm_thread_counters()); exec_tree_->Codegen(runtime_state_); + sink_->Codegen(runtime_state_->codegen()); + // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed, // ScalarFnCall has no fall back to interpretation when codegen fails so propagates // the error status for now. diff --git a/be/src/runtime/krpc-data-stream-sender-ir.cc b/be/src/runtime/krpc-data-stream-sender-ir.cc new file mode 100644 index 0000000..335e16f --- /dev/null +++ b/be/src/runtime/krpc-data-stream-sender-ir.cc @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/scalar-expr-evaluator.h" +#include "runtime/krpc-data-stream-sender.h" +#include "runtime/raw-value.h" +#include "runtime/row-batch.h" + +namespace impala { + +ScalarExprEvaluator* KrpcDataStreamSender::GetPartitionExprEvaluator(int i) { + return partition_expr_evals_[i]; +} + +Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) { + const int num_rows = batch->num_rows(); + const int num_channels = GetNumChannels(); + int channel_ids[RowBatch::HASH_BATCH_SIZE]; + int row_idx = 0; + while (row_idx < num_rows) { + int row_count = 0; + FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) { + TupleRow* row = row_batch_iter.Get(); + channel_ids[row_count++] = HashRow(row) % num_channels; + } + row_count = 0; + FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) { + RETURN_IF_ERROR(AddRowToChannel(channel_ids[row_count++], row_batch_iter.Get())); + } + row_idx += row_count; + } + return Status::OK(); +} + +} diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index cd30f06..74eb129 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -25,6 +25,8 @@ #include <thrift/protocol/TDebugProtocol.h> #include "common/logging.h" +#include "codegen/codegen-anyval.h" +#include "codegen/llvm-codegen.h" #include "exec/kudu-util.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" @@ -61,6 +63,10 @@ DECLARE_int32(rpc_retry_interval_ms); namespace impala { +const char* KrpcDataStreamSender::HASH_ROW_SYMBOL = + "KrpcDataStreamSender7HashRowEPNS_8TupleRowE"; +const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender"; + // A datastream sender may send row batches to multiple destinations. There is one // channel for each destination. 
// @@ -645,6 +651,7 @@ Status KrpcDataStreamSender::Prepare( for (int i = 0; i < channels_.size(); ++i) { RETURN_IF_ERROR(channels_[i]->Init(state)); } + state->CheckAndAddCodegenDisabledMessage(profile()); return Status::OK(); } @@ -653,6 +660,198 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) { return ScalarExprEvaluator::Open(partition_expr_evals_, state); } +// +// An example of generated code with int type. +// +// define i64 @KrpcDataStreamSenderHashRow(%"class.impala::KrpcDataStreamSender"* %this, +// %"class.impala::TupleRow"* %row) #46 { +// entry: +// %0 = alloca i32 +// %1 = call %"class.impala::ScalarExprEvaluator"* +// @_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi( +// %"class.impala::KrpcDataStreamSender"* %this, i32 0) +// %partition_val = call i64 @GetSlotRef( +// %"class.impala::ScalarExprEvaluator"* %1, %"class.impala::TupleRow"* %row) +// %is_null = trunc i64 %partition_val to i1 +// br i1 %is_null, label %is_null_block, label %not_null_block +// +// is_null_block: ; preds = %entry +// br label %hash_val_block +// +// not_null_block: ; preds = %entry +// %2 = ashr i64 %partition_val, 32 +// %3 = trunc i64 %2 to i32 +// store i32 %3, i32* %0 +// %native_ptr = bitcast i32* %0 to i8* +// br label %hash_val_block +// +// hash_val_block: ; preds = %not_null_block, %is_null_block +// %val_ptr_phi = phi i8* [ %native_ptr, %not_null_block ], [ null, %is_null_block ] +// %hash_val = call i64 +// @_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm( +// i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg, +// i64 7403188670037225271) +// ret i64 %hash_val +// } +Status KrpcDataStreamSender::CodegenHashRow(LlvmCodeGen* codegen, llvm::Function** fn) { + llvm::LLVMContext& context = codegen->context(); + LlvmBuilder builder(context); + + LlvmCodeGen::FnPrototype prototype( + codegen, "KrpcDataStreamSenderHashRow", codegen->i64_type()); + prototype.AddArgument( + LlvmCodeGen::NamedVariable("this", 
codegen->GetNamedPtrType(LLVM_CLASS_NAME))); + prototype.AddArgument( + LlvmCodeGen::NamedVariable("row", codegen->GetStructPtrType<TupleRow>())); + + llvm::Value* args[2]; + llvm::Function* hash_row_fn = prototype.GeneratePrototype(&builder, args); + llvm::Value* this_arg = args[0]; + llvm::Value* row_arg = args[1]; + + // Store the initial seed to hash_val + llvm::Value* hash_val = codegen->GetI64Constant(EXCHANGE_HASH_SEED); + + // Unroll the loop and codegen each of the partition expressions + for (int i = 0; i < partition_exprs_.size(); ++i) { + llvm::Function* compute_fn; + RETURN_IF_ERROR(partition_exprs_[i]->GetCodegendComputeFn(codegen, &compute_fn)); + + // Load the expression evaluator for the i-th partition expression + llvm::Function* get_expr_eval_fn = + codegen->GetFunction(IRFunction::KRPC_DSS_GET_PART_EXPR_EVAL, false); + DCHECK(get_expr_eval_fn != nullptr); + llvm::Value* expr_eval_arg = + builder.CreateCall(get_expr_eval_fn, {this_arg, codegen->GetI32Constant(i)}); + + // Compute the value against the i-th partition expression + llvm::Value* compute_fn_args[] = {expr_eval_arg, row_arg}; + CodegenAnyVal partition_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder, + partition_exprs_[i]->type(), compute_fn, compute_fn_args, "partition_val"); + + llvm::BasicBlock* is_null_block = + llvm::BasicBlock::Create(context, "is_null_block", hash_row_fn); + llvm::BasicBlock* not_null_block = + llvm::BasicBlock::Create(context, "not_null_block", hash_row_fn); + llvm::BasicBlock* hash_val_block = + llvm::BasicBlock::Create(context, "hash_val_block", hash_row_fn); + + // Check if 'partition_val' is NULL + llvm::Value* val_is_null = partition_val.GetIsNull(); + builder.CreateCondBr(val_is_null, is_null_block, not_null_block); + + // Set the pointer to NULL in case 'partition_val' evaluates to NULL + builder.SetInsertPoint(is_null_block); + llvm::Value* null_ptr = codegen->null_ptr_value(); + builder.CreateBr(hash_val_block); + + // Saves 'partition_val' on 
the stack and passes a pointer to it to the hash function + builder.SetInsertPoint(not_null_block); + llvm::Value* native_ptr = partition_val.ToNativePtr(); + native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), "native_ptr"); + builder.CreateBr(hash_val_block); + + // Picks the input value to hash function + builder.SetInsertPoint(hash_val_block); + llvm::PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, "val_ptr_phi"); + val_ptr_phi->addIncoming(native_ptr, not_null_block); + val_ptr_phi->addIncoming(null_ptr, is_null_block); + + // Creates a global constant of the partition expression's ColumnType. It has to be a + // constant for constant propagation and dead code elimination in 'get_hash_value_fn' + llvm::Type* col_type = codegen->GetStructType<ColumnType>(); + llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr( + col_type, partition_exprs_[i]->type().ToIR(codegen), "expr_type_arg"); + + // Update 'hash_val' with the new 'partition-val' + llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, hash_val}; + llvm::Function* get_hash_value_fn = + codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE_FAST_HASH, false); + DCHECK(get_hash_value_fn != nullptr); + hash_val = builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_val"); + } + + builder.CreateRet(hash_val); + *fn = codegen->FinalizeFunction(hash_row_fn); + if (*fn == nullptr) { + return Status("Codegen'd KrpcDataStreamSenderHashRow() fails verification. 
See log"); + } + + return Status::OK(); +} + +string KrpcDataStreamSender::PartitionTypeName() const { + switch (partition_type_) { + case TPartitionType::UNPARTITIONED: + return "Unpartitioned"; + case TPartitionType::HASH_PARTITIONED: + return "Hash Partitioned"; + case TPartitionType::RANDOM: + return "Random Partitioned"; + case TPartitionType::KUDU: + return "Kudu Partitioned"; + default: + DCHECK(false) << partition_type_; + return ""; + } +} + +void KrpcDataStreamSender::Codegen(LlvmCodeGen* codegen) { + const string sender_name = PartitionTypeName() + " Sender"; + if (partition_type_ != TPartitionType::HASH_PARTITIONED) { + const string& msg = Substitute("not $0", + partition_type_ == TPartitionType::KUDU ? "supported" : "needed"); + profile()->AddCodegenMsg(false, msg, sender_name); + return; + } + + llvm::Function* hash_row_fn; + Status codegen_status = CodegenHashRow(codegen, &hash_row_fn); + if (codegen_status.ok()) { + llvm::Function* hash_and_add_rows_fn = + codegen->GetFunction(IRFunction::KRPC_DSS_HASH_AND_ADD_ROWS, true); + DCHECK(hash_and_add_rows_fn != nullptr); + + int num_replaced; + // Replace GetNumChannels() with a constant. + num_replaced = codegen->ReplaceCallSitesWithValue(hash_and_add_rows_fn, + codegen->GetI32Constant(GetNumChannels()), "GetNumChannels"); + DCHECK_EQ(num_replaced, 1); + + // Replace HashRow() with the handcrafted IR function. + num_replaced = codegen->ReplaceCallSites(hash_and_add_rows_fn, + hash_row_fn, HASH_ROW_SYMBOL); + DCHECK_EQ(num_replaced, 1); + + hash_and_add_rows_fn = codegen->FinalizeFunction(hash_and_add_rows_fn); + if (hash_and_add_rows_fn == nullptr) { + codegen_status = + Status("Codegen'd HashAndAddRows() failed verification. 
See log"); + } else { + codegen->AddFunctionToJit(hash_and_add_rows_fn, + reinterpret_cast<void**>(&hash_and_add_rows_fn_)); + } + } + profile()->AddCodegenMsg(codegen_status.ok(), codegen_status, sender_name); +} + +Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row) { + return channels_[channel_id]->AddRow(row); +} + +uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) { + uint64_t hash_val = EXCHANGE_HASH_SEED; + for (ScalarExprEvaluator* eval : partition_expr_evals_) { + void* partition_val = eval->GetValue(row); + // We can't use the crc hash function here because it does not result in + // uncorrelated hashes with different seeds. Instead we use FastHash. + // TODO: fix crc hash/GetHashValue() + hash_val = RawValue::GetHashValueFastHash( + partition_val, eval->root().type(), hash_val); + } + return hash_val; +} + Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { SCOPED_TIMER(profile()->total_time_counter()); DCHECK(!closed_); @@ -703,40 +902,10 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { } } else { DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED); - // hash-partition batch's rows across channels - // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case - // once we have codegen here. 
- int num_channels = channels_.size(); - const int num_partition_exprs = partition_exprs_.size(); - const int num_rows = batch->num_rows(); - const int hash_batch_size = RowBatch::HASH_BATCH_SIZE; - int channel_ids[hash_batch_size]; - - // Break the loop into two parts break the data dependency between computing - // the hash and calling AddRow() - // To keep stack allocation small a RowBatch::HASH_BATCH is used - for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) { - int batch_window_size = min(num_rows - batch_start, hash_batch_size); - for (int i = 0; i < batch_window_size; ++i) { - TupleRow* row = batch->GetRow(i + batch_start); - uint64_t hash_val = EXCHANGE_HASH_SEED; - for (int j = 0; j < num_partition_exprs; ++j) { - ScalarExprEvaluator* eval = partition_expr_evals_[j]; - void* partition_val = eval->GetValue(row); - // We can't use the crc hash function here because it does not result in - // uncorrelated hashes with different seeds. Instead we use FastHash. 
- // TODO: fix crc hash/GetHashValue() - DCHECK(&(eval->root()) == partition_exprs_[j]); - hash_val = RawValue::GetHashValueFastHash( - partition_val, partition_exprs_[j]->type(), hash_val); - } - channel_ids[i] = hash_val % num_channels; - } - - for (int i = 0; i < batch_window_size; ++i) { - TupleRow* row = batch->GetRow(i + batch_start); - RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row)); - } + if (hash_and_add_rows_fn_ != nullptr) { + RETURN_IF_ERROR(hash_and_add_rows_fn_(this, batch)); + } else { + RETURN_IF_ERROR(HashAndAddRows(batch)); } } COUNTER_ADD(total_sent_rows_counter_, batch->num_rows()); diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h index e6c6ccf..6757c2a 100644 --- a/be/src/runtime/krpc-data-stream-sender.h +++ b/be/src/runtime/krpc-data-stream-sender.h @@ -23,9 +23,11 @@ #include <string> #include "exec/data-sink.h" +#include "codegen/impala-ir.h" #include "common/global-types.h" #include "common/object-pool.h" #include "common/status.h" +#include "exprs/scalar-expr.h" #include "runtime/row-batch.h" #include "util/runtime-profile.h" @@ -48,7 +50,7 @@ class TPlanFragmentDestination; /// TODO: create a PlanNode equivalent class for DataSink. class KrpcDataStreamSender : public DataSink { public: - /// Construct a sender according to the output specification (tsink), sending to the + /// Constructs a sender according to the output specification (tsink), sending to the /// given destinations: /// 'sender_id' identifies this sender instance, and is unique within a fragment. /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink. @@ -65,40 +67,44 @@ class KrpcDataStreamSender : public DataSink { virtual ~KrpcDataStreamSender(); - /// Initialize the sender by initializing all the channels and allocates all + /// Initializes the sender by initializing all the channels and allocates all /// the stat counters. Return error status if any channels failed to initialize. 
- virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); + virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override; - /// Initialize the evaluator of the partitioning expressions. Return error status + /// Codegen HashAndAddRows() if partitioning type is HASH_PARTITIONED. + /// Replaces HashRow() and GetNumChannels() based on runtime information. + virtual void Codegen(LlvmCodeGen* codegen) override; + + /// Initializes the evaluator of the partitioning expressions. Return error status /// if initialization failed. - virtual Status Open(RuntimeState* state); + virtual Status Open(RuntimeState* state) override; - /// Flush all buffered data and close all existing channels to destination hosts. + /// Flushes all buffered data and close all existing channels to destination hosts. /// Further Send() calls are illegal after calling FlushFinal(). It is legal to call /// FlushFinal() no more than once. Return error status if Send() failed or the end /// of stream call failed. - virtual Status FlushFinal(RuntimeState* state); + virtual Status FlushFinal(RuntimeState* state) override; - /// Send data in 'batch' to destination nodes according to partitioning + /// Sends data in 'batch' to destination nodes according to partitioning /// specification provided in c'tor. /// Blocks until all rows in batch are placed in their appropriate outgoing /// buffers (ie, blocks if there are still in-flight rpcs from the last /// Send() call). - virtual Status Send(RuntimeState* state, RowBatch* batch); + virtual Status Send(RuntimeState* state, RowBatch* batch) override; /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are /// illegal after calling Close(). 
- virtual void Close(RuntimeState* state); + virtual void Close(RuntimeState* state) override; protected: friend class DataStreamTest; - /// Initialize any partitioning expressions based on 'thrift_output_exprs' and stores + /// Initializes any partitioning expressions based on 'thrift_output_exprs' and stores /// them in 'partition_exprs_'. Returns error status if the initialization failed. virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, - const TDataSink& tsink, RuntimeState* state); + const TDataSink& tsink, RuntimeState* state) override; - /// Return total number of bytes sent. If batches are broadcast to multiple receivers, + /// Returns total number of bytes sent. If batches are broadcast to multiple receivers, /// they are counted once per receiver. int64_t GetNumDataBytesSent() const; @@ -111,6 +117,35 @@ class KrpcDataStreamSender : public DataSink { /// updating the stat counters. Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int num_receivers = 1); + /// Returns 'partition_expr_evals_[i]'. Used by the codegen'd HashRow() IR function. + ScalarExprEvaluator* GetPartitionExprEvaluator(int i); + + /// Returns the number of channels in this data stream sender. Not inlined for the + /// cross-compiled code as it's to be replaced with a constant during codegen. + int IR_NO_INLINE GetNumChannels() const { return channels_.size(); } + + /// Evaluates the input row against partition expressions and hashes the expression + /// values. Returns the final hash value. + uint64_t HashRow(TupleRow* row); + + /// Used when 'partition_type_' is HASH_PARTITIONED. Call HashRow() against each row + /// in the input batch and adds it to the corresponding channel based on the hash value. + /// Cross-compiled to be patched by Codegen() at runtime. Returns error status if + /// insertion into the channel fails. Returns OK status otherwise. + Status HashAndAddRows(RowBatch* batch); + + /// Adds the given row to 'channels_[channel_id]'. 
+ Status AddRowToChannel(const int channel_id, TupleRow* row); + + /// Codegens the HashRow() function and returns the codegen'd function in 'fn'. + /// This involves unrolling the loop in HashRow(), codegens each of the partition + /// expressions and replaces the column type argument to the hash function with + /// constants to eliminate some branches. Returns error status on failure. + Status CodegenHashRow(LlvmCodeGen* codegen, llvm::Function** fn); + + /// Returns the name of the partitioning type of this data stream sender. + string PartitionTypeName() const; + /// Sender instance id, unique within a fragment. const int sender_id_; @@ -187,8 +222,18 @@ class KrpcDataStreamSender : public DataSink { /// or when errors are encountered. int next_unknown_partition_; + /// Type and pointer for the codegen'd HashAndAddRows() function. + /// nullptr if codegen is disabled or failed. + typedef Status (*HashAndAddRowsFn)(KrpcDataStreamSender*, RowBatch* row); + HashAndAddRowsFn hash_and_add_rows_fn_ = nullptr; + + /// KrpcDataStreamSender::HashRow() symbol. Used for call-site replacement. + static const char* HASH_ROW_SYMBOL; + /// An arbitrary hash seed used for exchanges. static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37; + + static const char* LLVM_CLASS_NAME; }; } // namespace impala diff --git a/be/src/runtime/raw-value-ir.cc b/be/src/runtime/raw-value-ir.cc index 93b77f2..2e44d6a 100644 --- a/be/src/runtime/raw-value-ir.cc +++ b/be/src/runtime/raw-value-ir.cc @@ -23,6 +23,7 @@ #include "runtime/raw-value.inline.h" #include "runtime/string-value.inline.h" #include "runtime/timestamp-value.h" +#include "util/hash-util.h" using namespace impala; @@ -162,3 +163,30 @@ uint32_t IR_ALWAYS_INLINE RawValue::GetHashValue( return 0; } } + +uint64_t IR_ALWAYS_INLINE RawValue::GetHashValueFastHash(const void* v, + const ColumnType& type, uint64_t seed) { + // Hash with an arbitrary constant to ensure we don't return seed. 
+ if (v == nullptr) { + return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed); + } + switch (type.type) { + case TYPE_STRING: + case TYPE_VARCHAR: { + const StringValue* string_value = reinterpret_cast<const StringValue*>(v); + return HashUtil::FastHash64(string_value->ptr, + static_cast<size_t>(string_value->len), seed); + } + case TYPE_BOOLEAN: return HashUtil::FastHash64(v, 1, seed); + case TYPE_TINYINT: return HashUtil::FastHash64(v, 1, seed); + case TYPE_SMALLINT: return HashUtil::FastHash64(v, 2, seed); + case TYPE_INT: return HashUtil::FastHash64(v, 4, seed); + case TYPE_BIGINT: return HashUtil::FastHash64(v, 8, seed); + case TYPE_FLOAT: return HashUtil::FastHash64(v, 4, seed); + case TYPE_DOUBLE: return HashUtil::FastHash64(v, 8, seed); + case TYPE_TIMESTAMP: return HashUtil::FastHash64(v, 12, seed); + case TYPE_CHAR: return HashUtil::FastHash64(v, type.len, seed); + case TYPE_DECIMAL: return HashUtil::FastHash64(v, type.GetByteSize(), seed); + default: DCHECK(false); return 0; + } +} diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc index 1db9741..d030460 100644 --- a/be/src/runtime/raw-value.cc +++ b/be/src/runtime/raw-value.cc @@ -193,33 +193,6 @@ void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* slot } } -uint64_t RawValue::GetHashValueFastHash(const void* v, const ColumnType& type, - uint64_t seed) { - // Hash with an arbitrary constant to ensure we don't return seed. 
- if (v == nullptr) { - return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed); - } - switch (type.type) { - case TYPE_STRING: - case TYPE_VARCHAR: { - const StringValue* string_value = reinterpret_cast<const StringValue*>(v); - return HashUtil::FastHash64(string_value->ptr, - static_cast<size_t>(string_value->len), seed); - } - case TYPE_BOOLEAN: return HashUtil::FastHash64(v, 1, seed); - case TYPE_TINYINT: return HashUtil::FastHash64(v, 1, seed); - case TYPE_SMALLINT: return HashUtil::FastHash64(v, 2, seed); - case TYPE_INT: return HashUtil::FastHash64(v, 4, seed); - case TYPE_BIGINT: return HashUtil::FastHash64(v, 8, seed); - case TYPE_FLOAT: return HashUtil::FastHash64(v, 4, seed); - case TYPE_DOUBLE: return HashUtil::FastHash64(v, 8, seed); - case TYPE_TIMESTAMP: return HashUtil::FastHash64(v, 12, seed); - case TYPE_CHAR: return HashUtil::FastHash64(v, type.len, seed); - case TYPE_DECIMAL: return HashUtil::FastHash64(v, type.GetByteSize(), seed); - default: DCHECK(false); return 0; - } -} - void RawValue::PrintValue( const void* value, const ColumnType& type, int scale, std::stringstream* stream) { if (value == NULL) { diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 47eaf7a..8901d7e 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -152,6 +152,15 @@ class RuntimeState { /// expressions' Prepare() are invoked. bool ScalarFnNeedsCodegen() const { return !scalar_fns_to_codegen_.empty(); } + /// Check if codegen was disabled and if so, add a message to the runtime profile. + void CheckAndAddCodegenDisabledMessage(RuntimeProfile* profile) { + if (CodegenDisabledByQueryOption()) { + profile->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"); + } else if (CodegenDisabledByHint()) { + profile->AddCodegenMsg(false, "disabled due to optimization hints"); + } + } + /// Returns true if there is a hint to disable codegen. 
This can be true for single node /// optimization or expression evaluation request from FE to BE (see fe-support.cc). /// Note that this internal flag is advisory and it may be ignored if the fragment has diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index d0e296f..cae2462 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -234,7 +234,8 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// then no error occurred). void AddCodegenMsg(bool codegen_enabled, const Status& codegen_status, const std::string& extra_label = "") { - AddCodegenMsg(codegen_enabled, codegen_status.GetDetail(), extra_label); + const string& err_msg = codegen_status.ok() ? "" : codegen_status.msg().msg(); + AddCodegenMsg(codegen_enabled, err_msg, extra_label); } /// Creates and returns a new EventSequence (owned by the runtime diff --git a/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test b/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test new file mode 100644 index 0000000..ad396ad --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/datastream-sender-codegen.test @@ -0,0 +1,41 @@ +==== +---- QUERY +set disable_codegen_rows_threshold=0; +select count(*) from alltypes t1 + join /* +SHUFFLE */ alltypes t2 + on t1.int_col= t2.int_col and + t1.string_col = t2.string_col +---- RESULTS +5329000 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was enabled +row_regex: .*Hash Partitioned Sender Codegen Enabled.* +==== +---- QUERY +set disable_codegen_rows_threshold=0; +select count(*) from alltypes t1 + join /* +BROADCAST */ alltypes t2 + on t1.int_col= t2.int_col and + t1.string_col = t2.string_col +---- RESULTS +5329000 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was disabled +row_regex: .*Unpartitioned Sender Codegen Disabled: not needed.* +==== +---- QUERY +set 
disable_codegen_rows_threshold=0; +select count(*) from chars_tiny t1 + join /* +SHUFFLE */ chars_tiny t2 on t1.cs=t2.cs; +---- RESULTS +10 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was disabled +row_regex: .*Hash Partitioned Sender Codegen Disabled: Codegen for Char not supported.* +==== diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py index ccd85e1..a4c02f5 100644 --- a/tests/query_test/test_codegen.py +++ b/tests/query_test/test_codegen.py @@ -18,6 +18,7 @@ # Tests end-to-end codegen behaviour. from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import SkipIf from tests.common.test_dimensions import create_exec_option_dimension_from_dict from tests.common.test_result_verifier import get_node_exec_options,\ assert_codegen_enabled @@ -49,4 +50,9 @@ class TestCodegen(ImpalaTestSuite): exec_options = get_node_exec_options(result.runtime_profile, 1) # Make sure test fails if there are no exec options in the profile for the node assert len(exec_options) > 0 - assert_codegen_enabled(result.runtime_profile, [1]) \ No newline at end of file + assert_codegen_enabled(result.runtime_profile, [1]) + + @SkipIf.not_krpc + def test_datastream_sender_codegen(self, vector): + """Test the KrpcDataStreamSender's codegen logic""" + self.run_test_case('QueryTest/datastream-sender-codegen', vector)