himanshug commented on a change in pull request #8115: Add shuffleSegmentPusher 
for data shuffle
URL: https://github.com/apache/incubator-druid/pull/8115#discussion_r308453259
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
 ##########
 @@ -227,17 +250,68 @@ private void 
deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
   /**
    * Write a segment into one of configured locations. The location to write 
is chosen in a round-robin manner per
    * supervisorTaskId.
-   *
-   * This method is only useful for the new Indexer model. Tasks running in 
the existing middleManager should the static
-   * addSegment method.
    */
-  public void addSegment(String supervisorTaskId, String subTaskId, 
DataSegment segment, File segmentFile)
+  public long addSegment(String supervisorTaskId, String subTaskId, 
DataSegment segment, File segmentDir)
+      throws IOException
   {
+    // Get or create the location iterator for supervisorTask.
     final Iterator<StorageLocation> iterator = 
locationIterators.computeIfAbsent(
         supervisorTaskId,
-        k -> Iterators.cycle(shuffleDataLocations)
+        k -> {
+          final Iterator<StorageLocation> cyclicIterator = 
Iterators.cycle(shuffleDataLocations);
+          // Random start of the iterator
+          final int random = 
ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+          return cyclicIterator;
+        }
     );
-    addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, 
subTaskId, segment, segmentFile);
+
+    // Create a zipped segment in a temp directory.
+    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+    if (taskTempDir.mkdirs()) {
+      taskTempDir.deleteOnExit();
+    }
+    final File tempZippedFile = new File(taskTempDir, 
segment.getId().toString());
+    final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, 
tempZippedFile, true);
+    if (unzippedSizeBytes == 0) {
+      throw new IOE(
+          "Read 0 bytes from segmentDir[%s]",
+          segmentDir.getAbsolutePath()
+      );
+    }
+
+    // Try copying the zipped segment to one of storage locations
+    for (int i = 0; i < shuffleDataLocations.size(); i++) {
+      final StorageLocation location = iterator.next();
+      final String partitionFilePath = getPartitionFilePath(
+          supervisorTaskId,
+          subTaskId,
+          segment.getInterval(),
+          segment.getShardSpec().getPartitionNum()
+      );
+      final File destFile = location.reserve(partitionFilePath, 
segment.getId().toString(), tempZippedFile.length());
+      if (destFile != null) {
+        try {
+          FileUtils.forceMkdirParent(destFile);
+          StreamUtils.retryCopy(
 
 Review comment:
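   Here we should use FileUtils.writeAtomically(..) rather than copying 
   straight into destFile, presumably so that a partially written file is 
   never visible at the destination if the copy fails midway.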

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to