This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit b71731cb7d39c83bf28a2fb35950d5132dd27fde
Author: akashrn5 <akashr...@gmail.com>
AuthorDate: Fri Jan 25 14:35:32 2019 +0530

    [CARBONDATA-3272] Fix ArrayIndexOutOfBoundsException in horizontal 
compaction during update when cardinality changes within a segment
    
    Problem:
    During horizontal compaction in update, we prepare a taskBlockMapping to 
get the resultIterators. Horizontal compaction is done within a segment. 
Here, the source segment properties are always prepared from the file footer of 
the first block in the block list for the corresponding task. The source segment 
properties contain the dimensionKeyGenerator, which is used to convert 
the rows. If the cardinality is different for two blocks of a task, then the 
dimensionKeyGenerator will be dif [...]
    
    Solution:
    Get all the blocks present in a task, split them into multiple lists 
of the same key length, and create a separate RawResultIterator for each list of the 
same key length. If all the blocks have the same key length, then make a single 
RawResultIterator for all the blocks.
    
    This closes #3102
---
 .../core/scan/wrappers/IntArrayWrapper.java        | 47 +++++++++++++
 .../merger/CarbonCompactionExecutor.java           | 77 +++++++++++++++++-----
 .../processing/merger/CarbonCompactionUtil.java    |  2 +-
 3 files changed, 109 insertions(+), 17 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
new file mode 100644
index 0000000..c1a75d5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.wrappers;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper class for int[] data
+ */
+public class IntArrayWrapper {
+
+  private final int[] data;
+
+  public IntArrayWrapper(int[] data) {
+    this.data = data;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IntArrayWrapper that = (IntArrayWrapper) o;
+    return Arrays.equals(data, that.data);
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(data);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 79b66e2..5961cd7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.IntArrayWrapper;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -117,7 +118,7 @@ public class CarbonCompactionExecutor {
     resultList.put(CarbonCompactionUtil.SORTED_IDX,
         new 
ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
 
-    List<TableBlockInfo> list = null;
+    List<TableBlockInfo> tableBlockInfos = null;
     QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
         .projectAllColumns()
         .dataConverter(dataTypeConverter)
@@ -130,7 +131,6 @@ public class CarbonCompactionExecutor {
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) 
{
       String segmentId = taskMap.getKey();
       List<DataFileFooter> listMetadata = 
dataFileMetadataSegMapping.get(segmentId);
-      SegmentProperties sourceSegProperties = 
getSourceSegmentProperties(listMetadata);
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
       Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
@@ -139,26 +139,71 @@ public class CarbonCompactionExecutor {
           CarbonCompactionUtil.isRestructured(listMetadata, 
carbonTable.getTableLastUpdatedTime())
               || !CarbonCompactionUtil.isSorted(listMetadata.get(0));
       for (String task : taskBlockListMapping) {
-        list = taskBlockInfo.getTableBlockInfoList(task);
-        Collections.sort(list);
-        LOGGER.info(
-            "for task -" + task + "- in segment id -" + segmentId + "- block 
size is -" + list
-                .size());
-        queryModel.setTableBlockInfos(list);
-        if (sortingRequired) {
-          resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, 
configuration),
-                  sourceSegProperties, destinationSegProperties, false));
-        } else {
-          resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, 
configuration),
-                  sourceSegProperties, destinationSegProperties, false));
+        tableBlockInfos = taskBlockInfo.getTableBlockInfoList(task);
+        // during update there may be a chance that the cardinality may change 
within the segment
+        // which may lead to failure while converting the row, so get all the 
blocks present in a
+        // task and then split into multiple lists of same column values and 
create separate
+        // RawResultIterator for each tableBlockInfo of same column values. If 
all the blocks have
+        // same column values, then make a single RawResultIterator for all 
the blocks
+        List<List<TableBlockInfo>> listOfTableBlocksBasedOnKeyLength =
+            getListOfTableBlocksBasedOnColumnValueSize(tableBlockInfos);
+        for (List<TableBlockInfo> tableBlockInfoList : 
listOfTableBlocksBasedOnKeyLength) {
+          Collections.sort(tableBlockInfoList);
+          LOGGER.info("for task -" + task + "- in segment id -" + segmentId + 
"- block size is -"
+              + tableBlockInfos.size());
+          queryModel.setTableBlockInfos(tableBlockInfoList);
+          if (sortingRequired) {
+            resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, 
tableBlockInfoList));
+          } else {
+            resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, 
tableBlockInfoList));
+          }
         }
       }
     }
     return resultList;
   }
 
+  private RawResultIterator getRawResultIterator(Configuration configuration, 
String segmentId,
+      String task, List<TableBlockInfo> tableBlockInfoList)
+      throws QueryExecutionException, IOException {
+    return new RawResultIterator(
+        executeBlockList(tableBlockInfoList, segmentId, task, configuration),
+        getSourceSegmentProperties(
+            
Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())),
+        destinationSegProperties, false);
+  }
+
+  /**
+   * This method returns the List of TableBlockInfoList, where each 
listOfTableBlockInfos will have
+   * same columnvalues
+   * @param tableBlockInfos List of tableBlockInfos present in each task
+   */
+  private List<List<TableBlockInfo>> 
getListOfTableBlocksBasedOnColumnValueSize(
+      List<TableBlockInfo> tableBlockInfos) {
+    List<List<TableBlockInfo>> listOfTableBlockInfoListOnColumnvaluesSize = 
new ArrayList<>();
+    Map<IntArrayWrapper, List<TableBlockInfo>> 
columnvalueSizeToTableBlockInfoMap = new HashMap<>();
+    for (TableBlockInfo tableBlock : tableBlockInfos) {
+      // get the columnValueSize for the dataFileFooter
+      IntArrayWrapper columnValueSize = new IntArrayWrapper(
+          
getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
+              .getColumnsValueSize());
+      List<TableBlockInfo> tempBlockInfoList =
+          columnvalueSizeToTableBlockInfoMap.get(columnValueSize);
+      if (tempBlockInfoList == null) {
+        tempBlockInfoList = new ArrayList<>();
+        columnvalueSizeToTableBlockInfoMap.put(columnValueSize, 
tempBlockInfoList);
+      }
+      tempBlockInfoList.add(tableBlock);
+    }
+    for (Map.Entry<IntArrayWrapper, List<TableBlockInfo>> taskMap :
+        columnvalueSizeToTableBlockInfoMap.entrySet()) {
+      listOfTableBlockInfoListOnColumnvaluesSize.add(taskMap.getValue());
+    }
+    return listOfTableBlockInfoListOnColumnvaluesSize;
+  }
+
   /**
    * This method will create the source segment properties based on 
restructured block existence
    *
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1bf30b5..ffcfe0c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -144,10 +144,10 @@ public class CarbonCompactionUtil {
         if (null == dataFileMatadata.isSorted()) {
           dataFileMatadata.setSorted(isSortedTable);
         }
-        blockInfo.setDataFileFooter(dataFileMatadata);
       } else {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo);
       }
+      blockInfo.setDataFileFooter(dataFileMatadata);
       if (null == metadataList) {
         // if it is not present
         eachSegmentBlocks.add(dataFileMatadata);

Reply via email to