This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4000568  Fix race conditions between segment merge/roll-up and purge 
(or convertToRawIndex) tasks: (#7427)
4000568 is described below

commit 400056804fc1c0da73c5ec78ca3fe1a1bcab9231
Author: Jiapeng Tao <[email protected]>
AuthorDate: Thu Sep 23 17:22:32 2021 -0700

    Fix race conditions between segment merge/roll-up and purge (or 
convertToRawIndex) tasks: (#7427)
    
    1. Add REFRESH_ONLY header for purge and convertToRawIndex tasks; the segment 
upload API will abort the request if the segment does not exist or is deleted 
before the upload request is completed.
    2. Honor segment lineage for convertToRawIndexTaskGenerator
---
 .../SegmentLineageBasedSegmentPreSelector.java     |  2 +-
 .../broker/broker/HelixBrokerStarterTest.java      |  3 +-
 .../pinot/common/lineage/SegmentLineageUtils.java  |  2 +-
 .../common/utils/FileUploadDownloadClient.java     |  1 +
 .../PinotSegmentUploadDownloadRestletResource.java | 22 +++++++
 .../pinot/controller/api/upload/ZKOperator.java    | 24 ++++++--
 .../helix/core/PinotHelixResourceManager.java      |  5 +-
 .../validation/ValidationManagerTest.java          |  3 +-
 .../tests/OfflineClusterIntegrationTest.java       | 71 ++++++++++++++++++++++
 .../tasks/BaseSingleSegmentConversionExecutor.java |  4 ++
 .../ConvertToRawIndexTaskGenerator.java            | 19 +++++-
 .../mergerollup/MergeRollupTaskGenerator.java      |  2 +-
 12 files changed, 146 insertions(+), 12 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
index cd41c6e..b9b4092 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
@@ -44,7 +44,7 @@ public class SegmentLineageBasedSegmentPreSelector implements 
SegmentPreSelector
   @Override
   public Set<String> preSelect(Set<String> onlineSegments) {
     SegmentLineage segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
_tableNameWithType);
-    SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(onlineSegments, 
segmentLineage);
+    SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(onlineSegments, 
segmentLineage);
     return onlineSegments;
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 9d667f2..6464e46 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -65,6 +65,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
   private static final int NUM_BROKERS = 3;
   private static final int NUM_SERVERS = 1;
   private static final int NUM_OFFLINE_SEGMENTS = 5;
+  private static final int EXPECTED_VERSION = -1;
 
   private HelixBrokerStarter _brokerStarter;
 
@@ -215,7 +216,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
         _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, 
segmentToRefresh);
     _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
         
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, 
segmentToRefresh, newEndTime),
-        segmentZKMetadata, "downloadUrl", null);
+        segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
 
     TestUtils.waitForCondition(aVoid -> 
routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
         .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update 
the time boundary for refreshed segment");
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
index 0a99509..458b247 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -43,7 +43,7 @@ public class SegmentLineageUtils {
    * Use the segment lineage metadata to filters out either merged segments or 
original segments in place
    * to make sure that the final segments contain no duplicate data.
    */
-  public static void filterSegmentsBasedOnLineageInplace(Set<String> segments, 
SegmentLineage segmentLineage) {
+  public static void filterSegmentsBasedOnLineageInPlace(Set<String> segments, 
SegmentLineage segmentLineage) {
     if (segmentLineage != null) {
       for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
         LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 0e3ce23..bcd71ae 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -85,6 +85,7 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static class CustomHeaders {
     public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
+    public static final String REFRESH_ONLY = "REFRESH_ONLY";
     public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
     public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = 
"Pinot-SegmentZKMetadataCustomMapModifier";
     public static final String CRYPTER = "CRYPTER";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 9066ff5..f2c6c45 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -23,6 +23,8 @@ import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -414,6 +416,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Path("/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // We use this endpoint with URI upload because a request sent with the 
multipart content type will reject the POST
   // request if a multipart object is not sent. This endpoint does not move 
the segment to its final location;
   // it keeps it at the downloadURI header that is set. We will not support 
this endpoint going forward.
@@ -442,6 +449,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Path("/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as 
binary")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // For the multipart endpoint, we will always move segment to final location 
regardless of the segment endpoint.
   public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
@@ -468,6 +480,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Path("/v2/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // We use this endpoint with URI upload because a request sent with the 
multipart content type will reject the POST
   // request if a multipart object is not sent. This endpoint is recommended 
for use. It differs from the first
   // endpoint in how it moves the segment to a Pinot-determined final 
directory.
@@ -496,6 +513,11 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Path("/v2/segments")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as 
binary")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segment"),
+      @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
   // This behavior does not differ from v1 of the same endpoint.
   public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 9d6bf01..b3a7258 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -65,7 +65,14 @@ public class ZKOperator {
     String segmentName = segmentMetadata.getName();
     ZNRecord segmentMetadataZNRecord =
         
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
+    boolean refreshOnly =
+        
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
     if (segmentMetadataZNRecord == null) {
+      if (refreshOnly) {
+        throw new ControllerApplicationException(LOGGER,
+            "Cannot refresh non-existing segment, aborted uploading segment: " 
+ segmentName + " of table: "
+                + tableNameWithType, Response.Status.GONE);
+      }
       LOGGER.info("Adding new segment {} from table {}", segmentName, 
tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation, zkDownloadURI, headers,
           crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation);
@@ -93,6 +100,7 @@ public class ZKOperator {
 
     SegmentZKMetadata existingSegmentZKMetadata = new 
SegmentZKMetadata(znRecord);
     long existingCrc = existingSegmentZKMetadata.getCrc();
+    int expectedVersion = znRecord.getVersion();
 
     // Check if CRC match when IF-MATCH header is set
     checkCRC(headers, tableNameWithType, segmentName, existingCrc);
@@ -118,10 +126,13 @@ public class ZKOperator {
       // Lock the segment by setting the upload start time in ZK
       
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
       if (!_pinotHelixResourceManager
-          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
znRecord.getVersion())) {
+          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
         throw new ControllerApplicationException(LOGGER,
             "Failed to lock the segment: " + segmentName + " of table: " + 
tableNameWithType + ", retry later",
             Response.Status.CONFLICT);
+      } else {
+        // The version will increment if the zk metadata update is successful
+        expectedVersion++;
       }
     }
 
@@ -156,7 +167,10 @@ public class ZKOperator {
         // (creation time is not included in the crc)
         
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
         existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
existingSegmentZKMetadata)) {
+        // NOTE: in rare cases the segment can be deleted before the metadata 
is updated and the expected version won't
+        // match, we should fail the request for such cases
+        if (!_pinotHelixResourceManager
+            .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
           throw new RuntimeException(
               "Failed to update ZK metadata for segment: " + segmentName + " 
of table: " + tableNameWithType);
         }
@@ -175,10 +189,12 @@ public class ZKOperator {
         }
 
         _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, 
existingSegmentZKMetadata, zkDownloadURI, crypter);
+            .refreshSegment(tableNameWithType, segmentMetadata, 
existingSegmentZKMetadata, expectedVersion,
+                zkDownloadURI, crypter);
       }
     } catch (Exception e) {
-      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
existingSegmentZKMetadata)) {
+      if (!_pinotHelixResourceManager
+          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
expectedVersion)) {
         LOGGER.error("Failed to update ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
       }
       throw e;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cbc2651..25797c3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1788,7 +1788,7 @@ public class PinotHelixResourceManager {
   }
 
   public void refreshSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata,
-      SegmentZKMetadata segmentZKMetadata, String downloadUrl, @Nullable 
String crypter) {
+      SegmentZKMetadata segmentZKMetadata, int expectedVersion, String 
downloadUrl, @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
 
     // NOTE: Must first set the segment ZK metadata before trying to refresh 
because servers and brokers rely on segment
@@ -1800,7 +1800,8 @@ public class PinotHelixResourceManager {
     segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
     segmentZKMetadata.setDownloadUrl(downloadUrl);
     segmentZKMetadata.setCrypterName(crypter);
-    if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentZKMetadata)) {
+    if (!ZKMetadataProvider
+        .setSegmentZKMetadata(_propertyStore, tableNameWithType, 
segmentZKMetadata, expectedVersion)) {
       throw new RuntimeException(
           "Failed to update ZK metadata for segment: " + segmentName + " of 
table: " + tableNameWithType);
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index fee669e..335d8f4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -51,6 +51,7 @@ public class ValidationManagerTest {
   private static final String TEST_TABLE_NAME = "validationTable";
   private static final String OFFLINE_TEST_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
   private static final String TEST_SEGMENT_NAME = "testSegment";
+  private static final int EXPECTED_VERSION = -1;
 
   private TableConfig _offlineTableConfig;
 
@@ -88,7 +89,7 @@ public class ValidationManagerTest {
     }, 30_000L, "Failed to find the segment in the ExternalView");
     
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     ControllerTestUtils.getHelixResourceManager()
-        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, 
"downloadUrl", null);
+        .refreshSegment(offlineTableName, segmentMetadata, segmentZKMetadata, 
EXPECTED_VERSION, "downloadUrl", null);
 
     segmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
 TEST_SEGMENT_NAME);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 10966e7..7f3652a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -37,13 +38,21 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
+import org.apache.http.Header;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -78,6 +87,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   private static final int NUM_SERVERS = 1;
   private static final int NUM_SEGMENTS = 12;
   private static final long ONE_HOUR_IN_MS = TimeUnit.HOURS.toMillis(1);
+  private static final String SEGMENT_UPLOAD_TEST_TABLE = 
"segmentUploadTestTable";
 
   // For table config refresh test, make an expensive query to ensure the 
query won't finish in 5ms
   private static final String TEST_TIMEOUT_QUERY =
@@ -324,6 +334,67 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }
   }
 
+  @Test
+  public void testUploadSegmentRefreshOnly()
+      throws Exception {
+    TableConfig segmentUploadTestTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE).setSchemaName(getSchemaName())
+            
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+            
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+            
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+            
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+            .setSegmentVersion(getSegmentVersion())
+            
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+            
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+            .setNullHandlingEnabled(getNullHandlingEnabled()).build();
+    addTableConfig(segmentUploadTestTableConfig);
+    String offlineTableName = segmentUploadTestTableConfig.getTableName();
+    File[] segmentTarFiles = _tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+    List<Header> headers = new ArrayList<>();
+    headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true"));
+    List<NameValuePair> parameters = new ArrayList<>();
+    NameValuePair tableNameParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+        TableNameBuilder.extractRawTableName(offlineTableName));
+    parameters.add(tableNameParameter);
+
+    URI uploadSegmentHttpURI = 
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      // Refresh non-existing segment
+      File segmentTarFile = segmentTarFiles[0];
+      try {
+        fileUploadDownloadClient
+            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), 
segmentTarFile, headers, parameters,
+                FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+        fail();
+      } catch (HttpErrorStatusException e) {
+        assertEquals(e.getStatusCode(), HttpStatus.SC_GONE);
+        
assertTrue(_helixResourceManager.getSegmentsZKMetadata(SEGMENT_UPLOAD_TEST_TABLE).isEmpty());
+      }
+
+      // Upload segment
+      SimpleHttpResponse response = fileUploadDownloadClient
+          .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), 
segmentTarFile, null, parameters,
+              FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+      assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+      System.out.println(response.getResponse());
+      List<SegmentZKMetadata> segmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+      assertEquals(segmentsZKMetadata.size(), 1);
+
+      // Refresh existing segment
+      response = fileUploadDownloadClient
+          .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), 
segmentTarFile, headers, parameters,
+              FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+      assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
+      segmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(offlineTableName);
+      assertEquals(segmentsZKMetadata.size(), 1);
+      assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), 
Long.MIN_VALUE);
+    }
+    dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+  }
+
   @Test(dependsOnMethods = "testRangeIndexTriggering")
   public void testInvertedIndexTriggering()
       throws Exception {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index db329bd..646cc40 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -133,6 +133,9 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
       // the newer segment won't get override
       Header ifMatchHeader = new BasicHeader(HttpHeaders.IF_MATCH, 
originalSegmentCrc);
 
+      // Only upload segment if it exists
+      Header refreshOnlyHeader = new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true");
+
       // Set segment ZK metadata custom map modifier into HTTP header to 
modify the segment ZK metadata
       // NOTE: even segment is not changed, still need to upload the segment 
to update the segment ZK metadata so that
       // segment will not be submitted again
@@ -144,6 +147,7 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
 
       List<Header> httpHeaders = new ArrayList<>();
       httpHeaders.add(ifMatchHeader);
+      httpHeaders.add(refreshOnlyHeader);
       httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
       httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
index d0d9d12..5fc7c64 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
@@ -21,10 +21,13 @@ package 
org.apache.pinot.plugin.minion.tasks.converttorawindex;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,8 +97,16 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
       String columnsToConvertConfig = 
taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);
 
       // Generate tasks
