This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new a894bed26d [fix](spark load)partition column is not duplicate key, 
spark load IndexOutOfBounds error (#14724)
a894bed26d is described below

commit a894bed26d54a2680bcb38eb74faa2003c9ccafb
Author: gnehil <[email protected]>
AuthorDate: Mon Dec 5 09:15:07 2022 +0800

    [fix](spark load)partition column is not duplicate key, spark load 
IndexOutOfBounds error (#14724)
    
    In spark load, the job failed when the partition column is not in the 
duplicate key column list.
    
    We traced the sparkDpp.java and found that, the partition columns ware 
built with the key index in base metadata. But when partition column was not 
key, an IndexOutOfBoundsException occurs when obtaining it through DppColumns.
    
    Describe your changes.
---
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 34 +++++++++++++---------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 763a0e9ee4..4331f4842b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2.dpp;
 
+import com.google.common.collect.Maps;
 import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import com.google.common.base.Strings;
@@ -171,7 +172,7 @@ public final class SparkDpp implements java.io.Serializable 
{
             for (int i = 0; i < curNode.indexMeta.columns.size(); i++) {
                 if (!curNode.indexMeta.columns.get(i).isKey) {
                     // duplicate table doesn't need aggregator
-                    // init a aggregator here just for keeping interface 
compatibility when writing data to HDFS
+                    // init an aggregator here just for keeping interface 
compatibility when writing data to HDFS
                     sparkRDDAggregators[idx] = new DefaultSparkRDDAggregator();
                     idx++;
                 }
@@ -429,7 +430,7 @@ public final class SparkDpp implements java.io.Serializable 
{
                                                         
EtlJobConfig.EtlPartitionInfo partitionInfo,
                                                         List<Integer> 
partitionKeyIndex,
                                                         
List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
-                                                        List<String> 
keyColumnNames,
+                                                        List<String> 
keyAndPartitionColumnNames,
                                                         List<String> 
valueColumnNames,
                                                         StructType 
dstTableSchema,
                                                         EtlJobConfig.EtlIndex 
baseIndex,
@@ -449,9 +450,9 @@ public final class SparkDpp implements java.io.Serializable 
{
             }
         }
 
-        List<ColumnParser> parsers = new ArrayList<>();
+        Map<String, ColumnParser> parsers = Maps.newHashMap();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            parsers.add(ColumnParser.create(column));
+            parsers.put(column.columnName, ColumnParser.create(column));
         }
 
         // use PairFlatMapFunction instead of PairMapFunction because the 
there will be
@@ -460,29 +461,34 @@ public final class SparkDpp implements 
java.io.Serializable {
             @Override
             public Iterator<Tuple2<List<Object>, Object[]>> call(Row row) 
throws Exception {
                 List<Tuple2<List<Object>, Object[]>> result = new 
ArrayList<>();
+                List<Object> keyAndPartitionColumns = new ArrayList<>();
                 List<Object> keyColumns = new ArrayList<>();
                 List<Object> valueColumns = new 
ArrayList<>(valueColumnNames.size());
-                for (int i = 0; i < keyColumnNames.size(); i++) {
-                    String columnName = keyColumnNames.get(i);
+                for (int i = 0; i < keyAndPartitionColumnNames.size(); i++) {
+                    String columnName = keyAndPartitionColumnNames.get(i);
                     Object columnObject = row.get(row.fieldIndex(columnName));
-                    if(!validateData(columnObject, 
baseIndex.getColumn(columnName), parsers.get(i), row)) {
+                    if (!validateData(columnObject, 
baseIndex.getColumn(columnName), parsers.get(columnName), row)) {
                         abnormalRowAcc.add(1);
                         return result.iterator();
                     };
-                    keyColumns.add(columnObject);
+                    keyAndPartitionColumns.add(columnObject);
+
+                    if (baseIndex.getColumn(columnName).isKey) {
+                        keyColumns.add(columnObject);
+                    }
                 }
 
                 for (int i = 0; i < valueColumnNames.size(); i++) {
                     String columnName = valueColumnNames.get(i);
                     Object columnObject = row.get(row.fieldIndex(columnName));
-                    if(!validateData(columnObject,  
baseIndex.getColumn(columnName), parsers.get(i + keyColumnNames.size()),row)) {
+                    if(!validateData(columnObject,  
baseIndex.getColumn(columnName), parsers.get(columnName),row)) {
                         abnormalRowAcc.add(1);
                         return result.iterator();
                     };
                     valueColumns.add(columnObject);
                 }
 
-                DppColumns key = new DppColumns(keyColumns);
+                DppColumns key = new DppColumns(keyAndPartitionColumns);
                 int pid = partitioner.getPartition(key);
                 if (!validPartitionIndex.contains(pid)) {
                     LOG.warn("invalid partition for row:" + row + ", pid:" + 
pid);
@@ -1031,11 +1037,11 @@ public final class SparkDpp implements 
java.io.Serializable {
                 }
 
                 // get key column names and value column names seperately
-                List<String> keyColumnNames = new ArrayList<>();
+                List<String> keyAndPartitionColumnNames = new ArrayList<>();
                 List<String> valueColumnNames = new ArrayList<>();
                 for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
                     if (etlColumn.isKey) {
-                        keyColumnNames.add(etlColumn.columnName);
+                        keyAndPartitionColumnNames.add(etlColumn.columnName);
                     } else {
                         valueColumnNames.add(etlColumn.columnName);
                     }
@@ -1048,7 +1054,7 @@ public final class SparkDpp implements 
java.io.Serializable {
                     for (int i = 0; i < baseIndex.columns.size(); ++i) {
                         EtlJobConfig.EtlColumn column = 
baseIndex.columns.get(i);
                         if (column.columnName.equals(key)) {
-                            partitionKeyIndex.add(i);
+                            
partitionKeyIndex.add(keyAndPartitionColumnNames.indexOf(key));
                             
partitionKeySchema.add(DppUtils.getClassFromColumn(column));
                             break;
                         }
@@ -1083,7 +1089,7 @@ public final class SparkDpp implements 
java.io.Serializable {
                             fileGroupDataframe,
                             partitionInfo, partitionKeyIndex,
                             partitionRangeKeys,
-                            keyColumnNames, valueColumnNames,
+                            keyAndPartitionColumnNames, valueColumnNames,
                             dstTableSchema, baseIndex, fileGroup.partitions);
                     if (tablePairRDD == null) {
                         tablePairRDD = ret;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to