This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e77080007 Add an API to download validDocIdsSnapshots from peer
servers (#10052)
8e77080007 is described below
commit 8e7708000785a6a20b54ac0efa3f6c2ebaae5195
Author: deemoliu <[email protected]>
AuthorDate: Fri Jan 13 12:35:02 2023 -0800
Add an API to download validDocIdsSnapshots from peer servers (#10052)
---
.../pinot/common/utils/RoaringBitmapUtils.java | 3 +-
.../server/api/resources/ServerResourceUtils.java | 21 +++++++
.../pinot/server/api/resources/TablesResource.java | 71 +++++++++++++++++-----
.../pinot/server/api/TablesResourceTest.java | 50 +++++++++++++++
4 files changed, 129 insertions(+), 16 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
index 861ac5c114..520ff8f233 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
@@ -20,6 +20,7 @@ package org.apache.pinot.common.utils;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.roaringbitmap.ImmutableBitmapDataProvider;
import org.roaringbitmap.RoaringBitmap;
@@ -27,7 +28,7 @@ public class RoaringBitmapUtils {
private RoaringBitmapUtils() {
}
- public static byte[] serialize(RoaringBitmap bitmap) {
+ public static byte[] serialize(ImmutableBitmapDataProvider bitmap) {
byte[] bytes = new byte[bitmap.serializedSizeInBytes()];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
bitmap.serialize(byteBuffer);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
index 986d67d2d5..0159d8d542 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
@@ -19,9 +19,14 @@
package org.apache.pinot.server.api.resources;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.access.AccessControl;
+import org.apache.pinot.server.access.AccessControlFactory;
+import org.apache.pinot.server.access.HttpRequesterIdentity;
+import org.apache.pinot.server.access.RequesterIdentity;
import org.apache.pinot.server.starter.ServerInstance;
@@ -54,4 +59,20 @@ public class ServerResourceUtils {
}
return instanceDataManager;
}
+
+  /**
+   * Verifies that the requester identified by the given HTTP headers has data access to the given table.
+   *
+   * @param accessControlFactory factory used to create the {@link AccessControl} that performs the check
+   * @param tableNameWithType name of the table being accessed
+   * @param httpHeaders headers of the incoming request, wrapped into an {@link HttpRequesterIdentity}
+   * @throws WebApplicationException with status INTERNAL_SERVER_ERROR when the access check itself throws, or with
+   *         status FORBIDDEN when the check completes and denies access
+   */
+  public static void validateDataAccess(AccessControlFactory accessControlFactory, String tableNameWithType,
+      HttpHeaders httpHeaders) {
+    boolean hasDataAccess;
+    try {
+      AccessControl accessControl = accessControlFactory.create();
+      RequesterIdentity httpRequestIdentity = new HttpRequesterIdentity(httpHeaders);
+      hasDataAccess = accessControl.hasDataAccess(httpRequestIdentity, tableNameWithType);
+    } catch (Exception e) {
+      // Keep the original exception as the cause so the root failure is not lost.
+      throw new WebApplicationException(
+          "Caught exception while validating access to table: " + tableNameWithType, e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (!hasDataAccess) {
+      throw new WebApplicationException("No data access to table: " + tableNameWithType, Response.Status.FORBIDDEN);
+    }
+  }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 27df969782..4495d19ebb 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -69,6 +69,7 @@ import
org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -77,14 +78,13 @@ import
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
-import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.server.access.AccessControlFactory;
-import org.apache.pinot.server.access.HttpRequesterIdentity;
-import org.apache.pinot.server.access.RequesterIdentity;
import org.apache.pinot.server.api.AdminApiApplication;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.spi.config.table.TableType;
@@ -93,6 +93,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,18 +375,7 @@ public class TablesResource {
throws Exception {
LOGGER.info("Received a request to download segment {} for table {}",
segmentName, tableNameWithType);
// Validate data access
- boolean hasDataAccess;
- try {
- AccessControl accessControl = _accessControlFactory.create();
- RequesterIdentity httpRequestIdentity = new
HttpRequesterIdentity(httpHeaders);
- hasDataAccess = accessControl.hasDataAccess(httpRequestIdentity,
tableNameWithType);
- } catch (Exception e) {
- throw new WebApplicationException("Caught exception while validating
access to table: " + tableNameWithType,
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- if (!hasDataAccess) {
- throw new WebApplicationException("No data access to table: " +
tableNameWithType, Response.Status.FORBIDDEN);
- }
+ ServerResourceUtils.validateDataAccess(_accessControlFactory,
tableNameWithType, httpHeaders);
TableDataManager tableDataManager =
ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
@@ -423,6 +413,57 @@ public class TablesResource {
}
}
+  /**
+   * Downloads the validDocIds snapshot of an immutable segment of an upsert table. This endpoint is used to fetch the
+   * snapshot from a peer server, so that it does not have to be recomputed when segments are reloaded.
+   *
+   * @param tableNameWithType name of the table with type
+   * @param segmentName name of the segment (received URL-encoded, see {@code @Encoded})
+   * @param httpHeaders headers of the incoming request, used to validate data access
+   * @return an OK response whose entity is the serialized validDocIds bitmap
+   * @throws WebApplicationException with status NOT_FOUND when the segment cannot be acquired or has no validDocIds,
+   *         or BAD_REQUEST when the segment is not an {@link ImmutableSegmentImpl}; the data-access validation and
+   *         the table lookup may also reject the request
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  @Path("/segments/{tableNameWithType}/{segmentName}/validDocIds")
+  @ApiOperation(value = "Download validDocIds for an REALTIME immutable segment", notes = "Download validDocIds for "
+      + "an immutable segment in bitmap format.")
+  public Response downloadValidDocIds(
+      @ApiParam(value = "Name of the table with type REALTIME", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @Context HttpHeaders httpHeaders) {
+    LOGGER.info("Received a request to download validDocIds for segment {} table {}", segmentName, tableNameWithType);
+    // Validate data access
+    ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders);
+
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", tableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+
+    // The acquired segment must be released on every exit path, including the error responses below.
+    try {
+      IndexSegment indexSegment = segmentDataManager.getSegment();
+      if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+        throw new WebApplicationException(
+            String.format("Table %s segment %s is not a immutable segment", tableNameWithType, segmentName),
+            Response.Status.BAD_REQUEST);
+      }
+      MutableRoaringBitmap validDocIds =
+          indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (validDocIds == null) {
+        // The segment exists at this point; only its validDocIds are missing.
+        throw new WebApplicationException(
+            String.format("Missing validDocIds for table %s segment %s", tableNameWithType, segmentName),
+            Response.Status.NOT_FOUND);
+      }
+
+      byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIds);
+      Response.ResponseBuilder builder = Response.ok(validDocIdsBytes);
+      builder.header(HttpHeaders.CONTENT_LENGTH, validDocIdsBytes.length);
+      return builder.build();
+    } finally {
+      tableDataManager.releaseSegment(segmentDataManager);
+    }
+  }
+
/**
* Upload a low level consumer segment to segment store and return the
segment download url. This endpoint is used
* when segment store copy is unavailable for committed low level consumer
segments.
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index c60ffd1b84..ce7ed438cc 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.api;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
@@ -28,18 +29,24 @@ import
org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+
public class TablesResourceTest extends BaseResourceTest {
@Test
@@ -222,6 +229,24 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
+  @Test
+  public void testDownloadValidDocIdsSnapshot()
+      throws Exception {
+    // Verify the content of the downloaded snapshot from a realtime table.
+    downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
+
+    // Verify non-existent table and segment download return NOT_FOUND status. The requests must use the endpoint's
+    // own route (/segments/{tableNameWithType}/{segmentName}/validDocIds); otherwise the NOT_FOUND would come from an
+    // unmatched path and the endpoint's own checks would never run.
+    Response response =
+        _webTarget.path("/segments/UNKNOWN_REALTIME/segmentname/validDocIds").request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+
+    response = _webTarget.path(
+        String.format("/segments/%s/%s/validDocIds", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+            "UNKNOWN_SEGMENT")).request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+  }
+
// Verify metadata file from segments.
private void downLoadAndVerifySegmentContent(String tableNameWithType,
IndexSegment segment)
throws IOException {
@@ -250,6 +275,31 @@ public class TablesResourceTest extends BaseResourceTest {
FileUtils.forceDelete(tempMetadataDir);
}
+  // Enables upsert on the given segment with a fixed set of valid doc ids, downloads the validDocIds snapshot through
+  // the REST endpoint, and verifies that the downloaded bytes deserialize to the same bitmap.
+  private void downLoadAndVerifyValidDocIdsSnapshot(String tableNameWithType, ImmutableSegmentImpl segment)
+      throws IOException {
+    // Attach a fixed set of valid doc ids to the segment.
+    ThreadSafeMutableRoaringBitmap expectedValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId : new int[]{1, 4, 6, 10, 15, 17, 18, 20}) {
+      expectedValidDocIds.add(docId);
+    }
+    PartitionUpsertMetadataManager mockedUpsertManager = mock(PartitionUpsertMetadataManager.class);
+    segment.enableUpsert(mockedUpsertManager, expectedValidDocIds);
+
+    // Fetch the snapshot as raw bytes.
+    String requestPath = "/segments/" + tableNameWithType + "/" + segment.getSegmentName() + "/validDocIds";
+    Response downloadResponse = _webTarget.path(requestPath).request().get(Response.class);
+    Assert.assertEquals(downloadResponse.getStatus(), Response.Status.OK.getStatusCode());
+    byte[] snapshotBytes = downloadResponse.readEntity(byte[].class);
+    Assert.assertNotNull(snapshotBytes);
+
+    // Deserialize the bytes and compare against the bitmap attached to the segment.
+    ImmutableRoaringBitmap downloadedBitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshotBytes));
+    Assert.assertEquals(downloadedBitmap.toMutableRoaringBitmap(), expectedValidDocIds.getMutableRoaringBitmap());
+  }
+
@Test
public void testUploadSegments()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]