This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 378bdec11c Allow moveToFinalLocation in METADATA push based on config (#8823) 378bdec11c is described below commit 378bdec11c68a54366cd98b0f2eda5807e454e2f Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Jun 15 14:39:18 2022 -0700 Allow moveToFinalLocation in METADATA push based on config (#8823) METADATA push didn't allow the option of moveSegmentToFinalLocation. This meant that if someone had generated segments in a location that was not the deep store, there was absolutely no way to move those segments into deep store without manual scripting. --- .../common/utils/FileUploadDownloadClient.java | 6 + .../PinotSegmentUploadDownloadRestletResource.java | 46 +++-- .../pinot/controller/api/upload/ZKOperator.java | 79 ++++--- .../controller/api/upload/ZKOperatorTest.java | 141 ++++++++++--- .../tests/ClusterIntegrationTestUtils.java | 20 +- .../tests/SegmentUploadIntegrationTest.java | 229 +++++++++++++++++++++ .../segment/local/utils/SegmentPushUtils.java | 4 + .../spi/ingestion/batch/spec/PushJobSpec.java | 13 ++ 8 files changed, 472 insertions(+), 66 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 55931077b7..767e0f16c4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -73,6 +73,12 @@ public class FileUploadDownloadClient implements AutoCloseable { public static final String UPLOAD_TYPE = "UPLOAD_TYPE"; public static final String REFRESH_ONLY = "REFRESH_ONLY"; public static final String DOWNLOAD_URI = "DOWNLOAD_URI"; + + /** + * This header is only used for METADATA push, to allow controller to copy segment to deep store, + * if segment was not placed in the deep store to begin with + */ + public static final String COPY_SEGMENT_TO_DEEP_STORE = "COPY_SEGMENT_TO_DEEP_STORE"; public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier"; public static final String CRYPTER = "CRYPTER"; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index e675b7637e..cda4e37b50 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -197,7 +197,7 @@ public class PinotSegmentUploadDownloadRestletResource { } private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, - @Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection, + @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) { if (StringUtils.isNotEmpty(tableName)) { TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); @@ -213,13 +213,15 @@ public class PinotSegmentUploadDownloadRestletResource { extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); - String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); + String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); File tempEncryptedFile = null; File tempDecryptedFile = null; File tempSegmentDir = null; + // The downloadUri for putting into segment zk metadata + String segmentDownloadURIStr = sourceDownloadURIStr; try { ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID(); @@ -238,20 +240,22 @@ public class PinotSegmentUploadDownloadRestletResource { "Segment file (as multipart/form-data) is required for SEGMENT upload mode", Response.Status.BAD_REQUEST); } - if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) { + if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) { throw new ControllerApplicationException(LOGGER, - "Download URI is required if segment should not be copied to the deep store", + "Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to " + + "the deep store", Response.Status.BAD_REQUEST); } createSegmentFileFromMultipart(multiPart, destFile); segmentSizeInBytes = destFile.length(); break; case URI: - if (StringUtils.isEmpty(downloadURI)) { - throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode", + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + throw new ControllerApplicationException(LOGGER, + "Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode", Response.Status.BAD_REQUEST); } - downloadSegmentFileFromURI(downloadURI, destFile, tableName); + downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName); segmentSizeInBytes = destFile.length(); break; case METADATA: @@ -260,14 +264,19 @@ public class PinotSegmentUploadDownloadRestletResource { "Segment metadata file (as multipart/form-data) is required for METADATA upload mode", Response.Status.BAD_REQUEST); } - if (StringUtils.isEmpty(downloadURI)) { - throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode", + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + throw new ControllerApplicationException(LOGGER, + "Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode", Response.Status.BAD_REQUEST); } - moveSegmentToFinalLocation = false; + // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE + // else set to false for backward compatibility + String copySegmentToDeepStore = + extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); + copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); createSegmentFileFromMultipart(multiPart, destFile); try { - URI segmentURI = new URI(downloadURI); + URI segmentURI = new URI(sourceDownloadURIStr); PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); segmentSizeInBytes = pinotFS.length(segmentURI); } catch (Exception e) { @@ -332,24 +341,25 @@ public class PinotSegmentUploadDownloadRestletResource { // Update download URI if controller is responsible for moving the segment to the deep store URI finalSegmentLocationURI = null; - if (moveSegmentToFinalLocation) { + if (copySegmentToFinalLocation) { URI dataDirURI = provider.getDataDirURI(); String dataDirPath = dataDirURI.toString(); String encodedSegmentName = URIUtils.encode(segmentName); String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName); if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { - downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); + segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); } else { - downloadURI = finalSegmentLocationPath; + segmentDownloadURIStr = finalSegmentLocationPath; } finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath); } - LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile, - tableNameWithType, moveSegmentToFinalLocation); + LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})", + segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, - downloadURI, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers); + zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, + segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, + enableParallelPushProtection, allowRefresh, headers); return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index b75d112ce2..62bc770846 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.api.upload; +import com.google.common.base.Preconditions; import java.io.File; import java.net.URI; import javax.annotation.Nullable; @@ -29,6 +30,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifi import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -60,7 +62,8 @@ public class ZKOperator { } public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, - @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName, + FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile, + @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers) throws Exception { String segmentName = segmentMetadata.getName(); @@ -76,8 +79,9 @@ public class ZKOperator { Response.Status.GONE); } LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); - processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl, - crypterName, segmentSizeInBytes, enableParallelPushProtection, headers); + processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile, + sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, + headers); } else { // Refresh an existing segment if (!allowRefresh) { @@ -89,16 +93,16 @@ public class ZKOperator { tableNameWithType), Response.Status.CONFLICT); } LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); - processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord, - finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes, - enableParallelPushProtection, headers); + processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord, + finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, + segmentSizeInBytes, enableParallelPushProtection, headers); } } private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata, - ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile, - String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, - HttpHeaders headers) + FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, + File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, + @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) throws Exception { String segmentName = segmentMetadata.getName(); int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); @@ -179,8 +183,7 @@ public class ZKOperator { "New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing " + "segment {}", newCrc, existingCrc, segmentName); if (finalSegmentLocationURI != null) { - moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI); - LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI); } @@ -191,12 +194,12 @@ public class ZKOperator { if (customMapModifier == null) { // If no modifier is provided, use the custom map from the segment metadata segmentZKMetadata.setCustomMap(null); - ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, - crypterName, segmentSizeInBytes); + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); } else { // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier - ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, - crypterName, segmentSizeInBytes); + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); } if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { @@ -237,16 +240,17 @@ public class ZKOperator { } } - private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, - @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName, - long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) + private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType, + @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, + String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, + boolean enableParallelPushProtection, HttpHeaders headers) throws Exception { String segmentName = segmentMetadata.getName(); SegmentZKMetadata newSegmentZKMetadata; try { newSegmentZKMetadata = - ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName, - segmentSizeInBytes); + ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr, + crypterName, segmentSizeInBytes); } catch (IllegalArgumentException e) { throw new ControllerApplicationException(LOGGER, String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, @@ -274,8 +278,7 @@ public class ZKOperator { if (finalSegmentLocationURI != null) { try { - moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI); - LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI); } catch (Exception e) { // Cleanup the Zk entry and the segment from the permanent directory if it exists. @@ -310,9 +313,39 @@ public class ZKOperator { } } - private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI) + private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType, + File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI) + throws Exception { + if (uploadType == FileUploadType.METADATA) { + // In Metadata push, local segmentFile only contains metadata. + // Copy segment over from sourceDownloadURI to final location. + copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI); + LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + finalSegmentLocationURI); + } else { + // In push types other than METADATA, local segmentFile contains the complete segment. + // Move local segment to final location + copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI); + LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + finalSegmentLocationURI); + } + } + + private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI) throws Exception { LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI); PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI); } + + private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI) + throws Exception { + if (sourceDownloadURI.equals(finalSegmentLocationURI)) { + LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI", + sourceDownloadURI); + } else { + Preconditions.checkState(sourceDownloadURI.getScheme().equals(finalSegmentLocationURI.getScheme())); + LOGGER.info("Copying segment from: {} to: {}", sourceDownloadURI, finalSegmentLocationURI); + PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copy(sourceDownloadURI, finalSegmentLocationURI); + } + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index 28c33eb256..ef9910677d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -18,29 +18,39 @@ */ package org.apache.pinot.controller.api.upload; +import com.google.common.collect.ImmutableList; import java.io.File; import java.net.URI; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -51,6 +61,9 @@ import static org.testng.Assert.*; public class ZKOperatorTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ZKOperatorTest"); + private static final File SEGMENT_DIR = new File(TEMP_DIR, "segmentDir"); + private static final File DATA_DIR = new File(TEMP_DIR, "dataDir"); private static final String RAW_TABLE_NAME = "testTable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @@ -66,6 +79,7 @@ public class ZKOperatorTest { @BeforeClass public void setUp() throws Exception { + FileUtils.deleteQuietly(TEMP_DIR); TEST_INSTANCE.setupSharedStateAndValidate(); _resourceManager = TEST_INSTANCE.getHelixResourceManager(); @@ -93,6 +107,87 @@ public class ZKOperatorTest { return streamConfigs; } + private File generateSegment() + throws Exception { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("colA", DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + File outputDir = new File(SEGMENT_DIR, "segment"); + config.setOutDir(outputDir.getAbsolutePath()); + config.setSegmentName(SEGMENT_NAME); + GenericRow row = new GenericRow(); + row.putValue("colA", "100"); + List<GenericRow> rows = ImmutableList.of(row); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + File segmentTar = new File(SEGMENT_DIR, SEGMENT_NAME + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + TarGzCompressionUtils.createTarGzFile(new File(outputDir, SEGMENT_NAME), + new File(SEGMENT_DIR, SEGMENT_NAME + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION)); + FileUtils.deleteQuietly(outputDir); + return segmentTar; + } + + private void checkSegmentZkMetadata(String segmentName, long crc, long creationTime) { + SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName); + assertNotNull(segmentZKMetadata); + assertEquals(segmentZKMetadata.getCrc(), crc); + assertEquals(segmentZKMetadata.getCreationTime(), creationTime); + long pushTime = segmentZKMetadata.getPushTime(); + assertTrue(pushTime > 0); + assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE); + assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl"); + assertEquals(segmentZKMetadata.getCrypterName(), "crypter"); + assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1); + assertEquals(segmentZKMetadata.getSizeInBytes(), 10); + } + + @Test + public void testMetadataUploadType() + throws Exception { + String segmentName = "metadataTest"; + FileUtils.deleteQuietly(TEMP_DIR); + ZKOperator zkOperator = new ZKOperator(_resourceManager, mock(ControllerConf.class), mock(ControllerMetrics.class)); + + SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); + when(segmentMetadata.getName()).thenReturn(segmentName); + when(segmentMetadata.getCrc()).thenReturn("12345"); + when(segmentMetadata.getIndexCreationTime()).thenReturn(123L); + HttpHeaders httpHeaders = mock(HttpHeaders.class); + + File segmentTar = generateSegment(); + String sourceDownloadURIStr = segmentTar.toURI().toString(); + File segmentFile = new File("metadataOnly"); + + // with finalSegmentLocation not null + File finalSegmentLocation = new File(DATA_DIR, segmentName); + Assert.assertFalse(finalSegmentLocation.exists()); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, + finalSegmentLocation.toURI(), segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, + httpHeaders); + Assert.assertTrue(finalSegmentLocation.exists()); + Assert.assertTrue(segmentTar.exists()); + checkSegmentZkMetadata(segmentName, 12345L, 123L); + + _resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName); + // Wait for the segment Zk entry to be deleted. + TestUtils.waitForCondition(aVoid -> { + SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentName); + return segmentZKMetadata == null; + }, 30_000L, "Failed to delete segmentZkMetadata."); + + FileUtils.deleteQuietly(DATA_DIR); + // with finalSegmentLocation null + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, null, + segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, httpHeaders); + Assert.assertFalse(finalSegmentLocation.exists()); + Assert.assertTrue(segmentTar.exists()); + checkSegmentZkMetadata(segmentName, 12345L, 123L); + } + @Test public void testCompleteSegmentOperations() throws Exception { @@ -110,9 +205,8 @@ public class ZKOperatorTest { URI finalSegmentLocationURI = URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName())); File segmentFile = new File(new File("foo/bar"), "mockChild"); - - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, segmentFile, - "downloadUrl", "crypter", 10, true, true, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, + finalSegmentLocationURI, segmentFile, "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -124,8 +218,8 @@ public class ZKOperatorTest { return segmentZKMetadata == null; }, 30_000L, "Failed to delete segmentZkMetadata."); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", "crypter", 10, - true, true, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders); SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -141,8 +235,8 @@ public class ZKOperatorTest { // Upload the same segment with allowRefresh = false. Validate that an exception is thrown. try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", - "otherCrypter", 10, true, false, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, false, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -151,8 +245,8 @@ public class ZKOperatorTest { // Refresh the segment with unmatched IF_MATCH field when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123"); try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", - "otherCrypter", 10, true, true, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -162,8 +256,8 @@ public class ZKOperatorTest { // downloadURL and crypter when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(456L); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", - "otherCrypter", 10, true, true, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -185,8 +279,8 @@ public class ZKOperatorTest { when(segmentMetadata.getIndexCreationTime()).thenReturn(789L); // Add a tiny sleep to guarantee that refresh time is different from the previous round Thread.sleep(10); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", - "otherCrypter", 100, true, true, httpHeaders); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 100, true, true, httpHeaders); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -209,8 +303,8 @@ public class ZKOperatorTest { SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME); when(segmentMetadata.getCrc()).thenReturn("12345"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10, - true, true, mock(HttpHeaders.class)); + zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -222,8 +316,8 @@ public class ZKOperatorTest { when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME); when(segmentMetadata.getCrc()).thenReturn("23456"); try { - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10, - true, true, mock(HttpHeaders.class)); + zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); fail(); } catch (ControllerApplicationException e) { assertEquals(e.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); @@ -233,8 +327,8 @@ public class ZKOperatorTest { // Uploading a segment with LLC segment name and start/end offset should success when(segmentMetadata.getStartOffset()).thenReturn("0"); when(segmentMetadata.getEndOffset()).thenReturn("1234"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10, - true, true, mock(HttpHeaders.class)); + zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -246,8 +340,8 @@ public class ZKOperatorTest { when(segmentMetadata.getCrc()).thenReturn("34567"); when(segmentMetadata.getStartOffset()).thenReturn(null); when(segmentMetadata.getEndOffset()).thenReturn(null); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10, - true, true, mock(HttpHeaders.class)); + zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -259,8 +353,8 @@ public class ZKOperatorTest { when(segmentMetadata.getCrc()).thenReturn("45678"); when(segmentMetadata.getStartOffset()).thenReturn("1234"); when(segmentMetadata.getEndOffset()).thenReturn("2345"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", null, 10, - true, true, mock(HttpHeaders.class)); + zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, + "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -271,6 +365,7 @@ public class ZKOperatorTest { @AfterClass public void tearDown() { + FileUtils.deleteQuietly(TEMP_DIR); TEST_INSTANCE.cleanup(); } } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index fbea4e6220..21abfd6430 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -283,12 +283,28 @@ public class ClusterIntegrationTestUtils { public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig, org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir) throws Exception { + // Test segment with space and special character in the file name + buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + " %", segmentDir, tarDir); + } + + /** + * Builds one Pinot segment from the given Avro file. + * + * @param avroFile Avro file + * @param tableConfig Pinot table config + * @param schema Pinot schema + * @param segmentNamePostfix Segment name postfix + * @param segmentDir Output directory for the un-tarred segments + * @param tarDir Output directory for the tarred segments + */ + public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig, + org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File segmentDir, File tarDir) + throws Exception { SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); segmentGeneratorConfig.setInputFilePath(avroFile.getPath()); segmentGeneratorConfig.setOutDir(segmentDir.getPath()); segmentGeneratorConfig.setTableName(tableConfig.getTableName()); - // Test segment with space and special character in the file name - segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %"); + segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix); // Build the segment SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java new file mode 100644 index 0000000000..ae9d3989b6 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Test for advanced push types. + * Currently only tests METADATA push type. + * todo: add test for URI push + */ +public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { + + @Override + protected Map<String, String> getStreamConfigs() { + return null; + } + + @Override + protected String getSortedColumn() { + return null; + } + + @Override + protected List<String> getInvertedIndexColumns() { + return null; + } + + @Override + protected List<String> getNoDictionaryColumns() { + return null; + } + + @Override + protected List<String> getRangeIndexColumns() { + return null; + } + + @Override + protected List<String> getBloomFilterColumns() { + return null; + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + // Start Zk and Kafka + startZk(); + + // Start the Pinot cluster + startController(); + startBroker(); + startServer(); + } + + @Test + public void testUploadAndQuery() + throws Exception { + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig offlineTableConfig = createOfflineTableConfig(); + addTableConfig(offlineTableConfig); + + List<File> avroFiles = getAllAvroFiles(); + + // Create 1 segment, for METADATA push WITH move to final location + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, "_with_move", + _segmentDir, _tarDir); + + SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner(); + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + // set moveToDeepStoreForMetadataPush to true + pushJobSpec.setCopyToDeepStoreForMetadataPush(true); + jobSpec.setPushJobSpec(pushJobSpec); + PinotFSSpec fsSpec = new PinotFSSpec(); + fsSpec.setScheme("file"); + fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS"); + jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); + jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(DEFAULT_TABLE_NAME); + jobSpec.setTableSpec(tableSpec); + PinotClusterSpec clusterSpec = new PinotClusterSpec(); + clusterSpec.setControllerURI(_controllerBaseApiUrl); + jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); + + File dataDir = new File(_controllerConfig.getDataDir()); + File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + + // Not present in dataDir, only present in sourceDir + Assert.assertFalse(dataDirSegments.exists()); + Assert.assertEquals(_tarDir.listFiles().length, 1); + + runner.init(jobSpec); + runner.run(); + + // Segment should be seen in dataDir + Assert.assertTrue(dataDirSegments.exists()); + Assert.assertEquals(dataDirSegments.listFiles().length, 1); + Assert.assertEquals(_tarDir.listFiles().length, 1); + + // test segment loaded + JsonNode segmentsList = getSegmentsList(); + Assert.assertEquals(segmentsList.size(), 1); + String segmentNameWithMove = segmentsList.get(0).asText(); + Assert.assertTrue(segmentNameWithMove.endsWith("_with_move")); + long numDocs = getNumDocs(segmentNameWithMove); + testCountStar(numDocs); + + // Clear segment and tar dir + for (File segment : _segmentDir.listFiles()) { + FileUtils.deleteQuietly(segment); + } + for (File tar : _tarDir.listFiles()) { + FileUtils.deleteQuietly(tar); + } + + // Create 1 segment, for METADATA push WITHOUT move to final location + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, "_without_move", + _segmentDir, _tarDir); + jobSpec.setPushJobSpec(new PushJobSpec()); + runner = new SegmentMetadataPushJobRunner(); + + Assert.assertEquals(dataDirSegments.listFiles().length, 1); + Assert.assertEquals(_tarDir.listFiles().length, 1); + + runner.init(jobSpec); + runner.run(); + + // should not see new segments in dataDir + Assert.assertEquals(dataDirSegments.listFiles().length, 1); + Assert.assertEquals(_tarDir.listFiles().length, 1); + + // test segment loaded + segmentsList = getSegmentsList(); + Assert.assertEquals(segmentsList.size(), 2); + String segmentNameWithoutMove = null; + for (JsonNode segment : segmentsList) { + if (segment.asText().endsWith("_without_move")) { + segmentNameWithoutMove = segment.asText(); + } + } + Assert.assertNotNull(segmentNameWithoutMove); + numDocs += getNumDocs(segmentNameWithoutMove); + testCountStar(numDocs); + } + + private long getNumDocs(String segmentName) + throws IOException { + return JsonUtils.stringToJsonNode( + sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName))) + .get("segment.total.docs").asLong(); + } + + private JsonNode getSegmentsList() + throws IOException { + return JsonUtils.stringToJsonNode(sendGetRequest( + _controllerRequestURLBuilder.forSegmentListAPIWithTableType(DEFAULT_TABLE_NAME, + TableType.OFFLINE.toString()))) + .get(0).get("OFFLINE"); + } + + protected void testCountStar(final long countStarResult) { + TestUtils.waitForCondition(new Function<Void, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable Void aVoid) { + try { + return getCurrentCountStarResult() == countStarResult; + } catch (Exception e) { + return null; + } + } + }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true); + } + + @AfterClass + public void tearDown() + throws Exception { + dropOfflineTable(getTableName()); + stopServer(); + stopBroker(); + stopController(); + stopZk(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 1d91e5bb4e..b756c7f760 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -264,6 +264,10 @@ public class SegmentPushUtils implements Serializable { headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath)); headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString())); + if (spec.getPushJobSpec() != null) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, + String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); + } headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index 40944978ca..04481baf1f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -41,6 +41,11 @@ public class PushJobSpec implements Serializable { */ private long _pushRetryIntervalMillis = 1000; + /** + * Applicable for URI and METADATA push types. + * If true, and if segment was not already in the deep store, move it to deep store. + */ + private boolean _copyToDeepStoreForMetadataPush; /** * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller. * The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix} @@ -121,4 +126,12 @@ public class PushJobSpec implements Serializable { public void setPushParallelism(int pushParallelism) { _pushParallelism = pushParallelism; } + + public boolean getCopyToDeepStoreForMetadataPush() { + return _copyToDeepStoreForMetadataPush; + } + + public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) { + _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org