Github user sounakr commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2276#discussion_r186388685 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java --- @@ -220,32 +241,97 @@ private CarbonRowBatch getBatch() { CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize); int count = 0; while (internalHasNext() && count < batchSize) { - carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next()))); + carbonRowBatch.addRow( + new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields))); count++; } rowCounter.getAndAdd(carbonRowBatch.getSize()); return carbonRowBatch; } - private Object[] convertToNoDictionaryToBytes(Object[] data) { + private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) { Object[] newData = new Object[data.length]; - for (int i = 0; i < noDictionaryMapping.length; i++) { - if (noDictionaryMapping[i]) { + for (int i = 0; i < data.length; i++) { + if (i < noDictionaryMapping.length && noDictionaryMapping[i]) { newData[i] = DataTypeUtil .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); } else { - newData[i] = data[orderOfData[i]]; + // if this is a complex column then recursively comver the data into Byte Array. + if (dataTypes[i].isComplexType()) { + ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArray); + try { + getBytesForComplex(data[orderOfData[i]], dataFields[i], dataOutputStream); + dataOutputStream.close(); + newData[i] = byteArray.toByteArray(); + } catch (Exception e) { + throw new CarbonDataLoadingException( "Loading Exception", e); + } + } else { + newData[i] = data[orderOfData[i]]; + } } } - if (newData.length > noDictionaryMapping.length) { - for (int i = noDictionaryMapping.length; i < newData.length; i++) { - newData[i] = data[orderOfData[i]]; + System.out.println(Arrays.toString(data)); + return newData; + } + + private void getBytesForComplex(Object datum, DataField dataField, + DataOutputStream dataOutputStream) throws Exception { + getBytesForComplex(datum, dataField.getColumn().getDataType(), + ((CarbonDimension) dataField.getColumn()), dataOutputStream); + } + + private void getBytesForComplex(Object datum, DataType dataType, + CarbonDimension carbonDimensions, DataOutputStream dataOutputStream) throws Exception { + if (dataType.getName().equalsIgnoreCase("struct")) { + List<CarbonDimension> childStructDimensions = carbonDimensions.getListOfChildDimensions(); + int size = childStructDimensions.size(); + Object[] structFields = ((StructObject) datum).getData(); + dataOutputStream.writeInt(size); + // loop through each one of them. + for (int i = 0; i < size; i++) { + getBytesForComplex(structFields[i], childStructDimensions.get(i).getDataType(), + childStructDimensions.get(i), dataOutputStream); + } + } else if (dataType.getName().equalsIgnoreCase("array")) { --- End diff -- Done
---