Matthew Jacobs has posted comments on this change. Change subject: IMPALA-3742: Partitions and sort INSERTs for Kudu tables ......................................................................
Patch Set 2: (21 comments) http://gerrit.cloudera.org:8080/#/c/6559/2/be/src/exprs/kudu-partition-expr.cc File be/src/exprs/kudu-partition-expr.cc: PS2, Line 48: using namespace kudu::client we typically don't use inline 'using' statements PS2, Line 51: CreateKuduClient use QueryState::GetKuduClient Line 61: RegisterFunctionContext(ctx, state); comment the return value is ignored because the idx is stored on the base class and that is used directly when needed PS2, Line 72: if (table_schema.Column(col).is_nullable()) { this should never be true right now, PK cols are not nullable. per my comment below, I think if val == null we should just skip calling into PartitionRow for this row and just return round robin PS2, Line 80: return PartitionError(ctx, : ErrorMsg::Init(TErrorCode::KUDU_NULL_CONSTRAINT_VIOLATION, : table_desc_->table_name()).msg()); This will change the failure mode of our inserts, which is that NCV do not fail the query, they get accounted for in 'num row errors' metrics. What happens if you try to pass these rows into the partitioner? I guess it gives you an error? I think we should just round robin these for now. Maybe later we can try to do something smarter to avoid sending them around, but I don't want to mess up the accounting we do. It'll be best to leave the error handling logic in its home in the KuduTableSink. Line 87: if (!s.ok()) { I'm not sure if this will return an error we need to handle at runtime, i.e. we might be able to assert OK. Looks like there are 2 paths that can return errors: Kudu PartialRow::SetString fails, or if the type isn't supported. The latter definitely shouldn't happen unless we have a bug. Not sure if the first can fail at runtime in a way we need to handle it. Can you take a look? PS2, Line 95: if (!s.ok()) { : return PartitionError( : ctx, Substitute("Failed to determine Kudu partition for row: $0", s.ToString())); : } same thing about errors here, I think if this returns an error it means we have a bug in planning PS2, Line 110: ctx->fn_context(fn_context_index_)->AddWarning(msg.c_str()); I don't know if there are any errors we need to be reporting here. http://gerrit.cloudera.org:8080/#/c/6559/2/be/src/exprs/kudu-partition-expr.h File be/src/exprs/kudu-partition-expr.h: PS2, Line 30: number index PS2, Line 47: // /// http://gerrit.cloudera.org:8080/#/c/6559/1/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: Line 364: DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED I don't see what the point of this is. This enumerates possible values. http://gerrit.cloudera.org:8080/#/c/6559/1/common/thrift/Exprs.thrift File common/thrift/Exprs.thrift: PS1, Line 134: 4 what happened to 2 and 3? http://gerrit.cloudera.org:8080/#/c/6559/2/common/thrift/Partitions.thrift File common/thrift/Partitions.thrift: PS2, Line 50: // Expr that takes a row and returns its partition number. If specified, : // 'partition_exprs' will be empty. Used for TPartitionType::KUDU. : 3: optional Exprs.TExpr partition_expr how about for Kudu the expr just goes in partition_exprs[0] (and it's the only element in that list), then we can avoid special casing in the data structures and just handle them differently on the hash/kudu partitioning code paths? http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java File fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java: PS2, Line 49: artitionKeyExprs_ = Expr.cloneList(partitionKeyExprs); Is there a reason we keep a separate copy of partitionKeyExprs in addition to in children? PS2, Line 69: children_.get(i).uncheckedCastTo(partitionKeyExprs_.get(i).getType()); as we discussed, I feel like we shouldn't have to do this PS2, Line 75: isConstantImpl Should this be constant? * Returns true if this expression should be treated as constant. I.e. if the frontend * and backend should assume that two evaluations of the expression within a query will * return the same value. The only reason I think that assumption may not hold is if the kudu partitioning changes across Kudu clients, which is possible. Did you have anything else in mind? This could use a comment to explain. http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/DataPartition.java File fe/src/main/java/org/apache/impala/planner/DataPartition.java: PS2, Line 45: // For hash partition: exprs used to compute hash value. : private List<Expr> partitionExprs_; : : // For kudu partition: expr that calls into the kudu client to get the partition number. : private Expr partitionExpr_; I think this will be cleaned up a bit if we just store the Kudu partition expr as the first (and only) element in partitionExprs_ http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java: PS2, Line 251: DescriptorTable.TABLE_SINK_ID I don't think this is what you want. I think you want the table ID of the target table, which may or may not be this value. See DescriptorTable.java that this gets incremented when creating the tables. http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: PS2, Line 541: DescriptorTable.TABLE_SINK_ID same as in DistributedPlanner, I don't think this is what you want PS2, Line 541: Expr partitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID, : insertStmt.getPartitionKeyExprs(), insertStmt.getPartitionKeyIdxs()); : partitionExpr.analyze(ctx_.getRootAnalyzer()); I wonder if we can/should clone the KuduPartitionExpr instead of creating it twice http://gerrit.cloudera.org:8080/#/c/6559/2/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test File testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test: PS2, Line 16: KUDU it'd be nice for this to indicate it's an exchange on the Kudu partitioning, this is vague -- To view, visit http://gerrit.cloudera.org:8080/6559 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I84ce0032a1b10958fdf31faef225372c5c38fdc4 Gerrit-PatchSet: 2 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Thomas Tauber-Marshall <tmarsh...@cloudera.com> Gerrit-Reviewer: Dimitris Tsirogiannis <dtsirogian...@cloudera.com> Gerrit-Reviewer: Matthew Jacobs <m...@cloudera.com> Gerrit-HasComments: Yes