This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new a894bed26d [fix](spark load)partition column is not duplicate key,
spark load IndexOutOfBounds error (#14724)
a894bed26d is described below
commit a894bed26d54a2680bcb38eb74faa2003c9ccafb
Author: gnehil <[email protected]>
AuthorDate: Mon Dec 5 09:15:07 2022 +0800
[fix](spark load)partition column is not duplicate key, spark load
IndexOutOfBounds error (#14724)
In spark load, the job failed when the partition column is not in the
duplicate key column list.
We traced SparkDpp.java and found that the partition columns were
built with the key index in the base metadata. When a partition column was not
a key, an IndexOutOfBoundsException occurred when obtaining it through DppColumns.
This change builds the partition key from both key and partition columns, and looks up column parsers by column name instead of by positional index.
---
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 34 +++++++++++++---------
1 file changed, 20 insertions(+), 14 deletions(-)
diff --git
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 763a0e9ee4..4331f4842b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2.dpp;
+import com.google.common.collect.Maps;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import com.google.common.base.Strings;
@@ -171,7 +172,7 @@ public final class SparkDpp implements java.io.Serializable
{
for (int i = 0; i < curNode.indexMeta.columns.size(); i++) {
if (!curNode.indexMeta.columns.get(i).isKey) {
// duplicate table doesn't need aggregator
- // init a aggregator here just for keeping interface
compatibility when writing data to HDFS
+ // init an aggregator here just for keeping interface
compatibility when writing data to HDFS
sparkRDDAggregators[idx] = new DefaultSparkRDDAggregator();
idx++;
}
@@ -429,7 +430,7 @@ public final class SparkDpp implements java.io.Serializable
{
EtlJobConfig.EtlPartitionInfo partitionInfo,
List<Integer>
partitionKeyIndex,
List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
- List<String>
keyColumnNames,
+ List<String>
keyAndPartitionColumnNames,
List<String>
valueColumnNames,
StructType
dstTableSchema,
EtlJobConfig.EtlIndex
baseIndex,
@@ -449,9 +450,9 @@ public final class SparkDpp implements java.io.Serializable
{
}
}
- List<ColumnParser> parsers = new ArrayList<>();
+ Map<String, ColumnParser> parsers = Maps.newHashMap();
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
- parsers.add(ColumnParser.create(column));
+ parsers.put(column.columnName, ColumnParser.create(column));
}
// use PairFlatMapFunction instead of PairMapFunction because the
there will be
@@ -460,29 +461,34 @@ public final class SparkDpp implements
java.io.Serializable {
@Override
public Iterator<Tuple2<List<Object>, Object[]>> call(Row row)
throws Exception {
List<Tuple2<List<Object>, Object[]>> result = new
ArrayList<>();
+ List<Object> keyAndPartitionColumns = new ArrayList<>();
List<Object> keyColumns = new ArrayList<>();
List<Object> valueColumns = new
ArrayList<>(valueColumnNames.size());
- for (int i = 0; i < keyColumnNames.size(); i++) {
- String columnName = keyColumnNames.get(i);
+ for (int i = 0; i < keyAndPartitionColumnNames.size(); i++) {
+ String columnName = keyAndPartitionColumnNames.get(i);
Object columnObject = row.get(row.fieldIndex(columnName));
- if(!validateData(columnObject,
baseIndex.getColumn(columnName), parsers.get(i), row)) {
+ if (!validateData(columnObject,
baseIndex.getColumn(columnName), parsers.get(columnName), row)) {
abnormalRowAcc.add(1);
return result.iterator();
};
- keyColumns.add(columnObject);
+ keyAndPartitionColumns.add(columnObject);
+
+ if (baseIndex.getColumn(columnName).isKey) {
+ keyColumns.add(columnObject);
+ }
}
for (int i = 0; i < valueColumnNames.size(); i++) {
String columnName = valueColumnNames.get(i);
Object columnObject = row.get(row.fieldIndex(columnName));
- if(!validateData(columnObject,
baseIndex.getColumn(columnName), parsers.get(i + keyColumnNames.size()),row)) {
+ if(!validateData(columnObject,
baseIndex.getColumn(columnName), parsers.get(columnName),row)) {
abnormalRowAcc.add(1);
return result.iterator();
};
valueColumns.add(columnObject);
}
- DppColumns key = new DppColumns(keyColumns);
+ DppColumns key = new DppColumns(keyAndPartitionColumns);
int pid = partitioner.getPartition(key);
if (!validPartitionIndex.contains(pid)) {
LOG.warn("invalid partition for row:" + row + ", pid:" +
pid);
@@ -1031,11 +1037,11 @@ public final class SparkDpp implements
java.io.Serializable {
}
// get key column names and value column names seperately
- List<String> keyColumnNames = new ArrayList<>();
+ List<String> keyAndPartitionColumnNames = new ArrayList<>();
List<String> valueColumnNames = new ArrayList<>();
for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
if (etlColumn.isKey) {
- keyColumnNames.add(etlColumn.columnName);
+ keyAndPartitionColumnNames.add(etlColumn.columnName);
} else {
valueColumnNames.add(etlColumn.columnName);
}
@@ -1048,7 +1054,7 @@ public final class SparkDpp implements
java.io.Serializable {
for (int i = 0; i < baseIndex.columns.size(); ++i) {
EtlJobConfig.EtlColumn column =
baseIndex.columns.get(i);
if (column.columnName.equals(key)) {
- partitionKeyIndex.add(i);
+
partitionKeyIndex.add(keyAndPartitionColumnNames.indexOf(key));
partitionKeySchema.add(DppUtils.getClassFromColumn(column));
break;
}
@@ -1083,7 +1089,7 @@ public final class SparkDpp implements
java.io.Serializable {
fileGroupDataframe,
partitionInfo, partitionKeyIndex,
partitionRangeKeys,
- keyColumnNames, valueColumnNames,
+ keyAndPartitionColumnNames, valueColumnNames,
dstTableSchema, baseIndex, fileGroup.partitions);
if (tablePairRDD == null) {
tablePairRDD = ret;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]