Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2052#discussion_r174195929 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java --- @@ -218,50 +213,45 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { rowPage.addRow(row, rowBuffer.get()); } else { try { - if (enableInMemoryIntermediateMerge) { - unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible(); - } - unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); - semaphore.acquire(); - dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); + handlePreviousPage(); rowPage = createUnsafeRowPage(); rowPage.addRow(row, rowBuffer.get()); } catch (Exception e) { LOGGER.error( "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); throw new CarbonSortKeyAndGroupByException(e); } - } } /** - * Below method will be used to start storing process This method will get - * all the temp files present in sort temp folder then it will create the - * record holder heap and then it will read first record from each file and - * initialize the heap + * Below method will be used to start sorting process. This method will get + * all the temp unsafe pages in memory and all the temp files and try to merge them if possible. + * Also, it will spill the pages to disk or add it to unsafe sort memory. * - * @throws InterruptedException + * @throws CarbonSortKeyAndGroupByException if error occurs during in-memory merge + * @throws InterruptedException if error occurs during data sort and write */ - public void startSorting() throws InterruptedException { + public void startSorting() throws CarbonSortKeyAndGroupByException, InterruptedException { LOGGER.info("Unsafe based sorting will be used"); if (this.rowPage.getUsedSize() > 0) { - TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>( - new UnsafeIntSortDataFormat(rowPage)); - if (parameters.getNumberOfNoDictSortColumns() > 0) { - timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(), - new UnsafeRowComparator(rowPage)); - } else { - timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(), - new UnsafeRowComparatorForNormalDims(rowPage)); - } - unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage); + handlePreviousPage(); } else { rowPage.freeMemory(); } startFileBasedMerge(); } + private void handlePreviousPage() --- End diff -- can you provide comment for this function
---