himanshug commented on a change in pull request #8115: Add shuffleSegmentPusher for data shuffle
URL: https://github.com/apache/incubator-druid/pull/8115#discussion_r308453259
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
##########
@@ -227,17 +250,68 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
   /**
    * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
    * supervisorTaskId.
-   *
-   * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
-   * addSegment method.
    */
-  public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+  public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
   {
+    // Get or create the location iterator for supervisorTask.
     final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
         supervisorTaskId,
-        k -> Iterators.cycle(shuffleDataLocations)
+        k -> {
+          final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
+          // Random start of the iterator
+          final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+          return cyclicIterator;
+        }
     );
-    addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+
+    // Create a zipped segment in a temp directory.
+    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+    if (taskTempDir.mkdirs()) {
+      taskTempDir.deleteOnExit();
+    }
+    final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+    final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile, true);
+    if (unzippedSizeBytes == 0) {
+      throw new IOE(
+          "Read 0 bytes from segmentDir[%s]",
+          segmentDir.getAbsolutePath()
+      );
+    }
+
+    // Try copying the zipped segment to one of storage locations
+    for (int i = 0; i < shuffleDataLocations.size(); i++) {
+      final StorageLocation location = iterator.next();
+      final String partitionFilePath = getPartitionFilePath(
+          supervisorTaskId,
+          subTaskId,
+          segment.getInterval(),
+          segment.getShardSpec().getPartitionNum()
+      );
+      final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
+      if (destFile != null) {
+        try {
+          FileUtils.forceMkdirParent(destFile);
+          StreamUtils.retryCopy(

Review comment:
here we should use FileUtils.writeAtomically(..)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org