CodiumAI-Agent commented on PR #8518: URL: https://github.com/apache/incubator-gluten/pull/8518#issuecomment-2649962827
## PR Reviewer Guide 🔍 Here are some key observations to aid the review process: <table> <tr><td> **🎫 Ticket compliance analysis 🔶** **[8492](https://github.com/apache/incubator-gluten/issues/8492) - Partially compliant** Compliant requirements: - Offload `RangeExec` to the Gluten framework to improve performance. - Ensure compatibility with existing unit tests and handle any necessary exclusions. Non-compliant requirements: - Investigate and resolve performance issues with the `sum` function using a filter in the ClickHouse backend compared to vanilla Spark. Requires further human verification: - Investigate and resolve performance issues with the `sum` function using a filter in the ClickHouse backend compared to vanilla Spark. </td></tr> <tr><td>⏱️ <strong>Estimated effort to review</strong>: 4 🔵🔵🔵🔵⚪</td></tr> <tr><td>🧪 <strong>No relevant tests</strong></td></tr> <tr><td>🔒 <strong>No security concerns identified</strong></td></tr> <tr><td>⚡ <strong>Recommended focus areas for review</strong><br><br> <details><summary><a href='https://github.com/apache/incubator-gluten/pull/8518/files#diff-bbe7f714378f62b2168d1da2842d145c0bf8844371a561aeb949a978a5248930R84-R107'><strong>Possible Issue</strong></a> The `doTransform` method in `CHRangeExecTransformer` introduces a new optimization content for range execution. 
Ensure that this transformation is correctly integrated and does not introduce regressions or unexpected behavior.</summary> ```scala override def doTransform(context: SubstraitContext): TransformContext = { val output = outputAttributes val typeNodes = ConverterUtils.collectAttributeTypeNodes(output) val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output) val columnTypeNodes = JavaConverters .seqAsJavaListConverter( output.map(attr => new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL))) .asJava val optimizationContent = s"isRange=1\n" val optimization = BackendsApiManager.getTransformerApiInstance.packPBMessage( StringValue.newBuilder.setValue(optimizationContent).build) val extensionNode = ExtensionBuilder.makeAdvancedExtension(optimization, null) val readNode = RelBuilder.makeReadRel( typeNodes, nameList, columnTypeNodes, null, extensionNode, context, context.nextOperatorId(this.nodeName)) TransformContext(output, readNode) ``` </details> <details><summary><a href='https://github.com/apache/incubator-gluten/pull/8518/files#diff-fdcfc5e7f76481a7268b7e1e17d464c18b47f4dec38220f88138dab37913504fR33-R129'><strong>Performance Concern</strong></a> The `SourceFromRange` implementation includes logic for handling large ranges and step sizes. 
Validate that the logic is efficient and does not introduce performance bottlenecks, especially for edge cases.</summary> ```c++ SourceFromRange::SourceFromRange(const DB::Block & header_, Int64 start_, Int64 end_, Int64 step_, Int32 num_slices_, Int32 slice_index_, size_t max_block_size_) : DB::ISource(header_) , start(start_) , end(end_) , step(step_) , num_slices(num_slices_) , slice_index(slice_index_) , max_block_size(max_block_size_) , num_elements(getNumElements()) , is_empty_range(start == end ) { const auto & header = getOutputs().front().getHeader(); if (header.columns() != 1) throw Exception(ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, "Expected 1 column, got {}", header.columns()); if (!header.getByPosition(0).type->equals(DataTypeInt64())) throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected Int64 column, got {}", header.getByPosition(0).type->getName()); if (step == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Step cannot be zero"); Int128 partition_start = (slice_index * num_elements) / num_slices * step + start; Int128 partition_end = (((slice_index + 1) * num_elements) / num_slices) * step + start; auto get_safe_margin = [](Int128 bi) -> Int64 { if (bi <= std::numeric_limits<Int64>::max() && bi >= std::numeric_limits<Int64>::min()) return static_cast<Int64>(bi); else if (bi > 0) return std::numeric_limits<Int64>::max(); else return std::numeric_limits<Int64>::min(); }; safe_partition_start = get_safe_margin(partition_start); safe_partition_end = get_safe_margin(partition_end); current = safe_partition_start; previous = 0; overflow = false; } Int128 SourceFromRange::getNumElements() const { const auto safe_start = static_cast<Int128>(start); const auto safe_end = static_cast<Int128>(end); if ((safe_end - safe_start) % step == 0 || (safe_end > safe_start) != (step > 0)) { return (safe_end - safe_start) / step; } else { // the remainder has the same sign with range, could add 1 more return (safe_end - safe_start) / step + 1; } } DB::Chunk 
SourceFromRange::generate() { if (is_empty_range) return {}; if (overflow || (step > 0 && current >= safe_partition_end) || (step < 0 && current <= safe_partition_end)) return {}; auto column = DB::ColumnInt64::create(); auto & data = column->getData(); data.resize_exact(max_block_size); size_t row_i = 0; if (step > 0) { for (; current < safe_partition_end && !overflow && row_i < max_block_size; ++row_i) { previous = current; data[row_i] = current; current += step; overflow = current < previous; } } else { for (; current > safe_partition_end && !overflow && row_i < max_block_size; ++row_i) { previous = current; data[row_i] = current; current += step; overflow = current > previous; } } data.resize_exact(row_i); // std::cout << "gen rows:" << column->size() << std::endl; DB::Columns columns; columns.push_back(std::move(column)); return DB::Chunk(std::move(columns), row_i); } ``` </details> <details><summary><a href='https://github.com/apache/incubator-gluten/pull/8518/files#diff-ff67a302400e7f434aca8fa3fc7c424cb671daf517ca21a892b9c47fae3d36cfR55-R157'><strong>Code Complexity</strong></a> The `parse` method in `ReadRelParser` has been extended to handle multiple types of read relations, including ranges. 
Ensure that the added complexity is manageable and well-tested.</summary> ```c++ ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack) { if (query_plan) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null"); const auto & read = rel.read(); if (isReadRelFromMergeTree(read)) { substrait::ReadRel::ExtensionTable extension_table; if (read.has_extension_table()) extension_table = read.extension_table(); else { extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_info); debug::dumpMessage(extension_table, "extension_table"); } MergeTreeRelParser merge_tree_parser(parser_context, getContext()); query_plan = merge_tree_parser.parseReadRel(std::make_unique<DB::QueryPlan>(), read, extension_table); steps = merge_tree_parser.getSteps(); } else if (isReadRelFromLocalFile(read) || isReadRelFromJavaIter(read) || isReadRelFromRange(read)) { chassert(read.has_base_schema()); DB::QueryPlanStepPtr read_step; if (isReadRelFromJavaIter(read)) read_step = parseReadRelWithJavaIter(read); else if (isReadRelFromRange(read)) read_step = parseReadRelWithRange(read); else if (isReadRelFromLocalFile(read)) read_step = parseReadRelWithLocalFile(read); query_plan = std::make_unique<DB::QueryPlan>(); steps.emplace_back(read_step.get()); query_plan->addStep(std::move(read_step)); if (getContext()->getSettingsRef()[Setting::max_threads] > 1) { auto buffer_step = std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentHeader()); steps.emplace_back(buffer_step.get()); query_plan->addStep(std::move(buffer_step)); } } else if (isReadFromStreamKafka(read)) { StreamKafkaRelParser kafka_parser(parser_context, getContext()); kafka_parser.setSplitInfo(split_info); query_plan = kafka_parser.parse(std::make_unique<DB::QueryPlan>(), rel, rel_stack); steps = kafka_parser.getSteps(); } else throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown read rel:{}", 
read.ShortDebugString()); return query_plan; } bool ReadRelParser::isReadRelFromJavaIter(const substrait::ReadRel & rel) { return rel.has_local_files() && rel.local_files().items().size() == 1 && rel.local_files().items().at(0).uri_file().starts_with("iterator"); } bool ReadRelParser::isReadRelFromLocalFile(const substrait::ReadRel & rel) { if (rel.has_local_files()) return !isReadRelFromJavaIter(rel); else return !rel.has_extension_table() && !isReadRelFromMergeTree(rel) && !isReadRelFromRange(rel) && !isReadFromStreamKafka(rel); } bool ReadRelParser::isReadRelFromMergeTree(const substrait::ReadRel & rel) { if (!rel.has_advanced_extension()) return false; google::protobuf::StringValue optimization; optimization.ParseFromString(rel.advanced_extension().optimization().value()); ReadBufferFromString in(optimization.value()); if (!checkString("isMergeTree=", in)) return false; bool is_merge_tree = false; readBoolText(is_merge_tree, in); assertChar('\n', in); return is_merge_tree; } bool ReadRelParser::isReadRelFromRange(const substrait::ReadRel & rel) { if (!rel.has_advanced_extension()) return false; google::protobuf::StringValue optimization; optimization.ParseFromString(rel.advanced_extension().optimization().value()); ReadBufferFromString in(optimization.value()); if (!checkString("isRange=", in)) return false; bool is_range = false; readBoolText(is_range, in); assertChar('\n', in); return is_range; ``` </details> </td></tr> </table> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
