Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2276#discussion_r186388685
  
    --- Diff: 
processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
 ---
    @@ -220,32 +241,97 @@ private CarbonRowBatch getBatch() {
           CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
           int count = 0;
           while (internalHasNext() && count < batchSize) {
    -        carbonRowBatch.addRow(new 
CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
    +        carbonRowBatch.addRow(
    +            new 
CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
             count++;
           }
           rowCounter.getAndAdd(carbonRowBatch.getSize());
           return carbonRowBatch;
         }
     
    -    private Object[] convertToNoDictionaryToBytes(Object[] data) {
    +    private Object[] convertToNoDictionaryToBytes(Object[] data, 
DataField[] dataFields) {
           Object[] newData = new Object[data.length];
    -      for (int i = 0; i < noDictionaryMapping.length; i++) {
    -        if (noDictionaryMapping[i]) {
    +      for (int i = 0; i < data.length; i++) {
    +        if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
               newData[i] = DataTypeUtil
                   
.getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
             } else {
    -          newData[i] = data[orderOfData[i]];
    +          // if this is a complex column then recursively comver the data 
into Byte Array.
    +          if (dataTypes[i].isComplexType()) {
    +            ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
    +            DataOutputStream dataOutputStream = new 
DataOutputStream(byteArray);
    +            try {
    +              getBytesForComplex(data[orderOfData[i]], dataFields[i], 
dataOutputStream);
    +              dataOutputStream.close();
    +              newData[i] = byteArray.toByteArray();
    +            } catch (Exception e) {
    +              throw new CarbonDataLoadingException( "Loading Exception", 
e);
    +            }
    +          } else {
    +            newData[i] = data[orderOfData[i]];
    +          }
             }
           }
    -      if (newData.length > noDictionaryMapping.length) {
    -        for (int i = noDictionaryMapping.length; i < newData.length; i++) {
    -          newData[i] = data[orderOfData[i]];
    +      System.out.println(Arrays.toString(data));
    +      return newData;
    +    }
    +
    +    private void getBytesForComplex(Object datum, DataField dataField,
    +        DataOutputStream dataOutputStream) throws Exception {
    +      getBytesForComplex(datum, dataField.getColumn().getDataType(),
    +          ((CarbonDimension) dataField.getColumn()), dataOutputStream);
    +    }
    +
    +    private void getBytesForComplex(Object datum, DataType dataType,
    +        CarbonDimension carbonDimensions, DataOutputStream 
dataOutputStream) throws Exception {
    +      if (dataType.getName().equalsIgnoreCase("struct")) {
    +        List<CarbonDimension> childStructDimensions = 
carbonDimensions.getListOfChildDimensions();
    +        int size = childStructDimensions.size();
    +        Object[] structFields = ((StructObject) datum).getData();
    +        dataOutputStream.writeInt(size);
    +        // loop through each one of them.
    +        for (int i = 0; i < size; i++) {
    +          getBytesForComplex(structFields[i], 
childStructDimensions.get(i).getDataType(),
    +              childStructDimensions.get(i), dataOutputStream);
    +        }
    +      } else if (dataType.getName().equalsIgnoreCase("array")) {
    --- End diff --
    
    Done


---

Reply via email to