+      List<SegmentZKMetadata> offlineSegmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+      SegmentLineage segmentLineage = 
_clusterInfoAccessor.getSegmentLineage(offlineTableName);
+      Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
+      for (SegmentZKMetadata offlineSegmentZKMetadata : 
offlineSegmentsZKMetadata) {
+        
preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
+      }
+      
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
 segmentLineage);
+
       int tableNumTasks = 0;
-      for (SegmentZKMetadata segmentZKMetadata : 
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName)) {
+      for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
         // Generate up to tableMaxNumTasks tasks each time for each table
         if (tableNumTasks == tableMaxNumTasks) {
           break;
@@ -107,6 +118,12 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
           continue;
         }
 
+        // Skip segments based on lineage: for COMPLETED lineage, segments in 
`segmentsFrom` will be removed by
+        // retention manager, for IN_PROGRESS lineage, segments in 
`segmentsTo` are not uploaded yet
+        if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
+          continue;
+        }
+
         // Only submit segments that have not been converted
         Map<String, String> customMap = segmentZKMetadata.getCustomMap();
         if (customMap == null || !customMap.containsKey(
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index a1f559e..252505b 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -127,7 +127,7 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
       for (SegmentZKMetadata segment : allSegments) {
         preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
       }
-      
SegmentLineageUtils.filterSegmentsBasedOnLineageInplace(preSelectedSegmentsBasedOnLineage,
 segmentLineage);
+      
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
 segmentLineage);
 
       List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
       for (SegmentZKMetadata segment : allSegments) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to