This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 82bdda5a38 Parallelize segment metadata file generation. (#15030)
82bdda5a38 is described below
commit 82bdda5a3836dd7b0bca3c6497772b6b7837e05d
Author: Abhishek Bafna <[email protected]>
AuthorDate: Fri Feb 14 11:39:37 2025 +0530
Parallelize segment metadata file generation. (#15030)
* Parallelize segment metadata file generation.
* Improved error handling and unit tests.
* Introduce segmentMetadataGenerationParallelism config.
* Move executor shutdown past thread completion.
* Code refactor - pass executor as parameter.
---------
Co-authored-by: abhishekbafna <[email protected]>
---
.../segment/local/utils/SegmentPushUtils.java | 127 ++++++++++++++-------
.../segment/local/utils/SegmentPushUtilsTest.java | 81 ++++++++++++-
.../spi/ingestion/batch/spec/PushJobSpec.java | 14 +++
3 files changed, 177 insertions(+), 45 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 5403575684..d4e39a11a9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -31,10 +31,16 @@ import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.Header;
@@ -373,47 +379,24 @@ public class SegmentPushUtils implements Serializable {
Map<String, String> segmentUriToTarPathMap, List<Header> headers,
List<NameValuePair> parameters)
throws Exception {
String tableName = spec.getTableSpec().getTableName();
- Map<String, File> segmentMetadataFileMap = new HashMap<>();
- List<String> segmentURIs = new ArrayList<>();
- LOGGER.info("Start pushing segment metadata: {} to locations: {} for
table: {}", segmentUriToTarPathMap,
- Arrays.toString(spec.getPinotClusterSpecs()), tableName);
- for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
- String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
- String fileName = new File(tarFilePath).getName();
- // segments stored in Pinot deep store do not have .tar.gz extension
- String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
- ? fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
- SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
- File segmentMetadataFile;
- // Check if there is a segment metadata tar gz file named
`segmentName.metadata.tar.gz`, already in the remote
- // directory. This is to avoid generating a new segment metadata tar gz
file every time we push a segment,
- // which requires downloading the entire segment tar gz file.
-
- URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath,
segmentName);
- LOGGER.info("Checking if metadata tar gz file {} exists",
metadataTarGzFilePath);
- if (spec.getPushJobSpec().isPreferMetadataTarGz() &&
fileSystem.exists(metadataTarGzFilePath)) {
- segmentMetadataFile = new File(FileUtils.getTempDirectory(),
- SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX +
UUID.randomUUID()
- + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
- if (segmentMetadataFile.exists()) {
- FileUtils.forceDelete(segmentMetadataFile);
- }
- fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
- } else {
- segmentMetadataFile = generateSegmentMetadataFile(fileSystem,
URI.create(tarFilePath));
- }
- segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
- segmentURIs.add(segmentName);
- segmentURIs.add(segmentUriPath);
- }
-
- File allSegmentsMetadataTarFile =
createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);
+ ConcurrentHashMap<String, File> segmentMetadataFileMap = new
ConcurrentHashMap<>();
+ ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
Map<String, File> allSegmentsMetadataMap = new HashMap<>();
- // the key is unused in batch upload mode and hence 'noopKey'
- allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+ File allSegmentsMetadataTarFile = null;
+ int nThreads =
spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+ ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ LOGGER.info("Start pushing segment metadata: {} to locations: {} for
table: {} with parallelism: {}",
+ segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()),
tableName,
+ spec.getPushJobSpec().getPushParallelism());
- // perform metadata push in batch mode for every cluster
try {
+ generateSegmentMetadataFiles(spec, fileSystem, segmentUriToTarPathMap,
segmentMetadataFileMap, segmentURIs,
+ executor);
+ allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs,
segmentMetadataFileMap);
+ // the key is unused in batch upload mode and hence 'noopKey'
+ allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+
+ // perform metadata push in batch mode for every cluster
for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
URI controllerURI;
try {
@@ -458,10 +441,70 @@ public class SegmentPushUtils implements Serializable {
});
}
} finally {
- for (Map.Entry<String, File> metadataFileEntry:
segmentMetadataFileMap.entrySet()) {
+ for (Map.Entry<String, File> metadataFileEntry :
segmentMetadataFileMap.entrySet()) {
FileUtils.deleteQuietly(metadataFileEntry.getValue());
}
- FileUtils.forceDelete(allSegmentsMetadataTarFile);
+ if (allSegmentsMetadataTarFile != null) {
+ FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
+ }
+ executor.shutdown();
+ }
+ }
+
+ @VisibleForTesting
+ static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec,
PinotFS fileSystem,
+ Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String,
File> segmentMetadataFileMap,
+ ConcurrentLinkedQueue<String> segmentURIs, ExecutorService executor) {
+
+ List<Future<Void>> futures = new ArrayList<>();
+ // Generate segment metadata files in parallel
+ for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+ futures.add(
+ executor.submit(() -> {
+ String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+ String fileName = new File(tarFilePath).getName();
+ // segments stored in Pinot deep store do not have .tar.gz
extension
+ String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+ ? fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+ SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+ File segmentMetadataFile;
+ // Check if there is a segment metadata tar gz file named
`segmentName.metadata.tar.gz`, already in the
+ // remote directory. This is to avoid generating a new segment
metadata tar gz file every time we push a
+ // segment, which requires downloading the entire segment tar gz
file.
+
+ URI metadataTarGzFilePath =
generateSegmentMetadataURI(tarFilePath, segmentName);
+ LOGGER.info("Checking if metadata tar gz file {} exists",
metadataTarGzFilePath);
+ if (spec.getPushJobSpec().isPreferMetadataTarGz() &&
fileSystem.exists(metadataTarGzFilePath)) {
+ segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+ SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX +
UUID.randomUUID()
+ + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (segmentMetadataFile.exists()) {
+ FileUtils.forceDelete(segmentMetadataFile);
+ }
+ fileSystem.copyToLocalFile(metadataTarGzFilePath,
segmentMetadataFile);
+ } else {
+ segmentMetadataFile = generateSegmentMetadataFile(fileSystem,
URI.create(tarFilePath));
+ }
+ segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+ segmentURIs.add(segmentName);
+ segmentURIs.add(segmentUriPath);
+ return null;
+ }));
+ }
+ int errorCount = 0;
+ Exception exception = null;
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ errorCount++;
+ exception = e;
+ }
+ }
+ if (errorCount > 0) {
+ throw new RuntimeException(
+ String.format("%d out of %d segment metadata generation failed",
errorCount, segmentUriToTarPathMap.size()),
+ exception);
}
}
@@ -483,13 +526,13 @@ public class SegmentPushUtils implements Serializable {
// Additionally, it contains a segmentName to segmentDownloadURI mapping
file which allows us to avoid sending the
// segmentDownloadURI as a header field as there are limitations on the
number of headers allowed in the http request.
@VisibleForTesting
- static File createSegmentsMetadataTarFile(List<String> segmentURIs,
Map<String, File> segmentMetadataFileMap)
+ static File createSegmentsMetadataTarFile(Collection<String> segmentURIs,
Map<String, File> segmentMetadataFileMap)
throws IOException {
String uuid = UUID.randomUUID().toString();
File allSegmentsMetadataDir =
new File(FileUtils.getTempDirectory(),
SegmentUploadConstants.ALL_SEGMENTS_METADATA_DIR_PREFIX + uuid);
FileUtils.forceMkdir(allSegmentsMetadataDir);
- for (Map.Entry<String, File> segmentMetadataTarFileEntry:
segmentMetadataFileMap.entrySet()) {
+ for (Map.Entry<String, File> segmentMetadataTarFileEntry :
segmentMetadataFileMap.entrySet()) {
String segmentName = segmentMetadataTarFileEntry.getKey();
File tarFile = segmentMetadataTarFileEntry.getValue();
TarCompressionUtils.untarOneFile(tarFile,
V1Constants.MetadataKeys.METADATA_FILE_NAME,
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
index 1fb5cfd43a..8a86283e15 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
@@ -28,17 +28,26 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -47,13 +56,15 @@ public class SegmentPushUtilsTest {
private File _tempDir;
@BeforeMethod
- public void setUp() throws IOException {
+ public void setUp()
+ throws IOException {
_tempDir = new File(FileUtils.getTempDirectory(), "test-" +
UUID.randomUUID());
FileUtils.forceMkdir(_tempDir);
}
@AfterMethod
- public void tearDown() throws IOException {
+ public void tearDown()
+ throws IOException {
FileUtils.deleteDirectory(_tempDir);
}
@@ -122,7 +133,8 @@ public class SegmentPushUtilsTest {
}
@Test
- public void testCreateSegmentsMetadataTarFile() throws IOException {
+ public void testCreateSegmentsMetadataTarFile()
+ throws IOException {
// setup
List<String> segmentURIs = Arrays.asList("http://example.com/segment1",
"http://example.com/segment2");
@@ -156,4 +168,67 @@ public class SegmentPushUtilsTest {
assertTrue(result.exists(), "The resulting tar.gz file should exist");
assertTrue(result.getName().endsWith(".tar.gz"), "The resulting file
should have a .tar.gz extension");
}
+
+ @Test
+ public void testGenerateSegmentMetadataFiles()
+ throws Exception {
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setSegmentMetadataGenerationParallelism(2);
+ jobSpec.setPushJobSpec(pushJobSpec);
+ PinotFS mockFs = Mockito.mock(PinotFS.class);
+ Map<String, String> segmentUriToTarPathMap = Map.of(
+ "segment1", "segment1.tar.gz",
+ "segment2", "segment2.tar.gz"
+ );
+ ConcurrentHashMap<String, File> segmentMetadataFileMap = new
ConcurrentHashMap<>();
+ ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
+
+ when(mockFs.exists(any(URI.class))).thenReturn(true);
+ mockFs.copyToLocalFile(any(URI.class), any(File.class));
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs,
segmentUriToTarPathMap, segmentMetadataFileMap,
+ segmentURIs, executor);
+ } finally {
+ executor.shutdown();
+ }
+
+ assertEquals(segmentMetadataFileMap.size(), 2);
+ assertEquals(segmentURIs.size(), 4);
+ }
+
+ @Test
+ public void testGenerateSegmentMetadataFilesFailure()
+ throws Exception {
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ jobSpec.setPushJobSpec(pushJobSpec);
+ PinotFS mockFs = Mockito.mock(PinotFS.class);
+ Map<String, String> segmentUriToTarPathMap = Map.of(
+ "segment1", "segment1.tar.gz",
+ "segment2", "segment2.tar.gz"
+ );
+ ConcurrentHashMap<String, File> segmentMetadataFileMap = new
ConcurrentHashMap<>();
+ ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
+
+ when(mockFs.exists(any(URI.class))).thenReturn(true);
+ doNothing().doThrow(new RuntimeException("test exception"))
+ .when(mockFs)
+ .copyToLocalFile(any(URI.class), any(File.class));
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs,
segmentUriToTarPathMap, segmentMetadataFileMap,
+ segmentURIs, executor);
+ } catch (Exception e) {
+ assertEquals(e.getMessage(), "1 out of 2 segment metadata generation
failed");
+ } finally {
+ executor.shutdown();
+ }
+
+ assertEquals(segmentMetadataFileMap.size(), 1);
+ assertEquals(segmentURIs.size(), 2);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index c087e364cd..d67f8d1785 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -53,6 +53,12 @@ public class PushJobSpec implements Serializable {
*/
private boolean _batchSegmentUpload;
+ /**
+ * Applicable for METADATA push type.
+ * Number of threads to use for segment metadata generation.
+ */
+ private int _segmentMetadataGenerationParallelism = 1;
+
/**
* Used in SegmentUriPushJobRunner, which is used to composite the segment
uri to send to pinot controller.
* The URI sends to controller is in the format
${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -163,4 +169,12 @@ public class PushJobSpec implements Serializable {
public void setBatchSegmentUpload(boolean batchSegmentUpload) {
_batchSegmentUpload = batchSegmentUpload;
}
+
+ public int getSegmentMetadataGenerationParallelism() {
+ return _segmentMetadataGenerationParallelism;
+ }
+
+ public void setSegmentMetadataGenerationParallelism(int
segmentMetadataGenerationParallelism) {
+ _segmentMetadataGenerationParallelism =
segmentMetadataGenerationParallelism;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]