Re: [I] Improve error handling and reporting in query side [pinot]
gortiz commented on issue #14950: URL: https://github.com/apache/pinot/issues/14950#issuecomment-2636395829

Please take a look at https://github.com/apache/pinot/pull/14994

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Segments are lost [pinot]
xiangfu0 commented on issue #14993: URL: https://github.com/apache/pinot/issues/14993#issuecomment-2636378944

Can you share the table config and the controller logs for the retention manager? Is it possible the data was deleted due to the retention policy?
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942636179

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java

```diff
@@ -104,59 +104,61 @@ public class PinotQueryResource {
   @POST
   @Path("sql")
   @ManualAuthorization // performed by broker
-  public String handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
+  public StreamingOutput handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
+    JsonNode requestJson;
     try {
-      JsonNode requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
-      if (!requestJson.has("sql")) {
-        return constructQueryExceptionResponse(QueryException.getException(QueryException.JSON_PARSING_ERROR,
-            "JSON Payload is missing the query string field 'sql'"));
-      }
-      String sqlQuery = requestJson.get("sql").asText();
-      String traceEnabled = "false";
-      if (requestJson.has("trace")) {
-        traceEnabled = requestJson.get("trace").toString();
-      }
-      String queryOptions = null;
-      if (requestJson.has("queryOptions")) {
-        queryOptions = requestJson.get("queryOptions").asText();
-      }
-      LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, "/sql");
-    } catch (ProcessingException pe) {
-      LOGGER.error("Caught exception while processing post request {}", pe.getMessage());
-      return constructQueryExceptionResponse(pe);
-    } catch (WebApplicationException wae) {
-      LOGGER.error("Caught exception while processing post request", wae);
-      throw wae;
+      requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
     } catch (Exception e) {
-      LOGGER.error("Caught exception while processing post request", e);
-      return constructQueryExceptionResponse(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+      return constructQueryExceptionResponse(QueryException.JSON_PARSING_ERROR_CODE, e.getMessage());
+    }
+    if (!requestJson.has("sql")) {
+      return constructQueryExceptionResponse(QueryException.getException(QueryException.JSON_PARSING_ERROR,
+          "JSON Payload is missing the query string field 'sql'"));
+    }
+    String sqlQuery = requestJson.get("sql").asText();
+    String traceEnabled = "false";
+    if (requestJson.has("trace")) {
+      traceEnabled = requestJson.get("trace").toString();
+    }
+    String queryOptions = null;
+    if (requestJson.has("queryOptions")) {
+      queryOptions = requestJson.get("queryOptions").asText();
     }
+
+    return executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
   }

   @GET
   @Path("sql")
   @ManualAuthorization
-  public String handleGetSql(@QueryParam("sql") String sqlQuery, @QueryParam("trace") String traceEnabled,
+  public StreamingOutput handleGetSql(@QueryParam("sql") String sqlQuery, @QueryParam("trace") String traceEnabled,
       @QueryParam("queryOptions") String queryOptions, @Context HttpHeaders httpHeaders) {
+    return executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
+  }
+
+  private StreamingOutput executeSqlQueryCatching(HttpHeaders httpHeaders, String sqlQuery, String traceEnabled,
+      String queryOptions) {
     try {
-      LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, "/sql");
+      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions);
+    } catch (QException e) {
+      LOGGER.error("Caught query exception while processing post request", e);
```

Review Comment: Good catch. Same in the others. I've changed these messages in https://github.com/apache/pinot/pull/14994
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942622733

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java

```diff
@@ -398,18 +414,13 @@ private String getQueryURL(String protocol, String hostName, int port) {
     return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
   }

-  public String sendPostRaw(String urlStr, String requestStr, Map headers) {
+  public void sendPostRaw(String urlStr, String requestStr, Map headers, OutputStream outputStream) {
     HttpURLConnection conn = null;
     try {
-      /*if (LOG.isInfoEnabled()){
+      if (LOGGER.isInfoEnabled()) {
         LOGGER.info("Sending a post request to the server - " + urlStr);
```

Review Comment: I just uncommented this log. I decided to remove it in https://github.com/apache/pinot/pull/14994
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942634833

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java

```diff
@@ -210,51 +217,62 @@ private String getMultiStageQueryResponse(String query, String queryOptions, Htt
       queryOptionsMap.putAll(RequestUtils.getOptionsFromString(queryOptions));
     }
     String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptionsMap, httpHeaders);
+    List tableNames = getTableNames(query, database);
+    List instanceIds = getInstanceIds(query, tableNames, database);
+    String instanceId = selectRandomInstanceId(instanceIds);
+    return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, httpHeaders);
+  }
+
+  private List getTableNames(String query, String database) {
     QueryEnvironment queryEnvironment = new QueryEnvironment(database, _pinotHelixResourceManager.getTableCache(), null);
     List tableNames;
     try {
       tableNames = queryEnvironment.getTableNamesForQuery(query);
+    } catch (QException e) {
+      if (e.getErrorCode() != QueryException.UNKNOWN_ERROR_CODE) {
+        throw e;
+      } else {
+        throw new QException(QException.SQL_PARSING_ERROR_CODE, e);
+      }
     } catch (Exception e) {
-      return QueryException.getException(QueryException.SQL_PARSING_ERROR,
-          new Exception("Unable to find table for this query", e)).toString();
+      throw new QException(QException.SQL_PARSING_ERROR_CODE, e);
```

Review Comment: Because we assume other error types will be more precise. The error codes being used are pretty chaotic; it looks like we added them without much consideration, and the fact that we cannot create hierarchies is not a good design. For example, is an UNKNOWN_COLUMN_ERROR_CODE a QUERY_VALIDATION_ERROR_CODE?
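The hierarchy question raised in that comment can be made concrete with a small sketch. Everything below is hypothetical — Pinot's error codes are flat integers, and these names, values, and the `parent()`/`isA()` methods are illustrative only:

```java
// Hypothetical sketch: error codes with a parent category, so a specific
// code can be tested as a kind of broader one. Not Pinot's actual design.
public enum QueryErrorCode {
  QUERY_VALIDATION(700),
  UNKNOWN_COLUMN(710),
  SQL_PARSING(720);

  private final int _code;

  QueryErrorCode(int code) {
    _code = code;
  }

  public int code() {
    return _code;
  }

  // Parent category of this code, or null for a root category.
  public QueryErrorCode parent() {
    // With a hierarchy, "unknown column" is declared a validation error.
    return this == UNKNOWN_COLUMN ? QUERY_VALIDATION : null;
  }

  // True if this code is the given category or is nested under it.
  public boolean isA(QueryErrorCode category) {
    for (QueryErrorCode c = this; c != null; c = c.parent()) {
      if (c == category) {
        return true;
      }
    }
    return false;
  }
}
```

With something like this, a handler that catches validation errors would match `UNKNOWN_COLUMN` automatically, instead of every call site having to pick one flat code.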
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942625995

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java

```diff
@@ -398,18 +414,13 @@ private String getQueryURL(String protocol, String hostName, int port) {
     return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
   }

-  public String sendPostRaw(String urlStr, String requestStr, Map headers) {
+  public void sendPostRaw(String urlStr, String requestStr, Map headers, OutputStream outputStream) {
     HttpURLConnection conn = null;
     try {
-      /*if (LOG.isInfoEnabled()){
+      if (LOGGER.isInfoEnabled()) {
         LOGGER.info("Sending a post request to the server - " + urlStr);
```

Review Comment: Probably not. In general, this longer way to log is less expensive in terms of allocation than using the default:

```
LOGGER.info("some message with {} and {}", param1, param2)
```

given that the default uses varargs, which means it allocates an array for no reason if INFO is disabled.
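A side note on the allocation argument above: SLF4J actually provides dedicated one- and two-argument `info` overloads, so the varargs array only appears with three or more parameters (primitive arguments are boxed either way). The guard pattern itself can be demonstrated with the JDK's own logger. This is an illustrative sketch, not Pinot code; `logGuarded` returns whether it logged purely so the behavior is observable:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public final class GuardedLogging {
  private static final Logger LOGGER = Logger.getLogger("GuardedLogging");

  // Parameterized form: the Object[] (and the boxed Integer) is allocated
  // even when INFO is disabled, because the arguments are built before the
  // logger's internal level check runs.
  static void logUnguarded(String url, int attempt) {
    LOGGER.log(Level.INFO, "Sending a post request to {0} (attempt {1})",
        new Object[] {url, attempt});
  }

  // Guarded form: neither the message string nor any argument array is
  // built when INFO is disabled. Returns true iff the message was logged.
  static boolean logGuarded(String url, int attempt) {
    if (LOGGER.isLoggable(Level.INFO)) {
      LOGGER.info("Sending a post request to " + url + " (attempt " + attempt + ")");
      return true;
    }
    return false;
  }
}
```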
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942618169

## pinot-spi/src/main/java/org/apache/pinot/spi/exception/BadQueryRequestException.java

```diff
@@ -18,16 +18,28 @@
  */
 package org.apache.pinot.spi.exception;

-public class BadQueryRequestException extends RuntimeException {
+public class BadQueryRequestException extends QException {

   public BadQueryRequestException(String message) {
-    super(message);
+    super(SQL_RUNTIME_ERROR_CODE, message);
```

Review Comment: This class is used in tons of places to detect errors at runtime. There are places where it is caught and converted into a different error type depending on the context where it was thrown, for example BaseSingleBlockCombineOperator. Here, I'm assigning a default error code, which, in general, is difficult to do precisely. Callers can still pass an explicit error code.
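The default-plus-explicit pattern described in that comment can be sketched as follows. `QueryErrorException` and the numeric value 230 are hypothetical stand-ins, not Pinot's actual `QException`/`BadQueryRequestException` classes:

```java
// Hypothetical sketch of an exception carrying an error code. The
// one-argument constructor falls back to a default code; callers with
// more context can pass a precise code explicitly. 230 is illustrative.
public class QueryErrorException extends RuntimeException {
  public static final int SQL_RUNTIME_ERROR_CODE = 230;

  private final int _errorCode;

  public QueryErrorException(String message) {
    this(SQL_RUNTIME_ERROR_CODE, message);
  }

  public QueryErrorException(int errorCode, String message) {
    super(message);
    _errorCode = errorCode;
  }

  public int getErrorCode() {
    return _errorCode;
  }
}
```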
Re: [I] Improve error handling and reporting in query side [pinot]
vrajat commented on issue #14950: URL: https://github.com/apache/pinot/issues/14950#issuecomment-2636333616

> MSE errors include the stack trace and don't include the actual error code. Instead, error code 200 is always returned.

This is true for SSE as well. For example:

```json
"exceptions": [
  {
    "message": "QueryExecutionError:\nQuery execution error on: Server_100.81.71.117_7050 org.apache.pinot.spi.exception.EarlyTerminationException: Interrupted while processing next block",
    "errorCode": 200
  },
```
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942610938

## pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java

```diff
@@ -110,6 +110,10 @@ public BrokerResponseNative(ProcessingException exception) {
     _exceptions.add(new QueryProcessingException(exception.getErrorCode(), exception.getMessage()));
   }

+  public BrokerResponseNative(int errorCode, String errorMessage) {
+    _exceptions.add(new QueryProcessingException(errorCode, errorMessage));
```

Review Comment: I've added a TODO to rename this class at https://github.com/apache/pinot/pull/14994. That PR is already too long, so I preferred not to rename it and make it even longer.
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on code in PR #14951: URL: https://github.com/apache/pinot/pull/14951#discussion_r1942606315

## pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java

```diff
@@ -23,6 +23,7 @@
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.spi.exception.QException;
```

Review Comment: Ideally, this class should end up being removed.
Re: [PR] Improve error handling in controller [pinot]
gortiz commented on PR #14951: URL: https://github.com/apache/pinot/pull/14951#issuecomment-2636314940

@yashmayya I've created https://github.com/apache/pinot/pull/14994, which is built on top of this PR and fixes most of the issues reported here.
[PR] Improve exceptions broker [pinot]
gortiz opened a new pull request, #14994: URL: https://github.com/apache/pinot/pull/14994

This extensive PR focuses on improving error handling in the MSE, and partially in the SSE and TSE. Error handling includes changes to the error responses sent to clients, but also to the logs on servers, brokers, and controllers. Logging has been reduced to avoid printing stack traces when they are not useful. Logging is also improved to include the request id in MSE and SSE queries and the stage id in MSE queries.

Instead of spreading the query context to all places where logs are created, this PR injects these values using [MDC](https://www.baeldung.com/mdc-in-log4j-2-logback). This means that:

1. Code doesn't have to change that much.
2. However, to modify the MDC in some parts, I needed to add a try-catch. I recommend reviewing the PR with whitespace changes hidden.
3. To log these properties, log4j2.xml needs to be changed. This PR includes a change to the log4j2.xml used for quickstarts. Actual deployments will likely use their own log4j2.xml files, and those will need to be modified to log request ids and stage ids. Please remember that this is optional: logs are still as helpful as before, even if the MDC properties are unused. Specifically, logs that already included the request id or stage id haven't been changed.
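MDC is essentially a per-thread key/value map that the logging layout can read (e.g. a `%X{requestId}` conversion pattern in log4j2.xml). The class below is a simplified stand-in for `org.slf4j.MDC`, written only to illustrate the try/finally shape the PR describes; it is not Pinot code:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in illustrating what SLF4J's MDC does: a per-thread
// key/value map that log layouts (e.g. %X{requestId} in log4j2.xml) can read.
public final class MiniMdc {
  private static final ThreadLocal<Map<String, String>> CONTEXT =
      ThreadLocal.withInitial(HashMap::new);

  public static void put(String key, String value) {
    CONTEXT.get().put(key, value);
  }

  public static String get(String key) {
    return CONTEXT.get().get(key);
  }

  public static void remove(String key) {
    CONTEXT.get().remove(key);
  }

  // Mirrors the shape the PR relies on: set the query context before
  // executing a stage, and always clear it afterwards, so any log line
  // emitted inside the stage can pick up the request id.
  public static String runWithRequestId(String requestId, Runnable stage) {
    put("requestId", requestId);
    try {
      stage.run();
      return get("requestId");
    } finally {
      remove("requestId");
    }
  }
}
```

The point of the try/finally is that the context never leaks to the next query handled by the same thread, which is why some call sites in the PR had to gain a wrapping block.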
Re: [PR] add option to delete table and schema immediately [pinot]
jayeshchoudhary commented on PR #14736: URL: https://github.com/apache/pinot/pull/14736#issuecomment-2635984874

I tried checking whether this can be managed by state, but I can't figure out the root cause. The PR is good to go. Sorry for taking so long; I missed the notifications 😓
Re: [PR] Add debug API for pauseless tables. Add pauselessFSM as an option in realtimeQuickStart [pinot]
9aman commented on code in PR #14961: URL: https://github.com/apache/pinot/pull/14961#discussion_r1942279690

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java

```diff
@@ -301,6 +309,74 @@ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
     }
   }

+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and optionally gets COMMITTING segments based on the "
+          + "includeCommittingSegments parameter")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | "
+          + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName,
+      @ApiParam(value = "Flag to include committing segment info") @QueryParam("includeCommittingSegments")
+      @DefaultValue("false") boolean includeCommittingSegments,
+      @Context HttpHeaders headers) {
+    realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers);
+    try {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
```

Review Comment: Yes, the objective here was to allow the user to get ERROR segments for all tables. Additionally, they can choose to fetch the COMMITTING segments for pauseless tables by setting the flag to true.
Re: [PR] Add debug API for pauseless tables. Add pauselessFSM as an option in realtimeQuickStart [pinot]
9aman commented on code in PR #14961: URL: https://github.com/apache/pinot/pull/14961#discussion_r1942278607

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java

```diff
@@ -301,6 +309,74 @@ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
     }
   }

+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and optionally gets COMMITTING segments based on the "
+          + "includeCommittingSegments parameter")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(
```

Review Comment: I don't expect this to get extended. Moreover, this is not called internally by the controller or server, so I thought it's fine not to introduce another class.
Re: [I] Segments are lost [pinot]
raghukn commented on issue #14993: URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635744569

@xiangfu0 @mayankshriv -- Please help me triage.
Re: [I] Segments are lost [pinot]
raghukn commented on issue #14993: URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635743223

[server0-log.log](https://github.com/user-attachments/files/18667128/server0-log.log) clearly shows segments all the way up to 159 getting created.
Re: [I] Segments are lost [pinot]
raghukn commented on issue #14993: URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635742116

[server0-log.log](https://github.com/user-attachments/files/18667128/server0-log.log)

Screenshots:
- https://github.com/user-attachments/assets/d3a416e0-ebf5-4452-a5ca-f29874d3eb86
- https://github.com/user-attachments/assets/da2e7188-5cf9-4fca-9096-82e1af5bcd9e
Re: [PR] Adding perf benchmark logic for GroupIdGenerator hash map [pinot]
codecov-commenter commented on PR #14992: URL: https://github.com/apache/pinot/pull/14992#issuecomment-2635740181 ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14992?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Project coverage is 63.55%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`6e70a53`)](https://app.codecov.io/gh/apache/pinot/commit/6e70a53bfddc4c09e2e8b7a8851c9745173b526f?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 1673 commits behind head on master. Additional details and impacted files
```diff
@@             Coverage Diff              @@
##             master   #14992      +/-   ##
============================================
+ Coverage     61.75%   63.55%    +1.80%
- Complexity      207     1388     +1181
  Files          2436     2713      +277
  Lines        133233   152169    +18936
  Branches      20636    23526     +2890
+ Hits          82274    96718    +14444
- Misses        44911    48151     +3240
- Partials       6048     7300     +1252
```
| [Flag](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.93%)` :arrow_up: | |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.01% <ø> (-5.73%)` :arrow_down: | |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.54% <ø> (+35.82%)` :arrow_up: | |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.80%)` :arrow_up: | |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.81%)` :arrow_up: | |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.06% <ø> (+9.17%)` :arrow_up: | |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.01% <ø> (+6.28%)` :arrow_up: | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/14992?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source
[I] Segments are lost [pinot]
raghukn opened a new issue, #14993: URL: https://github.com/apache/pinot/issues/14993 Hi, I was ingesting data from a Kafka topic (679GB) yesterday and had seen 16M rows in my table by EOD. But when I checked today, I was surprised to find only 14 segments and a total of only 4.5M rows; I am not sure how the segments were lost. Attached is server 0's log file, which seems to show around 159 segments created, each of 50 rows. I was expecting 79M rows in the table by now (given that the row limit of my segments is 50 each). Can I know why the segments are missing?
Re: [PR] Calculate size based flush threshold per topic [pinot]
chenboat commented on PR #14765: URL: https://github.com/apache/pinot/pull/14765#issuecomment-2635674904 @Jackie-Jiang Do you still have concerns here? This PR looks good to me, even without extensive testing in production.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1942252443 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.sl
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1942248666 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { + + private static final int DEFAULT_CAPACITY = 100_000; + private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000; + public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + + private final String _segmentName; + private final String _tableNameWithType; + private final int _partitionGroupId; + private final String _segmentNameStr; + private final SegmentZKMetadata _segmentZKMetadata; + private final TableConfig _tableConfig; + private final Schema _schema; + private final StreamConfi
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1942241681

## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ##

@@ -1274,6 +1275,47 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi
        httpHeaders, maxStreamRateInByte);
  }

+  /**
+   * Invokes the server's reIngestSegment API via a POST request with JSON payload,
+   * using Simple HTTP APIs.
+   *
+   * POST http://[serverURL]/reIngestSegment
+   * {
+   *   "tableNameWithType": [tableName],
+   *   "segmentName": [segmentName]
+   * }
+   */
+  public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName)
+      throws IOException, URISyntaxException, HttpErrorStatusException {
+    String scheme = HTTP;
+    if (serverHostPort.contains(HTTPS)) {
+      scheme = HTTPS;
+      serverHostPort = serverHostPort.replace(HTTPS + "://", "");
+    } else if (serverHostPort.contains(HTTP)) {
+      serverHostPort = serverHostPort.replace(HTTP + "://", "");
+    }
+
+    String serverHost = serverHostPort.split(":")[0];
+    String serverPort = serverHostPort.split(":")[1];
+
+    URI reIngestUri = getURI(scheme, serverHost, Integer.parseInt(serverPort), REINGEST_SEGMENT_PATH);
+
+    Map<String, String> requestJson = new HashMap<>();
+    requestJson.put("tableNameWithType", tableNameWithType);
+    requestJson.put("segmentName", segmentName);
+
+    String jsonPayload = JsonUtils.objectToString(requestJson);
+    SimpleHttpResponse response =
+        HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(reIngestUri, jsonPayload));
+
+    // Check that we got a 2xx response
+    int statusCode = response.getStatusCode();
+    if (statusCode / 100 != 2) {
+      throw new IOException(String.format("Failed POST to %s, HTTP %d: %s",

Review Comment: Was the code updated here? I see that the comment has been resolved but no changes in the code.
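The quoted `triggerReIngestion` method detects the scheme with `String.contains` and then strips the prefix by string replacement. As a hedged alternative sketch (not the PR's code; the class name and helper are hypothetical), `java.net.URI` can derive scheme, host, and port in one pass:

```java
import java.net.URI;

public class ServerAddressParser {
  // Parse "http://host:port", "https://host:port", or a bare "host:port"
  // (defaulting to http) into {scheme, host, port}. Illustrative helper only,
  // not part of FileUploadDownloadClient.
  public static String[] parse(String serverHostPort) {
    String withScheme = serverHostPort.contains("://") ? serverHostPort : "http://" + serverHostPort;
    URI uri = URI.create(withScheme);
    return new String[]{uri.getScheme(), uri.getHost(), String.valueOf(uri.getPort())};
  }

  public static void main(String[] args) {
    String[] parts = parse("https://server-1:8097");
    System.out.println(parts[0] + " " + parts[1] + " " + parts[2]);  // https server-1 8097
  }
}
```

This avoids the subtlety that `"http"` is a prefix of `"https"`, so the branch order in a `contains`-based check matters.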
[PR] Adding perf benchmark logic for GroupIdGenerator hash map [pinot]
shauryachats opened a new pull request, #14992: URL: https://github.com/apache/pinot/pull/14992 This PR adds performance benchmarking logic to identify and measure the improvement of different strategies for hash map selection, to make a data-driven choice on the hash map used to power `GroupIdGenerator`, as described in issue https://github.com/apache/pinot/issues/14685. The results of this benchmark are:
```
Benchmark                                                 (_cardinality)  Mode  Cnt     Score     Error  Units
BenchmarkObjectOpenHashMap.object2IntOpenHashMap                      50  avgt   20   111.262 ±   5.448  ms/op
BenchmarkObjectOpenHashMap.object2IntOpenHashMap                     100  avgt   20   299.255 ±   9.814  ms/op
BenchmarkObjectOpenHashMap.object2IntOpenHashMap                     500  avgt   20  1859.503 ±  58.990  ms/op
BenchmarkObjectOpenHashMap.object2IntOpenHashMap                    2000  avgt   20  8236.525 ± 170.751  ms/op
BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap              50  avgt   20    79.908 ±   4.715  ms/op
BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap             100  avgt   20   180.827 ±  19.987  ms/op
BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap             500  avgt   20  1051.368 ±  49.204  ms/op
BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap            2000  avgt   20  3340.668 ± 106.874  ms/op
BenchmarkObjectOpenHashMap.vanillaHashMap                             50  avgt   20   109.589 ±   2.836  ms/op
BenchmarkObjectOpenHashMap.vanillaHashMap                            100  avgt   20   265.262 ±   5.215  ms/op
BenchmarkObjectOpenHashMap.vanillaHashMap                            500  avgt   20  1556.399 ±  49.787  ms/op
BenchmarkObjectOpenHashMap.vanillaHashMap                           2000  avgt   20  6757.234 ± 314.138  ms/op
BenchmarkObjectOpenHashMap.vanillaReservedHashMap                     50  avgt   20    98.798 ±   4.344  ms/op
BenchmarkObjectOpenHashMap.vanillaReservedHashMap                    100  avgt   20   228.480 ±   6.570  ms/op
BenchmarkObjectOpenHashMap.vanillaReservedHashMap                    500  avgt   20  1067.580 ±  48.764  ms/op
BenchmarkObjectOpenHashMap.vanillaReservedHashMap                   2000  avgt   20  4725.897 ± 284.449  ms/op
```
which shows that the reserved hash map performs ~2.5x better than the current unreserved hash map, and which led to the following PR:
https://github.com/apache/pinot/pull/14981.
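The "reserved" variants in the results above pre-size the map so inserts avoid rehashing. A minimal sketch of the idea, assuming a known expected group count (this is illustrative, not the benchmark's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class ReservedMapSketch {
  // Pre-size a HashMap so `expectedSize` entries fit without a rehash:
  // the initial capacity must exceed expectedSize / loadFactor
  // (default load factor is 0.75).
  public static <K, V> Map<K, V> reserved(int expectedSize) {
    return new HashMap<>((int) (expectedSize / 0.75f) + 1);
  }

  public static void main(String[] args) {
    Map<String, Integer> groupIds = reserved(1_000);
    for (int i = 0; i < 1_000; i++) {
      groupIds.putIfAbsent("group-" + i, groupIds.size());  // mimic group-id assignment
    }
    System.out.println(groupIds.size());  // 1000
  }
}
```

The speedup in the benchmark comes from skipping the repeated table doublings (and entry re-insertion) that an unreserved map performs while growing to high cardinality.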
Re: [PR] Bump com.google.cloud:libraries-bom from 26.53.0 to 26.54.0 [pinot]
Jackie-Jiang merged PR #14988: URL: https://github.com/apache/pinot/pull/14988
Re: [PR] Bump software.amazon.awssdk:bom from 2.30.11 to 2.30.12 [pinot]
Jackie-Jiang merged PR #14987: URL: https://github.com/apache/pinot/pull/14987
Re: [PR] Bump org.checkerframework:checker-qual from 3.48.4 to 3.49.0 [pinot]
Jackie-Jiang merged PR #14985: URL: https://github.com/apache/pinot/pull/14985
Re: [PR] Bump joda-time:joda-time from 2.13.0 to 2.13.1 [pinot]
Jackie-Jiang merged PR #14986: URL: https://github.com/apache/pinot/pull/14986
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
Jackie-Jiang commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1942150405

## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ##

@@ -389,15 +390,21 @@ protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager,
      IndexLoadingConfig indexLoadingConfig)
      throws Exception {
    String segmentName = segmentDataManager.getSegmentName();
-   Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
-       "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);
-   SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
-   if (hasSameCRC(zkMetadata, localMetadata)) {
-     _logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc());
-     return;
+   TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+   // For pauseless tables, we should replace the segment if download url is missing even if crc is same
+   // Without this the reingestion of ERROR segments in pauseless tables fails
+   // as the segment data manager is still an instance of RealtimeSegmentDataManager
+   if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {

Review Comment: We probably need a new API to handle re-ingested segments; it is not a regular segment push. With a new API we can also set the status to `DONE`, and then trigger the reset from the API.
Re: [PR] Set CONSUMING state priority for helix state model [pinot]
somandal commented on code in PR #14991: URL: https://github.com/apache/pinot/pull/14991#discussion_r1942082673

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java: ##

@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";

+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment: what do you mean by buffer though?
Re: [PR] Set CONSUMING state priority for helix state model [pinot]
xiangfu0 commented on code in PR #14991: URL: https://github.com/apache/pinot/pull/14991#discussion_r1942060957

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java: ##

@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";

+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment: just give enough buffer between each state.
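In Helix state models, a larger transition-priority number means lower priority, so giving OFFLINE->CONSUMING the value 1100 (versus the 1000 default) schedules it after the other transitions during server startup. A small ordering sketch under that assumption (the constants come from the quoted hunk; the scheduling logic here is illustrative, not Helix's actual code):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TransitionPrioritySketch {
  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

  record Transition(String name, int priority) {}

  // Order transitions the way a priority-based scheduler would: lower number first.
  public static List<String> schedule(List<Transition> transitions) {
    List<Transition> sorted = new ArrayList<>(transitions);
    sorted.sort(Comparator.comparingInt(Transition::priority));
    return sorted.stream().map(Transition::name).toList();
  }

  public static void main(String[] args) {
    List<String> order = schedule(List.of(
        new Transition("OFFLINE->CONSUMING", OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY),
        new Transition("OFFLINE->ONLINE", DEFAULT_TRANSITION_PRIORITY)));
    System.out.println(order);  // [OFFLINE->ONLINE, OFFLINE->CONSUMING]
  }
}
```

The "buffer" question in the thread is about the gap between 1000 and 1100: spacing the values out leaves room to slot new transitions between existing priorities later without renumbering.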
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
shauryachats commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941950925

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java: ##

@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
    _aggType = aggType;
    _leafReturnFinalResult = leafReturnFinalResult;
    _resultSchema = resultSchema;
    int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   Integer mseCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

Review Comment: Addressed.

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java: ##

@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
    _aggType = aggType;
    _leafReturnFinalResult = leafReturnFinalResult;
    _resultSchema = resultSchema;
    int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   Integer mseCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   if (mseCapacity != null) {
+     maxInitialResultHolderCapacity = mseCapacity;

Review Comment: Addressed.
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
shauryachats commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941951138

## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ##

@@ -756,6 +757,8 @@ public static class Server {
        "pinot.server.query.executor.group.trim.size";
    public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
        "pinot.server.query.executor.max.init.group.holder.capacity";
+   public static final String CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =

Review Comment: Addressed.
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941904398

## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ##

@@ -756,6 +757,8 @@ public static class Server {
        "pinot.server.query.executor.group.trim.size";
    public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
        "pinot.server.query.executor.max.init.group.holder.capacity";
+   public static final String CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =

Review Comment: Remove `QUERY_EXECUTOR` from the var name too
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941911832

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java: ##

@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
    _aggType = aggType;
    _leafReturnFinalResult = leafReturnFinalResult;
    _resultSchema = resultSchema;
    int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   Integer mseCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

Review Comment: nit: naming could be made slightly more precise (e.g. `mseMaxInitialResultHolderCapacity`). Also you could move this logic to a separate method in the class to keep this clean.
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941907710

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java: ##

@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
    _aggType = aggType;
    _leafReturnFinalResult = leafReturnFinalResult;
    _resultSchema = resultSchema;
    int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   Integer mseCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+   if (mseCapacity != null) {
+     maxInitialResultHolderCapacity = mseCapacity;

Review Comment: I guess the behavior we are implementing is:
1. By default use the previous behavior
2. If a user has explicitly set a hint for the mse initial capacity, or set a server config, then use that.

Can you also call it out in the Issue Description? We'll have to update Pinot Docs too later.
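The fallback behavior discussed in this review thread (keep the previous default unless an MSE-specific hint or server config is set) can be sketched as follows; the class, method, and default value here are illustrative, not the PR's exact code:

```java
public class CapacityResolutionSketch {
  static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;  // hypothetical default

  // Resolve the initial result-holder capacity: the MSE-specific override wins
  // when present (non-null); otherwise the previous behavior is kept.
  public static int resolve(Integer mseMaxInitialResultHolderCapacity, int maxInitialResultHolderCapacity) {
    return mseMaxInitialResultHolderCapacity != null
        ? mseMaxInitialResultHolderCapacity
        : maxInitialResultHolderCapacity;
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY));    // 10000
    System.out.println(resolve(50_000, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY));  // 50000
  }
}
```

Using a nullable `Integer` for the override keeps "not configured" distinct from any legal capacity value, which is what makes the default-preserving behavior possible.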
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941903211

## pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java:
@@ -62,6 +62,7 @@ public static class AggregateOptions {
     public static final String GROUP_TRIM_SIZE = "group_trim_size";
     public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max_initial_result_holder_capacity";
+    public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mse_max_initial_result_holder_capacity";

Review Comment: @Jackie-Jiang: do you have any recommendation for the hint name?
Re: [PR] Set CONSUMING state priority for helix state model [pinot]
codecov-commenter commented on PR #14991: URL: https://github.com/apache/pinot/pull/14991#issuecomment-2635078552

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14991?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

All modified and coverable lines are covered by tests :white_check_mark:

> Project coverage is 63.59%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`819a39d`)](https://app.codecov.io/gh/apache/pinot/commit/819a39d38958fd0091000d0adbd29408009ed2d5?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 1669 commits behind head on master.

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #14991      +/-   ##
============================================
+ Coverage     61.75%   63.59%   +1.84%
- Complexity      207     1391    +1184
============================================
  Files          2436     2713     +277
  Lines        133233   152128   +18895
  Branches      20636    23519    +2883
============================================
+ Hits          82274    96744   +14470
- Misses        44911    48085    +3174
- Partials       6048     7299    +1251
```

| [Flag](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.59% <100.00%> (+1.96%)` | :arrow_up: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.58% <100.00%> (+1.84%)` | :arrow_up: |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.04% <ø> (+28.31%)` | :arrow_up: |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.59% <100.00%> (+1.84%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.59% <100.00%> (+1.84%)` | :arrow_up: |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.10% <ø> (+9.21%)` | :arrow_up: |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.02% <100.00%> (+6.28%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/14991?dropdown=coverage&src=pr&el=continue&ut
Re: [PR] Set CONSUMING state priority for helix state model [pinot]
somandal commented on code in PR #14991: URL: https://github.com/apache/pinot/pull/14991#discussion_r1941854342

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java:
@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";
+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment: Why such high values? I guess you can make the default 1 and offline-to-consuming 2?
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
shauryachats commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941846709

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java:
@@ -25,24 +25,27 @@ public class GroupIdGeneratorFactory {
   private GroupIdGeneratorFactory() {
   }
-  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns,
+      int numGroupsLimit, int maxInitialResultHolderCapacity) {
+    // Initial capacity is one more than expected to avoid rehashing if container is full.
+    int initialCapacity = 1 + Math.min(maxInitialResultHolderCapacity, numGroupsLimit);

Review Comment: Agreed, updated.

## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
@@ -756,6 +757,8 @@ public static class Server {
         "pinot.server.query.executor.group.trim.size";
     public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
         "pinot.server.query.executor.max.init.group.holder.capacity";
+    public static final String CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
+        "pinot.server.query.executor.mse.max.init.group.holder.capacity";

Review Comment: Good point, updated.
[PR] Set CONSUMING state priority for helix state model [pinot]
xiangfu0 opened a new pull request, #14991: URL: https://github.com/apache/pinot/pull/14991

Set CONSUMING state priority for helix state model: here we make the ONLINE state a higher priority than CONSUMING.

Why: In normal operations this won't make any difference. In a large-table restart scenario (e.g. when the server restart or pre-processing takes a long time, say 1 hour), if the consuming segments come up early, the Pinot server spends extra CPU cycles on them. Instead, the bootstrapping/restarting server should focus on the pre-processing for those ONLINE segment transitions, and bring back the CONSUMING state last.

Another thing worth mentioning: during the bootstrap, the running replica could already have consumed data and committed multiple segments, which allows the bootstrapping replica to bypass CONSUMING for those segments and download them directly from the deep store.
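The intent of the priority constants in this PR can be illustrated with a small sketch. Assuming, as the constants and the review discussion suggest, that pending transitions with a lower priority value are handled earlier, a restarting server works through its ONLINE transitions before starting any OFFLINE->CONSUMING transition. This is only a sketch of the ordering, not Helix internals:

```java
// Illustrative sketch (not Helix code): transitions with a lower priority
// value are assumed to be handled first, so OFFLINE->CONSUMING (1100) runs
// after the default-priority (1000) transitions during startup.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TransitionOrderingSketch {
    // Priority values mirroring the PR's constants.
    static final int DEFAULT_TRANSITION_PRIORITY = 1000;
    static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

    static List<String> order(Map<String, Integer> pendingTransitions) {
        List<String> ordered = new ArrayList<>(pendingTransitions.keySet());
        // Ascending priority value == handled earlier (assumption stated above).
        ordered.sort(Comparator.comparingInt(pendingTransitions::get));
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = Map.of(
            "OFFLINE->CONSUMING", OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY,
            "OFFLINE->ONLINE", DEFAULT_TRANSITION_PRIORITY);
        // ONLINE segments get pre-processed first; consuming starts last.
        System.out.println(order(pending)); // [OFFLINE->ONLINE, OFFLINE->CONSUMING]
    }
}
```

Under this reading, somandal's suggestion of 1 and 2 would produce the same relative ordering; only the gap between the values differs.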
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941785723

## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java:
@@ -25,24 +25,27 @@ public class GroupIdGeneratorFactory {
   private GroupIdGeneratorFactory() {
   }
-  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns,
+      int numGroupsLimit, int maxInitialResultHolderCapacity) {
+    // Initial capacity is one more than expected to avoid rehashing if container is full.
+    int initialCapacity = 1 + Math.min(maxInitialResultHolderCapacity, numGroupsLimit);

Review Comment: Hash table resize would happen based on load factor though, right? Adding 1 here might not do much. (The default is usually 0.75.)
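To make the load-factor point concrete: in hash maps with `java.util.HashMap`-style semantics, a resize is triggered once the entry count exceeds capacity × loadFactor, so reserving `expected + 1` slots does not prevent rehashing; roughly `expected / loadFactor` slots are needed. A minimal sketch of the sizing arithmetic (not the map implementation Pinot actually uses):

```java
// Sketch of why "+1" doesn't avoid rehashing: with load factor 0.75, a map
// sized expected+1 still resizes long before it holds `expected` entries.
public class LoadFactorSizing {
    // Smallest capacity that holds `expected` entries without a resize,
    // given that a resize fires when size > capacity * loadFactor.
    static int noRehashCapacity(int expected, double loadFactor) {
        return (int) Math.ceil(expected / loadFactor);
    }

    public static void main(String[] args) {
        int expected = 10_000;
        double loadFactor = 0.75; // common default (e.g. java.util.HashMap)
        System.out.println(expected + 1);                           // 10001 -- still rehashes
        System.out.println(noRehashCapacity(expected, loadFactor)); // 13334
        // Note: real hash maps may additionally round capacity up to a power of two.
    }
}
```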
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
ankitsultana commented on code in PR #14981: URL: https://github.com/apache/pinot/pull/14981#discussion_r1941785723

## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
@@ -756,6 +757,8 @@ public static class Server {
         "pinot.server.query.executor.group.trim.size";
     public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
         "pinot.server.query.executor.max.init.group.holder.capacity";
+    public static final String CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
+        "pinot.server.query.executor.mse.max.init.group.holder.capacity";

Review Comment: I think we could remove the `query.executor` fragment from this altogether. AFAIK `query.executor` refers to ServerQueryExecutorV1Impl and the corresponding interface, which are V1 engine constructs. That would yield:

```
pinot.server.mse.max.init.group.holder.capacity
```
Re: [PR] Add broker config with default value for is_enable_group_trim hint [pinot]
codecov-commenter commented on PR #14990: URL: https://github.com/apache/pinot/pull/14990#issuecomment-2634488309

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14990?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `60.0%` with `8 lines` in your changes missing coverage. Please review.

> Project coverage is 56.01%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`fe8c273`)](https://app.codecov.io/gh/apache/pinot/commit/fe8c2739ef341e2bb23834363b2e4ffdaebd7320?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 1669 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/pinot/pull/14990?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...el/rules/PinotAggregateExchangeNodeInsertRule.java](https://app.codecov.io/gh/apache/pinot/pull/14990?src=pr&el=tree&filepath=pinot-query-planner%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcalcite%2Frel%2Frules%2FPinotAggregateExchangeNodeInsertRule.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2FsY2l0ZS9yZWwvcnVsZXMvUGlub3RBZ2dyZWdhdGVFeGNoYW5nZU5vZGVJbnNlcnRSdWxlLmphdmE=) | 50.00% | [3 Missing and 5 partials :warning: ](https://app.codecov.io/gh/apache/pinot/pull/14990?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

> :exclamation: There is a different number of reports uploaded between BASE (59551e4) and HEAD (fe8c273). Click for more details.
>
> HEAD has 53 uploads less than BASE
>
> | Flag | BASE (59551e4) | HEAD (fe8c273) |
> |--|--|--|
> | integration | 7 | 0 |
> | integration2 | 3 | 0 |
> | temurin | 12 | 1 |
> | java-21 | 7 | 1 |
> | skip-bytebuffers-true | 3 | 0 |
> | skip-bytebuffers-false | 7 | 1 |
> | unittests | 5 | 1 |
> | unittests1 | 2 | 1 |
> | java-11 | 5 | 0 |
> | unittests2 | 3 | 0 |
> | integration1 | 2 | 0 |
> | custom-integration1 | 2 | 0 |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #14990      +/-   ##
============================================
- Coverage     61.75%   56.01%   -5.74%
- Complexity      207      710     +503
============================================
  Files          2436     2122     -314
  Lines        133233   114585   -18648
  Branches      20636    18449    -2187
============================================
- Hits          82274    64186   -18088
- Misses        44911    45135     +224
+ Partials       6048     5264     -784
```

| [Flag](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.01% <60.00%> (-5.61%)` | :arrow_down: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.01% <60.00%> (-5.74%)` | :arrow_down: |
| [skip-bytebuffers-true](https:
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941507341

## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
@@ -389,15 +390,21 @@ protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager
       IndexLoadingConfig indexLoadingConfig)
       throws Exception {
     String segmentName = segmentDataManager.getSegmentName();
-    Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
-        "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);
-    SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
-    if (hasSameCRC(zkMetadata, localMetadata)) {
-      _logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc());
-      return;
+    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+    // For pauseless tables, we should replace the segment if download url is missing even if crc is same
+    // Without this the reingestion of ERROR segments in pauseless tables fails
+    // as the segment data manager is still an instance of RealtimeSegmentDataManager
+    if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {

Review Comment: Yeah, I am also not happy with this. The thing I am trying to solve for is basically making segment refresh succeed for reingestion. Without this check it fails on the following line:

```
Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager,
    "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType);
```

Segment refresh is triggered whenever a segment is uploaded, which is what reingestion is doing.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941501560 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { +// Step 1: Fetch the ExternalView and all segments +ExternalView externalView = getExternalView(tableNameWithType); +IdealState idealState = getIdealState(tableNameWithType); +Map> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); +Map> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + +// find segments in ERROR state in externalView +List segmentsInErrorState = new ArrayList<>(); +for (Map.Entry> entry : segmentToInstanceCurrentStateMap.entrySet()) { + String segmentName = entry.getKey(); + Map instanceStateMap = entry.getValue(); + boolean allReplicasInError = true; + for (String state : instanceStateMap.values()) { +if (!SegmentStateModel.ERROR.equals(state)) { + allReplicasInError = false; + break; +} + } + if (allReplicasInError) { +segmentsInErrorState.add(segmentName); + } +} + +if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; 
+} + +// filter out segments that are not ONLINE in IdealState +for (String segmentName : segmentsInErrorState) { + Map instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); + boolean isOnline = true; + for (String state : instanceIdealStateMap.values()) { +if (!SegmentStateModel.ONLINE.equals(state)) { + isOnline = false; + break; +} + } + if (!isOnline) { +segmentsInErrorState.remove(segmentName); + } +} + +// Step 2: For each segment, check the ZK metadata for conditions +for (String segmentName : segmentsInErrorState) { + // Skip non-LLC segments or segments missing from the ideal state altogether + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName == null) { +continue; + } + + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName); + // We only consider segments that are in COMMITTING state + if (segmentZKMetadata.getStatus() == Status.COMMITTING) { +Map instanceStateMap = segmentToInstanceIdealStateMap.get(segmentName); + +// Step 3: “No peer has that segment.” => Re-ingest from one server that is supposed to host it and is alive +LOGGER.info( +"Segment {} in table {} is COMMITTING with missing download URL and no peer copy. Triggering re-ingestion.", +segmentName, tableNameWithType); + +// Find at least one server that should host this segment and is alive +String aliveServer = findAliveServerToReIngest(instanceStateMap.keySet()); +if (aliveServer == null) { + LOGGER.warn("No alive server found to re-ingest segment {} in table {}", segmentName, tableNameWithType); + continue; +} + +try { + _fileUploadDownloadClient.triggerReIngestion(aliveServer, tableNameWithType, segmentName); Review Comment: the reason I have kept it here so that I can utilise correct http client with all the ssl settings. What would be appropriate class to move this to? -- This is an automated message from the Apache Git Service. 
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on PR #14920: URL: https://github.com/apache/pinot/pull/14920#issuecomment-2634466525

> When a segment is re-ingested, it should be `DONE` instead of `UPLOADED`. I didn't find the semaphore related logic. Is the PR description up to date?

During reingestion, the segment metadata upload is triggered after segment building. Hence, the segment status appears as "UPLOADED" rather than "DONE". Upon upload completion, a segment refresh operation is triggered, which replaces the failed RealtimeSegmentDataManager with an ImmutableSegmentDataManager. During this replacement, the system automatically releases the associated semaphores. Once the segment is refreshed, we send a reset message so that it starts showing up as ONLINE in the EV.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941464061 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { + + private static final int DEFAULT_CAPACITY = 100_000; + private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000; + public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + + private final String _segmentName; + private final String _tableNameWithType; + private final int _partitionGroupId; + private final String _segmentNameStr; + private final SegmentZKMetadata _segmentZKMetadata; + private final TableConfig _tableConfig; + private final Schema _schema; + private final StreamCon
[PR] Add broker config with default value for is_enable_group_trim hint [pinot]
bziobrowski opened a new pull request, #14990: URL: https://github.com/apache/pinot/pull/14990 PR adds `pinot.broker.enable.group.trim` broker configuration setting which allows enabling `is_enable_group_trim` hint for all queries, which in turn enables V1-style trimming in leaf nodes and v2-style trimming in intermediate nodes. This default value is overridden with value passed in hint. Examples: - if `pinot.broker.enable.group.trim=true` then ```sql set explainAskingServers=true; explain plan for select i, j, count(*) as cnt from testTable group by i, j order by i asc, j asc limit 3 ``` uses group trimming and ought to return plan containing (collations & limit are the important piece): ``` ... LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3]) PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[0, 1]], limit=[3]) PinotLogicalExchange(distribution=[hash[0, 1]]) ... ``` - if we explicitly disable group trimming with hint, e.g. ```sql set explainAskingServers=true; explain plan for select /*+ aggOptions(is_enable_group_trim='false') */ i, j, count(*) as cnt from testTable group by i, j order by i asc, j asc limit 3 ``` then execution plan doesn't show collations & limit in PinotLogicalAggregate ``` ... LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3]) PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL]) PinotLogicalExchange(distribution=[hash[0, 1]]) ... ``` which means that trimming is disabled. See https://docs.pinot.apache.org/users/user-guide-query/query-syntax/grouping-algorithm for details on grouping algorithm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
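The precedence described in the PR above (broker-level default, overridden by an explicit `is_enable_group_trim` hint) can be sketched as follows. This is a hypothetical illustration, not Pinot's actual implementation; `resolveGroupTrim` and the hint-map shape are assumptions for the example.

```java
import java.util.Map;

public class GroupTrimDefault {
  // brokerDefault stands in for the parsed value of pinot.broker.enable.group.trim;
  // aggHints stands in for the query's aggOptions hint map.
  static boolean resolveGroupTrim(boolean brokerDefault, Map<String, String> aggHints) {
    String hint = aggHints.get("is_enable_group_trim");
    // An explicit hint always wins over the broker-level default.
    return hint != null ? Boolean.parseBoolean(hint) : brokerDefault;
  }

  public static void main(String[] args) {
    // No hint: the broker default applies.
    System.out.println(resolveGroupTrim(true, Map.of())); // true
    // Explicit hint overrides the default, as in the second EXPLAIN example above.
    System.out.println(resolveGroupTrim(true, Map.of("is_enable_group_trim", "false"))); // false
  }
}
```

This mirrors the PR description: the config only supplies a default, so per-query hints keep full control.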
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941436572 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
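The "start offset to end offset" ingestion that the `StatelessRealtimeSegmentDataManager` Javadoc above describes can be sketched as a simple bounded consume loop. This is a minimal stand-in, not Pinot's code: real Pinot uses `StreamPartitionMsgOffset` and `PartitionGroupConsumer` with batch fetches, while plain `long` offsets and a callback are assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class BoundedIngestSketch {
  /** Consume messages in [startOffset, endOffset) and return the indexed payloads. */
  static List<String> consumeRange(long startOffset, long endOffset, LongFunction<String> fetchMessage) {
    List<String> indexed = new ArrayList<>();
    long offset = startOffset;
    // Stop exactly at the end offset: unlike a regular consuming segment,
    // there is no ongoing CONSUMING state to maintain after the range is done.
    while (offset < endOffset) {
      indexed.add(fetchMessage.apply(offset));
      offset++; // real consumers advance to the batch's next offset instead
    }
    return indexed;
  }

  public static void main(String[] args) {
    List<String> rows = consumeRange(5, 8, o -> "msg-" + o);
    System.out.println(rows); // [msg-5, msg-6, msg-7]
  }
}
```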
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941429595 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941389686 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941388026

## pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java:
## @@ -100,6 +100,14 @@ public AuthProvider getAuthProvider() {
     return _authProvider;
   }

+  public String getProtocol() {
+    return _protocol;
+  }
+
+  public Integer getControllerHttpsPort() {
+    return _controllerHttpsPort;
+  }
+

Review Comment: done
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941381089 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941094499 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.sl
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941044749

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }

+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method
+   * will call the server reIngestSegment API on one of the alive servers that are supposed to host that segment
+   * according to IdealState.
+   *
+   * API signature:
+   * POST http://[serverURL]/reIngestSegment
+   * Request body (JSON):
+   * {
+   *   "tableNameWithType": [tableName],
+   *   "segmentName": [segmentName]
+   * }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+    // Step 1: Fetch the ExternalView and all segments
+    ExternalView externalView = getExternalView(tableNameWithType);
+    IdealState idealState = getIdealState(tableNameWithType);
+    Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields();
+    Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields();
+
+    // find segments in ERROR state in externalView
+    List<String> segmentsInErrorState = new ArrayList<>();
+    for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      boolean allReplicasInError = true;

Review Comment: `When there are ONLINE replica, ideally we should reset the ERROR replica. Do we rely on validation manager for that?` Yes, that is already a part of the validation manager: https://github.com/apache/pinot/pull/14217/files

-- This is an automated message from the Apache Git Service.
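The ExternalView scan discussed in the review above (a segment qualifies for re-ingestion only when *every* replica reports ERROR; a lone ERROR beside ONLINE replicas is instead reset by the validation manager) can be sketched like this. Plain nested maps stand in for Helix's `ExternalView` record here; the class and method names are illustrative, not the actual `PinotLLCRealtimeSegmentManager` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ErrorSegmentScan {
  /** Returns the segments whose replicas are ALL in ERROR state in the external view. */
  static List<String> segmentsWithAllReplicasInError(Map<String, Map<String, String>> externalView) {
    List<String> segmentsInErrorState = new ArrayList<>();
    for (Map.Entry<String, Map<String, String>> entry : externalView.entrySet()) {
      // instance -> state map for this segment; every value must be ERROR to qualify
      boolean allReplicasInError = entry.getValue().values().stream().allMatch("ERROR"::equals);
      if (allReplicasInError) {
        segmentsInErrorState.add(entry.getKey());
      }
    }
    return segmentsInErrorState;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> ev = Map.of(
        "seg_0", Map.of("server1", "ERROR", "server2", "ERROR"),   // all replicas in ERROR
        "seg_1", Map.of("server1", "ONLINE", "server2", "ERROR")); // one healthy replica remains
    System.out.println(segmentsWithAllReplicasInError(ev)); // [seg_0]
  }
}
```

Only `seg_0` is selected: `seg_1` still has an ONLINE replica, so (per the thread) it is left to the validation manager's reset path rather than re-ingested.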
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
KKcorps commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1941000154 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import 
org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; +import static org.apache.pinot.spi.utils.Com
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940998922 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ## @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources.reingestion.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.segment.creator.TransformPipeline; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Simplified Segment Data Manager for ingesting data from a start offset to an end offset. + */ +public class StatelessRealtimeSegmentDataManager extends SegmentDataManager { + + private static final int DEFAULT_CAPACITY = 100_000; + private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000; + public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + + private final String _segmentName; + private final String _tableNameWithType; + private final int _partitionGroupId; + private final String _segmentNameStr; + private final SegmentZKMetadata _segmentZKMetadata; + private final TableConfig _tableConfig; + private final Schema _schema; + private final StreamConfi
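The class Javadoc in the quoted diff describes a "Simplified Segment Data Manager for ingesting data from a start offset to an end offset." As a rough, hypothetical illustration of that bounded-consumption idea (the names below are illustrative only, not Pinot APIs), the core loop amounts to fetching messages from an inclusive start offset and stopping at an exclusive end offset:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of bounded consumption: read messages from startOffset (inclusive)
// up to endOffset (exclusive), mirroring the idea behind the PR's
// StatelessRealtimeSegmentDataManager. All names here are hypothetical.
public class BoundedConsumerSketch {

  // Stand-in for a stream partition where offsets map directly to list indices.
  static List<String> fetch(long startOffset, long endOffset, List<String> stream) {
    List<String> batch = new ArrayList<>();
    // Stop at the end offset even if the stream has more messages available.
    for (long offset = startOffset; offset < endOffset && offset < stream.size(); offset++) {
      batch.add(stream.get((int) offset));
    }
    return batch;
  }

  public static void main(String[] args) {
    List<String> stream = List.of("m0", "m1", "m2", "m3", "m4");
    System.out.println(fetch(1, 4, stream)); // prints [m1, m2, m3]
  }
}
```

Unlike the regular realtime data manager, such a bounded consumer needs no segment-completion state machine: once the end offset is reached, it can build and persist the segment and exit.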
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940993753 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java: ## @@ -0,0 +1,575 @@ (quoted diff identical to the excerpt above)
[PR] Bump org.apache.helix:helix-core from 1.3.1 to 1.4.3 [pinot]
dependabot[bot] opened a new pull request, #14989: URL: https://github.com/apache/pinot/pull/14989 Bumps [org.apache.helix:helix-core](https://github.com/apache/helix) from 1.3.1 to 1.4.3. Release notes sourced from org.apache.helix:helix-core's releases.

Apache Helix 1.4.2 — Apache Release Note: https://helix.apache.org/1.4.2-docs/releasenotes/release-1.4.2.html

What's Changed
- Use actions/upload-artifact@v4, v2 was deprecated by @GrantPSpencer in apache/helix#2911
- Fix new resource causing pipeline NPE when rebalance overwrites required by @GrantPSpencer in apache/helix#2914
- Pin netty_codec to higher version to address vulnerability by @zpinto in apache/helix#2924
- Bump express from 4.19.2 to 4.20.0 in /helix-front by @dependabot in apache/helix#2917
- Support aggregated endpoint for customized stoppable check by @MarkGaox in apache/helix#2919
- Fix flaky test - testMultipleWrites by @GrantPSpencer in apache/helix#2926
- Fix flaky test - testExists in TestZookeeperAccessor by @GrantPSpencer in apache/helix#2931
- Bump commons-io:commons-io from 2.11.0 to 2.14.0 in /helix-core by @dependabot in apache/helix#2935
- Bump commons-io:commons-io from 2.11.0 to 2.14.0 in /meta-client by @dependabot in apache/helix#2936
- Add test for topology migration by resource group by @zpinto in apache/helix#2933
- Fix BestPossibleExternalViewVerifier for WAGED resource by @MarkGaox in apache/helix#2939
- Bump org.eclipse.jetty:jetty-server from 9.4.51.v20230217 to 9.4.55.v20240627 by @dependabot in apache/helix#2947
- Bump http-proxy-middleware from 2.0.6 to 2.0.7 in /helix-front by @dependabot in apache/helix#2952
- Fix helix-rest memory leak by @MarkGaox in apache/helix#2960

Full Changelog: https://github.com/apache/helix/compare/helix-1.4.1...helix-1.4.2

Apache Helix 1.4.1 — Apache Release Note: https://helix.apache.org/1.4.1-docs/releasenotes/release-1.4.1.html

What's Changed
- Metaclient updater retry logic by @GrantPSpencer in apache/helix#2805
- Fix race condition when reconnect by @xyuanlu in apache/helix#2814
- Fix flaky updateInstance(org.apache.helix.rest.server.TestPerInstanceAccessor) by @zpinto in apache/helix#2825
- setup cluster for ServiceDiscoveryDemo by @JoeCqupt in apache/helix#2812
- [apache/helix] -- Added detail in the Exception message for WAGED rebalance (hard constraint) failures. by @himanshukandwal in apache/helix#2829
- Change partitionAssignment API to handle ANY_LIVEINSTANCE by @GrantPSpencer in apache/helix#2817
- [apache/helix] -- Package resources in JDK 1.8 (backward compatible) jar by @himanshukandwal in apache/helix#2833
- Fix unstable tests of TestPartitionAssignmentAPI and TestPerInstanceAccessor by @junkaixue in apache/helix#2843
- Add support for ALL_RESOURCES key to disabled partitions by
[PR] Bump com.google.cloud:libraries-bom from 26.53.0 to 26.54.0 [pinot]
dependabot[bot] opened a new pull request, #14988: URL: https://github.com/apache/pinot/pull/14988 Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.53.0 to 26.54.0. Release notes sourced from com.google.cloud:libraries-bom's releases.

v26.54.0 GCP Libraries BOM 26.54.0 — Here are the differences from the previous version (26.53.0).

New Addition
- com.google.cloud:google-cloud-parametermanager:0.1.0

The group ID of the following artifacts is com.google.cloud.

Notable Changes

google-cloud-bigquery 2.47.0 (prev: 2.46.0)
- bigquery: Support resource tags for datasets in java client (#3647) (01e0b74)
- bigquery: Remove ReadAPI bypass in executeSelect() (#3624) (fadd992)
- Close bq read client (#3644) (8833c97)

google-cloud-datastore 2.26.0 (prev: 2.25.2)
- Add firestoreInDatastoreMode for datastore emulator (#1698) (50f106d)

google-cloud-spanner 6.86.0 (prev: 6.85.0)
- Add sample for asymmetric autoscaling instances (#3562) (3584b81)
- Support graph and pipe queries in Connection API (#3586) (71c3063)
- Always add instance-id for built-in metrics (#3612) (705b627)
- spanner: Moved mTLSContext configurator from builder to constructor (#3605) (ac7c30b)

google-cloud-storage 2.48.1 (prev: 2.47.0)
- Add new Storage#moveBlob method to atomically rename an object (#2882) (c49fd08)
- Update Signed URL default scheme to resolve from storage options host (#2880) (7ae7e39), closes #2870
- Update StorageException translation of an ApiException to include error details (#2872) (8ad5010)
- Update batch handling to ensure each operation has its own unique idempotency-token (#2905) (8d79b8d)

Other libraries
- [aiplatform] add Context Cache to v1 (87de77d)
- [aiplatform] Add machine_spec, data_persistent_disk_spec, network_spec, euc_config, shielded_vm_config to message .google.cloud.aiplatform.v1beta1.NotebookRuntime (87de77d)
- [aiplatform] Add machine_spec, data_persistent_disk_spec, network_spec, euc_config, shielded_vm_config to message .google.cloud.aiplatform.v1.NotebookRuntime (87de77d)
- [aiplatform] add optimized config in v1 API (87de77d)
- [aiplatform] add per-modality token count break downs for GenAI APIs (87de77d)
- [aiplatform] add per-modality token count break downs for Gen
[PR] Bump software.amazon.awssdk:bom from 2.30.11 to 2.30.12 [pinot]
dependabot[bot] opened a new pull request, #14987: URL: https://github.com/apache/pinot/pull/14987 Bumps software.amazon.awssdk:bom from 2.30.11 to 2.30.12.

Most Recent Ignore Conditions Applied to This Pull Request
| Dependency Name | Ignore Conditions |
| --- | --- |
| software.amazon.awssdk:bom | [< 2.29, > 2.28.12] |

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=software.amazon.awssdk:bom&package-manager=maven&previous-version=2.30.11&new-version=2.30.12)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.

Dependabot commands and options — You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it
- `@dependabot merge` will merge this PR after your CI passes on it
- `@dependabot squash and merge` will squash and merge this PR after your CI passes on it
- `@dependabot cancel merge` will cancel a previously requested merge and block automerging
- `@dependabot reopen` will reopen this PR if it is closed
- `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency
- `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
[PR] Bump joda-time:joda-time from 2.13.0 to 2.13.1 [pinot]
dependabot[bot] opened a new pull request, #14986: URL: https://github.com/apache/pinot/pull/14986 Bumps [joda-time:joda-time](https://github.com/JodaOrg/joda-time) from 2.13.0 to 2.13.1. Release notes sourced from joda-time:joda-time's releases.

Release v2.13.1 — See the change notes for more information: https://www.joda.org/joda-time/changes-report.html#a2.13.1

What's Changed
- Update time zone data to 2025agtz by @github-actions in JodaOrg/joda-time#805

Full Changelog: https://github.com/JodaOrg/joda-time/compare/v2.13.0...v2.13.1

Commits
- 935d44e Release v2.13.1
- f42d2d0 Update time zone data to 2025agtz (#805)
- 3fcd96c Update CI fuzz version
- 8430877 Add workflow dispatch
- 2ce9b4c Update changes.xml

See full diff in compare view: https://github.com/JodaOrg/joda-time/compare/v2.13.0...v2.13.1

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=joda-time:joda-time&package-manager=maven&previous-version=2.13.0&new-version=2.13.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself.
You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) --- Dependabot commands and options You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
[PR] Bump org.checkerframework:checker-qual from 3.48.4 to 3.49.0 [pinot]
dependabot[bot] opened a new pull request, #14985: URL: https://github.com/apache/pinot/pull/14985 Bumps [org.checkerframework:checker-qual](https://github.com/typetools/checker-framework) from 3.48.4 to 3.49.0. Release notes sourced from org.checkerframework:checker-qual's releases.

Checker Framework 3.49.0 — Version 3.49.0 (February 3, 2025)

User-visible changes:
- The Optional Checker is more precise for Optional values resulting from operations on container types (e.g., List, Map, Iterable). It supports two new annotations: @NonEmpty and @UnknownNonEmpty
- The Signature Checker no longer supports @BinaryNameWithoutPackage because it is equivalent to @Identifier; use @Identifier instead.
- The JavaStubifier implementation now appears in package org.checkerframework.framework.stubifier.JavaStubifier.

Closed issues: #6935, #6936, #6939.

Changelog sourced from org.checkerframework:checker-qual's changelog (same user-visible changes and closed issues as the release notes above).

Commits
- 8ae32b8 new release 3.49.0
- 8521889 Prep for release.
- 637c79a Improve diagnostics
- 1f69ec2 Update dependency gradle to v8.12.1 (#6937)
- 64e84df Fix Gradle deprecation warnings
- b7aea1e Update to Gradle 8.12
- b48f3b4 Fix favicon problem. (#6964)
- de51e9f Use built in file tools rather than exec in Gradle code
- 3531816 Move code that adds favicon to separate task
- 441b527 Remove exec from settings in Gradle code
- Additional commits viewable in compare view: https://github.com/typetools/checker-framework/compare/checker-framework-3.48.4...checker-framework-3.49.0

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.checkerframework:checker-qual&package-manager=maven&previous-version=3.48.4&new-version=3.49.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.

Dependabot commands and options — You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it
- `@dependabot merge` will merge this PR after your CI passes on it
- `@dependabot squash and merge` will squash and merge this PR after your CI passes on it
- `@dependabot cancel merge` will cancel a
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940974264

## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java

@@ -0,0 +1,575 @@
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.server.api.resources.reingestion.utils;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Simplified Segment Data Manager for ingesting data from a start offset to an end offset.
 */
public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {

  private static final int DEFAULT_CAPACITY = 100_000;
  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();

  private final String _segmentName;
  private final String _tableNameWithType;
  private final int _partitionGroupId;
  private final String _segmentNameStr;
  private final SegmentZKMetadata _segmentZKMetadata;
  private final TableConfig _tableConfig;
  private final Schema _schema;
  private final StreamConfi
```
(quoted snippet truncated in the original message)
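The class javadoc above states the manager's whole contract: ingest data from a start offset to an end offset. That boils down to a bounded consume loop. Below is a minimal, self-contained sketch of that loop; all names (`OffsetRangeConsumerSketch`, `fetchBatch`, `consumeRange`) are hypothetical and do not appear in the PR — this illustrates only the offset-bounded contract, not Pinot's actual stream consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of consuming a half-open offset range
// [startOffset, endOffset) in batches, the contract described
// in the StatelessRealtimeSegmentDataManager javadoc.
public class OffsetRangeConsumerSketch {

  // Simulated fetch: returns up to batchSize consecutive offsets,
  // never reading past upTo (stand-in for a stream poll).
  static List<Long> fetchBatch(long from, long upTo, int batchSize) {
    List<Long> batch = new ArrayList<>();
    for (long o = from; o < upTo && batch.size() < batchSize; o++) {
      batch.add(o);
    }
    return batch;
  }

  // Consume [startOffset, endOffset), returning how many messages
  // were "indexed" (a real manager would write them into a mutable segment).
  static long consumeRange(long startOffset, long endOffset, int batchSize) {
    long current = startOffset;
    long indexed = 0;
    while (current < endOffset) {
      List<Long> batch = fetchBatch(current, endOffset, batchSize);
      if (batch.isEmpty()) {
        break; // nothing more to read
      }
      indexed += batch.size();                    // index the batch
      current = batch.get(batch.size() - 1) + 1;  // advance past it
    }
    return indexed;
  }

  public static void main(String[] args) {
    // 100 messages in [1000, 1100), fetched 32 at a time.
    System.out.println(consumeRange(1000, 1100, 32)); // prints 100
  }
}
```

The key property, and the reason the manager can be "stateless", is that the end offset is fixed up front (taken from segment ZK metadata in the PR), so the loop terminates deterministically instead of consuming indefinitely.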
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940959863 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940951429 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940948236 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940875087 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940867017 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940866516 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java (quotes the same StatelessRealtimeSegmentDataManager snippet as the first comment above)
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940863270 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.sl
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940830134 ## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java: ## @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest; +import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse; +import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager; +import org.apache.pinot.server.realtime.ControllerLeaderLocator; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.auth.AuthProvider; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.sl
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
9aman commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940790579 ## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ## @@ -1274,6 +1275,47 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi httpHeaders, maxStreamRateInByte); } + /** + * Invokes the server's reIngestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. + * + * POST http://[serverURL]/reIngestSegment + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + */ + public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName) + throws IOException, URISyntaxException, HttpErrorStatusException { +String scheme = HTTP; +if (serverHostPort.contains(HTTPS)) { + scheme = HTTPS; + serverHostPort = serverHostPort.replace(HTTPS + "://", ""); +} else if (serverHostPort.contains(HTTP)) { Review Comment: I think it's better to update the tests than put a condition here. Not sure whether we will run into this. Seems fine for now though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
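The scheme check under review uses `contains(HTTPS)` / `contains(HTTP)`, which can misfire (for example, a hostname that happens to contain the substring "http"). A minimal sketch of the prefix-based alternative the reviewer hints at — class and method names here are illustrative, not from the PR:

```java
// Hypothetical helper: detect the scheme by prefix rather than substring,
// and strip it from the host:port authority. Defaults to http when no
// scheme prefix is present.
public final class SchemeParser {

  /** Returns {scheme, hostPort} for inputs like "https://svr:8443" or "svr:8080". */
  public static String[] splitSchemeAndAuthority(String serverHostPort) {
    String scheme = "http";
    String authority = serverHostPort;
    if (serverHostPort.startsWith("https://")) {
      scheme = "https";
      authority = serverHostPort.substring("https://".length());
    } else if (serverHostPort.startsWith("http://")) {
      authority = serverHostPort.substring("http://".length());
    }
    return new String[]{scheme, authority};
  }
}
```

With `startsWith`, a host named e.g. `httpserver:8080` is no longer mistaken for a URL with an explicit scheme.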
[I] Pinot Sparse Data Support [pinot]
alexch2000 opened a new issue, #14984: URL: https://github.com/apache/pinot/issues/14984 Hey team, we’re building a funnel observability solution using Pinot, where we have a large table with more than 200B rows and around 300+ columns. Of these, only 20–30 columns are commonly used, while the rest are almost always NULL (sparse). Based on my understanding of Pinot’s architecture, each column’s forward index will store some default value for every row, even if the column is NULL most of the time. What is your recommendation for dealing with a table that has hundreds of mostly NULL columns, especially in terms of segment sizing, ingestion performance, and ongoing query efficiency? Are there any roadmap items or upcoming features aimed at optimizing storage or performance for extremely sparse columns (e.g., skipping or compressing columns with mostly NULL values)? Are there any existing best practices or configurations we could apply to make this more efficient (e.g., using **defaultNullValue**, dictionary vs. raw encoding, or segment-level pruning)? > Pinot always stores column values in a [forward index](https://docs.pinot.apache.org/basics/indexing/forward-index). Forward index never stores null values but have to store a value for each row. Therefore independent of the null handling configuration, Pinot always stores a default value for nulls rows in the forward index. The default value used in a column can be specified in the [schema](https://docs.pinot.apache.org/configuration-reference/schema) configuration by setting the defaultNullValue field spec. The defaultNullValue depends on the type of data.
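The `defaultNullValue` mentioned in the quoted documentation is set per field spec in the table schema. A hedged, illustrative fragment (the column name `sparse_attribute` is hypothetical; pick a default that is cheap to store and never collides with real data):

```json
{
  "schemaName": "funnel_events",
  "dimensionFieldSpecs": [
    {
      "name": "sparse_attribute",
      "dataType": "STRING",
      "defaultNullValue": ""
    }
  ]
}
```

For mostly-NULL low-cardinality columns, dictionary encoding keeps the forward index small because every NULL row points at the same dictionary entry for the default value.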
Re: [PR] Adding a new config skipSegmentPreprocess in table IndexingConfig [pinot]
xiangfu0 merged PR #14982: URL: https://github.com/apache/pinot/pull/14982
[I] Regex Query for TEXT_MATCH is not working for input words with a space [pinot]
rahulkrishna-dev opened a new issue, #14983: URL: https://github.com/apache/pinot/issues/14983 Query being executed: ``` SELECT keyword FROM keywords_rank_v1 WHERE TEXT_MATCH(keyword, '/.*amul milk.*/') ORDER BY rank ASC LIMIT 100 ``` The above query is expected to return all the keywords containing `amul milk`, but it is not returning anything. My hypothesis is that the regex expression is being treated as a term query because of the space character; I couldn't find a way to escape the space character. Please help!
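TEXT_MATCH hands the query string to a Lucene-style query parser, where an unquoted space splits the input into separate terms, so each regex fragment is parsed on its own. A hedged workaround sketch (assuming the column is tokenized on whitespace; whether regex-around-phrase forms are supported depends on the analyzer and Lucene version in use) is to use a phrase query instead of a regex:

```sql
SELECT keyword
FROM keywords_rank_v1
WHERE TEXT_MATCH(keyword, '"amul milk"')
ORDER BY rank ASC
LIMIT 100
```

A quoted phrase matches rows where the tokens `amul` and `milk` occur adjacently, which covers the "contains `amul milk`" intent without needing to escape the space inside a regex.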
Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]
codecov-commenter commented on PR #14982: URL: https://github.com/apache/pinot/pull/14982#issuecomment-2632595967

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14982?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `50.0%` with `3 lines` in your changes missing coverage. Please review.

> Project coverage is 56.09%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`45dbdfb`)](https://app.codecov.io/gh/apache/pinot/commit/45dbdfb97786ecaa3aa5774004b219987fa817ca?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 1668 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/pinot/pull/14982?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...indexsegment/immutable/ImmutableSegmentLoader.java](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&filepath=pinot-segment-local%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Flocal%2Findexsegment%2Fimmutable%2FImmutableSegmentLoader.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pbmRleHNlZ21lbnQvaW1tdXRhYmxlL0ltbXV0YWJsZVNlZ21lbnRMb2FkZXIuamF2YQ==) | 0.00% | [1 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...local/segment/index/loader/IndexLoadingConfig.java](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&filepath=pinot-segment-local%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Flocal%2Fsegment%2Findex%2Floader%2FIndexLoadingConfig.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2luZGV4L2xvYWRlci9JbmRleExvYWRpbmdDb25maWcuamF2YQ==) | 0.00% | [0 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

> :exclamation: There is a different number of reports uploaded between BASE (59551e4) and HEAD (45dbdfb). Click for more details.
>
> HEAD has 48 uploads less than BASE
>
> | Flag | BASE (59551e4) | HEAD (45dbdfb) |
> |--|--|--|
> | integration | 7 | 0 |
> | integration2 | 3 | 0 |
> | temurin | 12 | 2 |
> | java-21 | 7 | 2 |
> | skip-bytebuffers-true | 3 | 1 |
> | skip-bytebuffers-false | 7 | 1 |
> | unittests | 5 | 2 |
> | java-11 | 5 | 0 |
> | unittests2 | 3 | 0 |
> | integration1 | 2 | 0 |
> | custom-integration1 | 2 | 0 |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #14982      +/-   ##
============================================
- Coverage     61.75%   56.09%    -5.66%
- Complexity      207      710      +503
============================================
  Files          2436     2122      -314
  Lines        133233   114572    -18661
  Branches      20636    18444     -2192
============================================
- Hits          82274    64271    -18003
- Misses        44911    45050      +139
+ Partials       6048     5251      -797
```

| [Flag](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apa
Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]
Jackie-Jiang commented on code in PR #14920: URL: https://github.com/apache/pinot/pull/14920#discussion_r1940355486 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -442,6 +446,10 @@ public IdealState getIdealState(String realtimeTableName) { } } + public ExternalView getExternalView(String realtimeTableName) { +return _helixResourceManager.getTableExternalView(realtimeTableName); + } + Review Comment: (minor) Let's inline this and remove this method. The behavior is not consistent with `getIdealState()` which can cause confusion ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. 
"myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { Review Comment: (minor) When the table type is known, let's directly use the type for readability ```suggestion public void reIngestSegmentsWithErrorState(String realtimeTableName) { ``` ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -1243,7 +1243,7 @@ protected boolean buildSegmentAndReplace() return true; } - private void closeStreamConsumers() { + protected void closeStreamConsumers() { Review Comment: (minor) Seems not needed, or do you plan to add a test? ## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ## @@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi httpHeaders, maxStreamRateInByte); } + /** + * Invokes the server's reIngestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. + * + * POST http://[serverURL]/reIngestSegment Review Comment: Suggest just making it `POST /reIngestSegment/{segmentName}` which is easier to use. ## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ## @@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi httpHeaders, maxStreamRateInByte); } + /** + * Invokes the server's reIngestSegment API via a POST request with JSON payload, + * using Simple HTTP APIs. 
+ * + * POST http://[serverURL]/reIngestSegment + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + */ + public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName) Review Comment: (minor) table name is implicit from the LLC segment name, and we may simplify this API ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + /** + * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method + * will call the server reIngestSegment API + * on one of the alive servers that are supposed to host that segment according to IdealState. + * + * API signature: + * POST http://[serverURL]/reIngestSegment + * Request body (JSON): + * { + * "tableNameWithType": [tableName], + * "segmentName": [segmentName] + * } + * + * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME" + */ + public void reIngestSegmentsWithErrorState(String tableNameWithType) { +// Step 1: Fetch the ExternalView and all segments +ExternalView externalView = getExternalView(tableNameWithType); +IdealState idealState = getIdealState(tableNameWithType); +Map> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields(); +Map> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields(); + +// find segments in ERROR state in externalView +List segmentsInErrorState = new ArrayList<>(); +for (Map.E
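The loop the reviewers are discussing selects segments that are ERROR in the external view, have no surviving ONLINE replica, and are ONLINE in the ideal state. A hedged, self-contained sketch of just that selection logic — plain maps stand in for Helix's `ExternalView`/`IdealState` record map fields, and the method name is illustrative, not the PR's:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the ERROR-state scan described in the review thread.
// evSegmentStates / isSegmentStates map segmentName -> (instance -> state).
public final class ErrorSegmentScanner {

  public static List<String> findSegmentsToReingest(
      Map<String, Map<String, String>> evSegmentStates,
      Map<String, Map<String, String>> isSegmentStates) {
    List<String> segmentsToReingest = new ArrayList<>();
    for (Map.Entry<String, Map<String, String>> entry : evSegmentStates.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> evStates = entry.getValue();
      // ERROR in external view, and no peer copy is still ONLINE anywhere
      boolean errorInEv = evStates.containsValue("ERROR");
      boolean noOnlineReplica = !evStates.containsValue("ONLINE");
      // ONLINE in ideal state, i.e. the segment is supposed to be served
      Map<String, String> isStates = isSegmentStates.get(segmentName);
      boolean onlineInIs = isStates != null && isStates.containsValue("ONLINE");
      if (errorInEv && noOnlineReplica && onlineInIs) {
        segmentsToReingest.add(segmentName);
      }
    }
    return segmentsToReingest;
  }
}
```

A segment with at least one healthy ONLINE replica is deliberately skipped: it can be recovered by a normal download from a peer, so re-ingestion from the stream is only triggered when no copy survives.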
Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]
xiangfu0 commented on code in PR #14982: URL: https://github.com/apache/pinot/pull/14982#discussion_r1940327893 ## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java: ## @@ -337,6 +340,14 @@ public void setErrorOnColumnBuildFailure(boolean errorOnColumnBuildFailure) { _errorOnColumnBuildFailure = errorOnColumnBuildFailure; } + public void setEnablePreProcess(boolean enablePreProcess) { +_enableSegmentPreprocess = enablePreProcess; + } + + public boolean isEnablePreProcess() { +return _enableSegmentPreprocess; + } + Review Comment: it will be set in the `init()` method, so it cannot be `final` unless we explicitly set this field
Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]
Jackie-Jiang commented on code in PR #14982: URL: https://github.com/apache/pinot/pull/14982#discussion_r1940316175

## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
## @@ -337,6 +340,14 @@ public void setErrorOnColumnBuildFailure(boolean errorOnColumnBuildFailure) {
     _errorOnColumnBuildFailure = errorOnColumnBuildFailure;
   }

+  public void setEnablePreProcess(boolean enablePreProcess) {
+    _enableSegmentPreprocess = enablePreProcess;
+  }
+
+  public boolean isEnablePreProcess() {
+    return _enableSegmentPreprocess;
+  }
+

Review Comment: Don't add them, and make the field final. `IndexLoadingConfig` shouldn't be modified

## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
## @@ -83,6 +84,7 @@ public class IndexLoadingConfig {
   private boolean _enableDefaultStarTree;
   private Map _indexConfigsByColName = new HashMap<>();
+

Review Comment: (nit) Revert

## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
## @@ -56,6 +56,7 @@ public class IndexLoadingConfig {
   private final InstanceDataManagerConfig _instanceDataManagerConfig;
   private final TableConfig _tableConfig;
   private final Schema _schema;
+  private boolean _enableSegmentPreprocess = true;

Review Comment: Suggest renaming it to `_skipSegmentPreprocess` so that the default is `false`. Adding a default-true flag is error-prone.
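The two review suggestions above (make the field final, and name the flag so its default is `false`) can be sketched together as below. This is an illustrative pattern only, with hypothetical class and field names, not the actual `IndexLoadingConfig` code.

```java
// Illustrative sketch of the suggested pattern: a negatively-named flag defaults to
// false, so the field can be final and set exactly once in the constructor,
// and the config object stays immutable after construction.
public class LoadingConfigSketch {
    // Final: no setter, cannot drift after construction.
    private final boolean _skipSegmentPreprocess;

    // A null table-config value means "not configured", which safely maps to false.
    LoadingConfigSketch(Boolean skipFromTableConfig) {
        _skipSegmentPreprocess = Boolean.TRUE.equals(skipFromTableConfig);
    }

    boolean isSkipSegmentPreprocess() {
        return _skipSegmentPreprocess;
    }

    public static void main(String[] args) {
        // prints "false" - an absent config cannot accidentally disable preprocessing
        System.out.println(new LoadingConfigSketch(null).isSkipSegmentPreprocess());
    }
}
```

The design point is that a flag defaulting to `true` silently flips behavior for every caller that forgets to set it, while a default-`false` flag is inert unless explicitly enabled.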
[PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]
xiangfu0 opened a new pull request, #14982: URL: https://github.com/apache/pinot/pull/14982

This config, `enableSegmentPreprocess`, allows a table to skip segment preprocessing during server startup or reload.
Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]
Jackie-Jiang commented on PR #14979: URL: https://github.com/apache/pinot/pull/14979#issuecomment-2632446882

@dependabot ignore this dependency
Re: [I] Server replica allowed to overwrite deep store segment after segment was already committed [pinot]
Jackie-Jiang commented on issue #14786: URL: https://github.com/apache/pinot/issues/14786#issuecomment-2632457409

@KKcorps should we simply change the if condition to `segmentMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS`? Can you help submit a fix?
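Jackie-Jiang's suggested condition amounts to: allow a replica to write the segment to deep store only while the segment is still `IN_PROGRESS`; any other status means it was already committed and must not be overwritten. A hedged sketch of that guard, with a simplified stand-in enum rather than Pinot's real `CommonConstants.Segment.Realtime.Status` and upload path:

```java
// Simplified sketch of the proposed guard against overwriting committed segments.
// The Status enum and method names here are illustrative stand-ins.
public class CommitGuardSketch {
    enum Status { IN_PROGRESS, DONE, UPLOADED }

    // Equivalent to bailing out when status != IN_PROGRESS:
    // only an in-progress segment may still be uploaded to deep store.
    static boolean allowDeepStoreUpload(Status status) {
        return status == Status.IN_PROGRESS;
    }

    public static void main(String[] args) {
        // prints "true" then "false"
        System.out.println(allowDeepStoreUpload(Status.IN_PROGRESS));
        System.out.println(allowDeepStoreUpload(Status.DONE));
    }
}
```

With this check, a lagging replica that finishes consuming after the commit simply skips the upload instead of clobbering the already-committed copy.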
Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]
dependabot[bot] closed pull request #14979: Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 URL: https://github.com/apache/pinot/pull/14979
Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]
dependabot[bot] commented on PR #14979: URL: https://github.com/apache/pinot/pull/14979#issuecomment-2632446931

OK, I won't notify you about jakarta.validation:jakarta.validation-api again, unless you re-open this PR.
Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
codecov-commenter commented on PR #14981: URL: https://github.com/apache/pinot/pull/14981#issuecomment-2632408914

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14981?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `71.1%` with `13 lines` in your changes missing coverage. Please review.
> Project coverage is 63.55%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`cfe1760`)](https://app.codecov.io/gh/apache/pinot/commit/cfe1760159e1f4ecdb704743fb037890fdb17adc?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 1668 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/pinot/pull/14981?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2FQueryRunner.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | 50.00% | [2 Missing and 3 partials :warning:](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...ry/runtime/operator/MultistageGroupByExecutor.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2FMultistageGroupByExecutor.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NdWx0aXN0YWdlR3JvdXBCeUV4ZWN1dG9yLmphdmE=) | 54.54% | [2 Missing and 3 partials :warning:](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...e/operator/groupby/OneLongKeyGroupIdGenerator.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2Fgroupby%2FOneLongKeyGroupIdGenerator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9ncm91cGJ5L09uZUxvbmdLZXlHcm91cElkR2VuZXJhdG9yLmphdmE=) | 0.00% | [2 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...time/operator/groupby/GroupIdGeneratorFactory.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2Fgroupby%2FGroupIdGeneratorFactory.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9ncm91cGJ5L0dyb3VwSWRHZW5lcmF0b3JGYWN0b3J5LmphdmE=) | 87.50% | [1 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##           master   #14981       +/-   ##
===========================================
+ Coverage   61.75%   63.55%    +1.80%
- Complexity    207     1389    +1182
  Files        2436     2713     +277
  Lines      133233   152187   +18954
  Branches    20636    23532    +2896
===========================================
+ Hits        82274    96724   +14450
- Misses      44911    48152    +3241
- Partials     6048     7311    +1263
```

| [Flag](https://app.codecov.io/gh/apache/pinot/pull/14981/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | C
[PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]
shauryachats opened a new pull request, #14981: URL: https://github.com/apache/pinot/pull/14981

A new configuration to control the size of result holders for MSE is necessary to avoid resizing and rehashing operations in use cases where grouping is needed on high-cardinality columns (e.g., UUIDs). A simple query where it is necessary is

```
SELECT count(*)
FROM table_A
WHERE (
  user_uuid NOT IN (
    SELECT user_uuid
    FROM table_B
  )
)
LIMIT 100
option(useMultistageEngine=true, timeoutMs=12, useColocatedJoin = true, maxRowsInJoin = 4000)
```

where a group-by step occurs on `user_uuid` for `table_B` before the colocated join with `table_A`, which has high cardinality. More details in the following issue: https://github.com/apache/pinot/issues/14685
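The core idea behind the PR, presizing result holders to skip incremental rehashing, can be illustrated with a plain `java.util.HashMap`. This is a generic sketch of the technique, not Pinot's actual result-holder implementation; the helper name and numbers are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: presizing a hash map for the expected group count means a single
// allocation up front instead of repeated resize/rehash passes while a
// high-cardinality GROUP BY fills it.
public class PresizeSketch {
    // Smallest initial capacity that holds 'expected' entries without a resize
    // at HashMap's default 0.75 load factor.
    static int capacityFor(int expected) {
        return (int) Math.ceil(expected / 0.75);
    }

    public static void main(String[] args) {
        int expectedGroups = 1_000_000;  // e.g. a configured result-holder size hint
        // Presized once: no rehashing while inserting up to expectedGroups keys.
        Map<Long, Long> groupCounts = new HashMap<>(capacityFor(expectedGroups));
        for (long key = 0; key < 10_000; key++) {
            groupCounts.merge(key, 1L, Long::sum);
        }
        // prints 10000
        System.out.println(groupCounts.size());
    }
}
```

A default-sized map would instead double its table roughly twenty times on the way to a million entries, rehashing every existing key each time, which is exactly the cost the new config aims to avoid for known-high-cardinality keys like UUIDs.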
Re: [PR] Bump com.mchange:c3p0 from 0.10.1 to 0.10.2 [pinot]
Jackie-Jiang merged PR #14978: URL: https://github.com/apache/pinot/pull/14978
Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]
tibrewalpratik17 merged PR #14945: URL: https://github.com/apache/pinot/pull/14945
Re: [PR] Bump software.amazon.awssdk:bom from 2.30.10 to 2.30.11 [pinot]
Jackie-Jiang merged PR #14977: URL: https://github.com/apache/pinot/pull/14977
Re: [PR] Bump org.apache.httpcomponents.client5:httpclient5 from 5.3.1 to 5.4.2 [pinot]
Jackie-Jiang merged PR #14975: URL: https://github.com/apache/pinot/pull/14975
Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]
ankitsultana commented on code in PR #14945: URL: https://github.com/apache/pinot/pull/14945#discussion_r1940034557

## pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
## @@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
     Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
     Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
     Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
+    Map<String, String> queryOptions = new HashMap<>();
+    if (request.getNumGroupsLimit() > 0) {
+      queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));

Review Comment: Good point. Can address in a follow-up. This module is mostly a throwaway; we'll open source the full M3 Plugin after time series support is marked as GA.
Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]
tibrewalpratik17 commented on code in PR #14945: URL: https://github.com/apache/pinot/pull/14945#discussion_r1940030515

## pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
## @@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
     Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
     Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
     Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
+    Map<String, String> queryOptions = new HashMap<>();
+    if (request.getNumGroupsLimit() > 0) {
+      queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));

Review Comment: nit: can put "numGroupsLimit" in a constants class
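The nit above is about hoisting the `"numGroupsLimit"` string literal into a shared constant so the code that writes the query option and the code that reads it cannot drift apart. A minimal sketch, with an illustrative class name rather than Pinot's actual constants class:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a constants holder for query-option keys, so "numGroupsLimit" is
// spelled out exactly once. QueryOptionKeys is an illustrative name.
public final class QueryOptionKeys {
    public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";

    private QueryOptionKeys() {
        // static holder, not instantiable
    }

    public static void main(String[] args) {
        Map<String, String> queryOptions = new HashMap<>();
        // Writer side uses the constant instead of a bare literal...
        queryOptions.put(NUM_GROUPS_LIMIT, Integer.toString(100_000));
        // ...and so does the reader side, so a typo becomes a compile error.
        System.out.println(queryOptions.get(NUM_GROUPS_LIMIT));
    }
}
```

A mistyped string key fails silently at runtime (the option is simply ignored), whereas a mistyped constant reference fails at compile time, which is the whole value of the refactor.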
Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]
ankitsultana commented on code in PR #14945: URL: https://github.com/apache/pinot/pull/14945#discussion_r1939936596

## pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
## @@ -78,7 +80,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
     switch (command) {
       case "fetch":
         List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
-        currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
+        currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns,

Review Comment: Yeah, you just need to set the `limit` and `numGroupsLimit` in the `LeafTimeSeriesPlanNode` of the returned plan.