Re: [I] Improve error handling and reporting in query side [pinot]

2025-02-05 Thread via GitHub


gortiz commented on issue #14950:
URL: https://github.com/apache/pinot/issues/14950#issuecomment-2636395829

   Please take a look at https://github.com/apache/pinot/pull/14994


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [I] Segments are lost [pinot]

2025-02-05 Thread via GitHub


xiangfu0 commented on issue #14993:
URL: https://github.com/apache/pinot/issues/14993#issuecomment-2636378944

   Can you share the table config and the controller logs for the retention manager?
   Is it possible the data were deleted due to the retention policy?





Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942636179


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##
@@ -104,59 +104,61 @@ public class PinotQueryResource {
   @POST
   @Path("sql")
   @ManualAuthorization // performed by broker
-  public String handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
+  public StreamingOutput handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
+    JsonNode requestJson;
     try {
-      JsonNode requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
-      if (!requestJson.has("sql")) {
-        return constructQueryExceptionResponse(QueryException.getException(QueryException.JSON_PARSING_ERROR,
-            "JSON Payload is missing the query string field 'sql'"));
-      }
-      String sqlQuery = requestJson.get("sql").asText();
-      String traceEnabled = "false";
-      if (requestJson.has("trace")) {
-        traceEnabled = requestJson.get("trace").toString();
-      }
-      String queryOptions = null;
-      if (requestJson.has("queryOptions")) {
-        queryOptions = requestJson.get("queryOptions").asText();
-      }
-      LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, "/sql");
-    } catch (ProcessingException pe) {
-      LOGGER.error("Caught exception while processing post request {}", pe.getMessage());
-      return constructQueryExceptionResponse(pe);
-    } catch (WebApplicationException wae) {
-      LOGGER.error("Caught exception while processing post request", wae);
-      throw wae;
+      requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
     } catch (Exception e) {
-      LOGGER.error("Caught exception while processing post request", e);
-      return constructQueryExceptionResponse(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+      return constructQueryExceptionResponse(QueryException.JSON_PARSING_ERROR_CODE, e.getMessage());
+    }
+    if (!requestJson.has("sql")) {
+      return constructQueryExceptionResponse(QueryException.getException(QueryException.JSON_PARSING_ERROR,
+          "JSON Payload is missing the query string field 'sql'"));
+    }
+    String sqlQuery = requestJson.get("sql").asText();
+    String traceEnabled = "false";
+    if (requestJson.has("trace")) {
+      traceEnabled = requestJson.get("trace").toString();
+    }
+    String queryOptions = null;
+    if (requestJson.has("queryOptions")) {
+      queryOptions = requestJson.get("queryOptions").asText();
     }
+
+    return executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
   }
 
   @GET
   @Path("sql")
   @ManualAuthorization
-  public String handleGetSql(@QueryParam("sql") String sqlQuery, @QueryParam("trace") String traceEnabled,
+  public StreamingOutput handleGetSql(@QueryParam("sql") String sqlQuery, @QueryParam("trace") String traceEnabled,
       @QueryParam("queryOptions") String queryOptions, @Context HttpHeaders httpHeaders) {
+    return executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
+  }
+
+  private StreamingOutput executeSqlQueryCatching(HttpHeaders httpHeaders, String sqlQuery, String traceEnabled,
+      String queryOptions) {
     try {
-      LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
-      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, "/sql");
+      return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions);
+    } catch (QException e) {
+      LOGGER.error("Caught query exception while processing post request", e);

Review Comment:
   Good catch. Same in the others. I've changed these messages in 
https://github.com/apache/pinot/pull/14994






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942622733


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##
@@ -398,18 +414,13 @@ private String getQueryURL(String protocol, String hostName, int port) {
     return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
   }
 
-  public String sendPostRaw(String urlStr, String requestStr, Map<String, String> headers) {
+  public void sendPostRaw(String urlStr, String requestStr, Map<String, String> headers, OutputStream outputStream) {
     HttpURLConnection conn = null;
     try {
-      /*if (LOG.isInfoEnabled()){
+      if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Sending a post request to the server - " + urlStr);

Review Comment:
   I just uncommented this log. I decided to remove it in 
https://github.com/apache/pinot/pull/14994






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942634833


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##
@@ -210,51 +217,62 @@ private String getMultiStageQueryResponse(String query, String queryOptions, Htt
       queryOptionsMap.putAll(RequestUtils.getOptionsFromString(queryOptions));
     }
     String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptionsMap, httpHeaders);
+    List<String> tableNames = getTableNames(query, database);
+    List<String> instanceIds = getInstanceIds(query, tableNames, database);
+    String instanceId = selectRandomInstanceId(instanceIds);
+    return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, httpHeaders);
+  }
+
+  private List<String> getTableNames(String query, String database) {
     QueryEnvironment queryEnvironment =
         new QueryEnvironment(database, _pinotHelixResourceManager.getTableCache(), null);
     List<String> tableNames;
     try {
       tableNames = queryEnvironment.getTableNamesForQuery(query);
+    } catch (QException e) {
+      if (e.getErrorCode() != QueryException.UNKNOWN_ERROR_CODE) {
+        throw e;
+      } else {
+        throw new QException(QException.SQL_PARSING_ERROR_CODE, e);
+      }
     } catch (Exception e) {
-      return QueryException.getException(QueryException.SQL_PARSING_ERROR,
-          new Exception("Unable to find table for this query", e)).toString();
+      throw new QException(QException.SQL_PARSING_ERROR_CODE, e);

Review Comment:
   Because we assume other error types will be more precise. The error codes being used are pretty chaotic. It looks like we added them without much consideration, and the fact that we cannot create hierarchies is not a good design. For example, is an UNKNOWN_COLUMN_ERROR_CODE a QUERY_VALIDATION_ERROR_CODE?
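
   To illustrate the hierarchy problem with a self-contained sketch (the codes and class names are illustrative, not Pinot's actual ones): with flat integer codes there is no is-a relation, so handlers must enumerate related codes by hand, while an exception hierarchy expresses the relation structurally.

   ```java
   public class ErrorCodeSketch {
     static final int QUERY_VALIDATION_ERROR_CODE = 700; // hypothetical code
     static final int UNKNOWN_COLUMN_ERROR_CODE = 710;   // hypothetical code

     static boolean isValidationError(int code) {
       // No is-a relation: every new sub-kind of validation error must be added here by hand.
       return code == QUERY_VALIDATION_ERROR_CODE || code == UNKNOWN_COLUMN_ERROR_CODE;
     }

     // With a class hierarchy, the relation is structural:
     static class QueryValidationException extends RuntimeException {
       QueryValidationException(String message) {
         super(message);
       }
     }

     static class UnknownColumnException extends QueryValidationException {
       UnknownColumnException(String column) {
         super("Unknown column: " + column);
       }
     }

     public static void main(String[] args) {
       try {
         throw new UnknownColumnException("foo");
       } catch (QueryValidationException e) { // the subclass is caught here too
         System.out.println("Validation error: " + e.getMessage());
       }
     }
   }
   ```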






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942625995


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##
@@ -398,18 +414,13 @@ private String getQueryURL(String protocol, String hostName, int port) {
     return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
   }
 
-  public String sendPostRaw(String urlStr, String requestStr, Map<String, String> headers) {
+  public void sendPostRaw(String urlStr, String requestStr, Map<String, String> headers, OutputStream outputStream) {
     HttpURLConnection conn = null;
     try {
-      /*if (LOG.isInfoEnabled()){
+      if (LOGGER.isInfoEnabled()) {

Review Comment:
   Probably not. In general, this longer way to log is less expensive in terms of allocation than using the default:
   ```
   LOGGER.info("some message with {} and {}", param1, param2)
   ```
   
   given that the default form uses varargs, which means it allocates an array for no reason if INFO is disabled.
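
   A minimal, self-contained sketch of that trade-off (the logger name and messages are illustrative, not from the PR):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class LoggingCostSketch {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggingCostSketch.class);

     void send(String urlStr, String requestStr, int headerCount) {
       // With three or more arguments SLF4J resolves to the varargs overload
       // info(String, Object...), so an Object[] is allocated (and the ints boxed)
       // even when INFO is disabled.
       LOGGER.info("Sending a post request to {} with body length {} and {} headers",
           urlStr, requestStr.length(), headerCount);

       // The guarded form pays only a cheap level check when INFO is disabled; the
       // string concatenation happens only if the message is actually emitted.
       if (LOGGER.isInfoEnabled()) {
         LOGGER.info("Sending a post request to the server - " + urlStr);
       }
     }
   }
   ```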






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942618169


##
pinot-spi/src/main/java/org/apache/pinot/spi/exception/BadQueryRequestException.java:
##
@@ -18,16 +18,28 @@
  */
 package org.apache.pinot.spi.exception;
 
-public class BadQueryRequestException extends RuntimeException {
+public class BadQueryRequestException extends QException {
   public BadQueryRequestException(String message) {
-    super(message);
+    super(SQL_RUNTIME_ERROR_CODE, message);

Review Comment:
   This class is used in tons of places to detect errors at runtime. There are places where it is caught and reconverted into a different error type depending on the context where it was thrown, for example BaseSingleBlockCombineOperator.
   
   Here, I'm assigning a default error code, which, in general, is difficult to do in a precise manner. Callers can anyway include an explicit error code.
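
   A self-contained sketch of that pattern (`QException` below is a simplified stand-in for org.apache.pinot.spi.exception.QException, and the overload taking an explicit code is hypothetical):

   ```java
   public class DefaultErrorCodeSketch {
     static class QException extends RuntimeException {
       static final int SQL_RUNTIME_ERROR_CODE = 230; // hypothetical value
       private final int _errorCode;

       QException(int errorCode, String message) {
         super(message);
         _errorCode = errorCode;
       }

       int getErrorCode() {
         return _errorCode;
       }
     }

     static class BadQueryRequestException extends QException {
       // Default code, as in the diff above: hard to pick precisely at this level.
       BadQueryRequestException(String message) {
         this(QException.SQL_RUNTIME_ERROR_CODE, message);
       }

       // Hypothetical overload: callers with more context can pass an explicit code.
       BadQueryRequestException(int errorCode, String message) {
         super(errorCode, message);
       }
     }

     public static void main(String[] args) {
       QException e = new BadQueryRequestException("illegal filter value");
       System.out.println(e.getErrorCode() + ": " + e.getMessage());
     }
   }
   ```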






Re: [I] Improve error handling and reporting in query side [pinot]

2025-02-05 Thread via GitHub


vrajat commented on issue #14950:
URL: https://github.com/apache/pinot/issues/14950#issuecomment-2636333616

   > MSE errors include the stack trace and don't include the actual error 
code. Instead, error code 200 is always returned.
   
   This is true for SSE as well. For example:
   
   ```
   "exceptions": [
     {
       "message": "QueryExecutionError:\nQuery execution error on: Server_100.81.71.117_7050 org.apache.pinot.spi.exception.EarlyTerminationException: Interrupted while processing next block",
       "errorCode": 200
     },
   ```





Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942610938


##
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java:
##
@@ -110,6 +110,10 @@ public BrokerResponseNative(ProcessingException exception) {
     _exceptions.add(new QueryProcessingException(exception.getErrorCode(), exception.getMessage()));
   }
 
+  public BrokerResponseNative(int errorCode, String errorMessage) {
+    _exceptions.add(new QueryProcessingException(errorCode, errorMessage));

Review Comment:
   I've added a TODO to rename this class at https://github.com/apache/pinot/pull/14994.
   
   That PR is already too long, so I preferred not to rename the class and make the PR even longer.






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on code in PR #14951:
URL: https://github.com/apache/pinot/pull/14951#discussion_r1942606315


##
pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java:
##
@@ -23,6 +23,7 @@
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.spi.exception.QException;

Review Comment:
   Ideally this class should end up being removed






Re: [PR] Improve error handling in controller [pinot]

2025-02-05 Thread via GitHub


gortiz commented on PR #14951:
URL: https://github.com/apache/pinot/pull/14951#issuecomment-2636314940

   @yashmayya I've created https://github.com/apache/pinot/pull/14994, which is built on top of this PR and fixes most of the issues reported here.





[PR] Improve exceptions broker [pinot]

2025-02-05 Thread via GitHub


gortiz opened a new pull request, #14994:
URL: https://github.com/apache/pinot/pull/14994

   This extensive PR is focused on improving error handling in the MSE, and partially in the SSE and TSE. Error handling includes changes in the error responses sent to clients, but also in the logs on servers, brokers and controllers.
   
   Logging has been reduced so that stack traces are not logged when they are not useful. Logging is also improved to include the request ID in MSE and SSE queries, and the stage ID in MSE queries. Instead of spreading the query context to all places where logs are created, this PR injects these values using [MDC](https://www.baeldung.com/mdc-in-log4j-2-logback), sketched after the list below. This means that:
   1. Code doesn't have to change that much.
   2. However, to modify the MDC in some parts, I needed to add a try-catch. I recommend reviewing the PR with whitespace changes hidden.
   3. To log these properties, log4j2.xml needs to be changed. This PR includes a change in the log4j2.xml used for quickstarts. Actual deployments will likely use their own log4j2.xml files, and these will need to be modified to log request IDs and stage IDs. Please remember that this is optional: logs are still as helpful as before, even if the MDC properties are unused. Specifically, logs that already included the request ID or stage ID haven't been changed.
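
   A rough sketch of the MDC approach (the keys `requestId`/`stageId` and the pattern below are illustrative, not necessarily the exact ones used in the PR):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   import org.slf4j.MDC;

   public class MdcSketch {
     private static final Logger LOGGER = LoggerFactory.getLogger(MdcSketch.class);

     void processStage(long requestId, int stageId) {
       // Set the context once; every log statement on this thread picks it up,
       // so the ids don't have to be threaded through every method signature.
       MDC.put("requestId", Long.toString(requestId));
       MDC.put("stageId", Integer.toString(stageId));
       try {
         LOGGER.info("Processing stage"); // emitted with the MDC values attached
       } finally {
         // Clean up so the values don't leak to the next task reusing this thread.
         MDC.remove("requestId");
         MDC.remove("stageId");
       }
     }
     // To surface the values, the log4j2.xml PatternLayout must reference them,
     // e.g. a pattern containing "%X{requestId} %X{stageId}".
   }
   ```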





Re: [PR] add option to delete table and schema immediately [pinot]

2025-02-04 Thread via GitHub


jayeshchoudhary commented on PR #14736:
URL: https://github.com/apache/pinot/pull/14736#issuecomment-2635984874

   Tried checking if this can be managed by state, but I can't figure out the root cause.
   The PR is good to go. Sorry for taking so long; I missed the notifications 😓





Re: [PR] Add debug API for pauseless tables. Add pauselessFSM as an option in realtimeQuickStart [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14961:
URL: https://github.com/apache/pinot/pull/14961#discussion_r1942279690


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -301,6 +309,74 @@ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
     }
   }
 
+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and optionally gets COMMITTING segments based on the "
+          + "includeCommittingSegments parameter")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | "
+          + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName,
+      @ApiParam(value = "Flag to include committing segment info") @QueryParam("includeCommittingSegments")
+      @DefaultValue("false") boolean includeCommittingSegments,
+      @Context HttpHeaders headers) {
+    realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers);
+    try {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);

Review Comment:
   Yes, the objective here was to allow the user to get ERROR segments for all 
tables.
   Additionally, they can choose to fetch the COMMITTING segments for pauseless 
tables by setting the flag to true. 
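
   For example, a sketch of calling the endpoint shown in the diff above (the controller host, port and table name are assumptions for illustration):

   ```java
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   public class PauselessDebugInfoCall {
     public static void main(String[] args) throws Exception {
       // Assumed controller address and table name.
       URI uri = URI.create(
           "http://localhost:9000/tables/myTable_REALTIME/pauselessDebugInfo?includeCommittingSegments=true");
       HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
       // Returns a JSON body with the ERROR segments and, since the flag is set,
       // the COMMITTING segments as well.
       HttpResponse<String> response =
           HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
       System.out.println(response.body());
     }
   }
   ```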






Re: [PR] Add debug API for pauseless tables. Add pauselessFSM as an option in realtimeQuickStart [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14961:
URL: https://github.com/apache/pinot/pull/14961#discussion_r1942278607


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -301,6 +309,74 @@ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
     }
   }
 
+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and optionally gets COMMITTING segments based on the "
+          + "includeCommittingSegments parameter")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(

Review Comment:
   I don't think this will get extended. Moreover, it is not called internally by the controller or server, so I thought it was fine not to introduce another class.






Re: [I] Segments are lost [pinot]

2025-02-04 Thread via GitHub


raghukn commented on issue #14993:
URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635744569

   @xiangfu0 @mayankshriv -- Please help me triage.
   





Re: [I] Segments are lost [pinot]

2025-02-04 Thread via GitHub


raghukn commented on issue #14993:
URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635743223

   https://github.com/user-attachments/files/18667128/server0-log.log clearly shows segments all the way up to 159 getting created





Re: [I] Segments are lost [pinot]

2025-02-04 Thread via GitHub


raghukn commented on issue #14993:
URL: https://github.com/apache/pinot/issues/14993#issuecomment-2635742116

   
[server0-log.log](https://github.com/user-attachments/files/18667128/server0-log.log)
   
   https://github.com/user-attachments/assets/d3a416e0-ebf5-4452-a5ca-f29874d3eb86
   https://github.com/user-attachments/assets/da2e7188-5cf9-4fca-9096-82e1af5bcd9e





Re: [PR] Adding perf benchmark logic for GroupIdGenerator hash map [pinot]

2025-02-04 Thread via GitHub


codecov-commenter commented on PR #14992:
URL: https://github.com/apache/pinot/pull/14992#issuecomment-2635740181

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/14992?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 63.55%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`6e70a53`)](https://app.codecov.io/gh/apache/pinot/commit/6e70a53bfddc4c09e2e8b7a8851c9745173b526f?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1673 commits behind head on master.
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #14992      +/-   ##
   ============================================
   + Coverage     61.75%   63.55%   +1.80%
   - Complexity      207     1388    +1181
   ============================================
     Files          2436     2713     +277
     Lines        133233   152169   +18936
     Branches      20636    23526    +2890
   ============================================
   + Hits          82274    96718   +14444
   - Misses        44911    48151    +3240
   - Partials       6048     7300    +1252
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.93%)` | :arrow_up: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.01% <ø> (-5.73%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.54% <ø> (+35.82%)` | :arrow_up: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.80%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `63.55% <ø> (+1.81%)` | :arrow_up: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `56.06% <ø> (+9.17%)` | :arrow_up: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/14992/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.01% <ø> (+6.28%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/14992?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source

[I] Segments are lost [pinot]

2025-02-04 Thread via GitHub


raghukn opened a new issue, #14993:
URL: https://github.com/apache/pinot/issues/14993

   Hi,
   
   I was ingesting data from a Kafka topic (679GB) yesterday and had seen 16M rows in my table by EOD yesterday. But when I check today, I am surprised there are only 14 segments and a total of only 4.5M rows.
   
   Not sure how the segments were lost!
   
   Attached is server 0's log file, which seems to show around 159 segments created, each of 500,000 rows. I was expecting 79M rows in the table by now (given that the row limit of my segments is 500,000 each).
   
   Can I know why segments are missing?





Re: [PR] Calculate size based flush threshold per topic [pinot]

2025-02-04 Thread via GitHub


chenboat commented on PR #14765:
URL: https://github.com/apache/pinot/pull/14765#issuecomment-2635674904

   @Jackie-Jiang Do you still have concerns here? This PR looks good to me, even without extensive testing in production.





Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1942252443


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
+import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.sl

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1942248666


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1942241681


##
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##
@@ -1274,6 +1275,47 @@ public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvi
         httpHeaders, maxStreamRateInByte);
   }
 
+  /**
+   * Invokes the server's reIngestSegment API via a POST request with JSON payload,
+   * using Simple HTTP APIs.
+   *
+   * POST http://[serverURL]/reIngestSegment
+   * {
+   *   "tableNameWithType": [tableName],
+   *   "segmentName": [segmentName]
+   * }
+   */
+  public void triggerReIngestion(String serverHostPort, String tableNameWithType, String segmentName)
+      throws IOException, URISyntaxException, HttpErrorStatusException {
+    String scheme = HTTP;
+    if (serverHostPort.contains(HTTPS)) {
+      scheme = HTTPS;
+      serverHostPort = serverHostPort.replace(HTTPS + "://", "");
+    } else if (serverHostPort.contains(HTTP)) {
+      serverHostPort = serverHostPort.replace(HTTP + "://", "");
+    }
+
+    String serverHost = serverHostPort.split(":")[0];
+    String serverPort = serverHostPort.split(":")[1];
+
+    URI reIngestUri = getURI(scheme, serverHost, Integer.parseInt(serverPort), REINGEST_SEGMENT_PATH);
+
+    Map<String, String> requestJson = new HashMap<>();
+    requestJson.put("tableNameWithType", tableNameWithType);
+    requestJson.put("segmentName", segmentName);
+
+    String jsonPayload = JsonUtils.objectToString(requestJson);
+    SimpleHttpResponse response =
+        HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(reIngestUri, jsonPayload));
+
+    // Check that we got a 2xx response
+    int statusCode = response.getStatusCode();
+    if (statusCode / 100 != 2) {
+      throw new IOException(String.format("Failed POST to %s, HTTP %d: %s",

Review Comment:
   Was the code updated here? I see that the comment has been resolved, but there are no changes in the code.






[PR] Adding perf benchmark logic for GroupIdGenerator hash map [pinot]

2025-02-04 Thread via GitHub


shauryachats opened a new pull request, #14992:
URL: https://github.com/apache/pinot/pull/14992

   This PR adds performance benchmarking logic to identify and measure the 
improvement of different strategies for hash map selection, to make a 
data-driven choice on the hash map used to power `GroupIdGenerator`, which has 
been described in the issue https://github.com/apache/pinot/issues/14685.
   
   The results of this benchmark are:
   ```
   Benchmark                                                  (_cardinality)  Mode  Cnt     Score     Error  Units
   BenchmarkObjectOpenHashMap.object2IntOpenHashMap                       50  avgt   20   111.262 ±   5.448  ms/op
   BenchmarkObjectOpenHashMap.object2IntOpenHashMap                      100  avgt   20   299.255 ±   9.814  ms/op
   BenchmarkObjectOpenHashMap.object2IntOpenHashMap                      500  avgt   20  1859.503 ±  58.990  ms/op
   BenchmarkObjectOpenHashMap.object2IntOpenHashMap                     2000  avgt   20  8236.525 ± 170.751  ms/op
   
   BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap               50  avgt   20    79.908 ±   4.715  ms/op
   BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap              100  avgt   20   180.827 ±  19.987  ms/op
   BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap              500  avgt   20  1051.368 ±  49.204  ms/op
   BenchmarkObjectOpenHashMap.object2IntReservedOpenHashMap             2000  avgt   20  3340.668 ± 106.874  ms/op
   
   BenchmarkObjectOpenHashMap.vanillaHashMap                              50  avgt   20   109.589 ±   2.836  ms/op
   BenchmarkObjectOpenHashMap.vanillaHashMap                             100  avgt   20   265.262 ±   5.215  ms/op
   BenchmarkObjectOpenHashMap.vanillaHashMap                             500  avgt   20  1556.399 ±  49.787  ms/op
   BenchmarkObjectOpenHashMap.vanillaHashMap                            2000  avgt   20  6757.234 ± 314.138  ms/op
   
   BenchmarkObjectOpenHashMap.vanillaReservedHashMap                      50  avgt   20    98.798 ±   4.344  ms/op
   BenchmarkObjectOpenHashMap.vanillaReservedHashMap                     100  avgt   20   228.480 ±   6.570  ms/op
   BenchmarkObjectOpenHashMap.vanillaReservedHashMap                     500  avgt   20  1067.580 ±  48.764  ms/op
   BenchmarkObjectOpenHashMap.vanillaReservedHashMap                    2000  avgt   20  4725.897 ± 284.449  ms/op
   ```
   These results show that the reserved hash map performs ~2.5x better than the current unreserved one, which led to the following PR: https://github.com/apache/pinot/pull/14981.
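
   Assuming "reserved" here means pre-sizing the map for the expected number of groups, a minimal sketch of the difference being measured (the cardinality and key shape are illustrative):

   ```java
   import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

   public class ReservedMapSketch {
     public static void main(String[] args) {
       int expectedGroups = 2_000_000;

       // Unreserved: starts at the default capacity and rehashes repeatedly as it grows.
       Object2IntOpenHashMap<String> unreserved = new Object2IntOpenHashMap<>();

       // Reserved: sized up front for the expected key count, avoiding the rehashes.
       Object2IntOpenHashMap<String> reserved = new Object2IntOpenHashMap<>(expectedGroups);

       for (int i = 0; i < expectedGroups; i++) {
         unreserved.put("group-" + i, i); // triggers many rehashes while growing
         reserved.put("group-" + i, i);   // capacity was reserved up front
       }
       System.out.println(unreserved.size() + " " + reserved.size());
     }
   }
   ```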





Re: [PR] Bump com.google.cloud:libraries-bom from 26.53.0 to 26.54.0 [pinot]

2025-02-04 Thread via GitHub


Jackie-Jiang merged PR #14988:
URL: https://github.com/apache/pinot/pull/14988





Re: [PR] Bump software.amazon.awssdk:bom from 2.30.11 to 2.30.12 [pinot]

2025-02-04 Thread via GitHub


Jackie-Jiang merged PR #14987:
URL: https://github.com/apache/pinot/pull/14987





Re: [PR] Bump org.checkerframework:checker-qual from 3.48.4 to 3.49.0 [pinot]

2025-02-04 Thread via GitHub


Jackie-Jiang merged PR #14985:
URL: https://github.com/apache/pinot/pull/14985





Re: [PR] Bump joda-time:joda-time from 2.13.0 to 2.13.1 [pinot]

2025-02-04 Thread via GitHub


Jackie-Jiang merged PR #14986:
URL: https://github.com/apache/pinot/pull/14986





Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


Jackie-Jiang commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1942150405


##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##
@@ -389,15 +390,21 @@ protected void 
replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager
   IndexLoadingConfig indexLoadingConfig)
   throws Exception {
 String segmentName = segmentDataManager.getSegmentName();
-Preconditions.checkState(segmentDataManager instanceof 
ImmutableSegmentDataManager,
-"Cannot replace CONSUMING segment: %s in table: %s", segmentName, 
_tableNameWithType);
-SegmentMetadata localMetadata = 
segmentDataManager.getSegment().getSegmentMetadata();
-if (hasSameCRC(zkMetadata, localMetadata)) {
-  _logger.info("Segment: {} has CRC: {} same as before, not replacing it", 
segmentName, localMetadata.getCrc());
-  return;
+TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+// For pauseless tables, we should replace the segment if download url is 
missing even if crc is same
+// Without this the reingestion of ERROR segments in pauseless tables fails
+// as the segment data manager is still an instance of 
RealtimeSegmentDataManager
+if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {

Review Comment:
   We probably need a new API to handle re-ingested segments. This is not a regular segment push. With a new API we can also set the status to `DONE`, and then trigger the reset from the API.






Re: [PR] Set CONSUMING state priority for helix state model [pinot]

2025-02-04 Thread via GitHub


somandal commented on code in PR #14991:
URL: https://github.com/apache/pinot/pull/14991#discussion_r1942082673


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java:
##
@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";
 
+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during 
server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment:
   what do you mean by buffer though?






Re: [PR] Set CONSUMING state priority for helix state model [pinot]

2025-02-04 Thread via GitHub


xiangfu0 commented on code in PR #14991:
URL: https://github.com/apache/pinot/pull/14991#discussion_r1942060957


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java:
##
@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";
 
+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during 
server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment:
   just give enough buffer between each state.






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


shauryachats commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941950925


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java:
##
@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
 _aggType = aggType;
 _leafReturnFinalResult = leafReturnFinalResult;
 _resultSchema = resultSchema;
+
 int maxInitialResultHolderCapacity = 
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+Integer mseCapacity = 
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

Review Comment:
   Addressed.



##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java:
##
@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
 _aggType = aggType;
 _leafReturnFinalResult = leafReturnFinalResult;
 _resultSchema = resultSchema;
+
 int maxInitialResultHolderCapacity = 
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+Integer mseCapacity = 
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+if (mseCapacity != null) {
+  maxInitialResultHolderCapacity = mseCapacity;

Review Comment:
   Addressed.






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


shauryachats commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941951138


##
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##
@@ -756,6 +757,8 @@ public static class Server {
 "pinot.server.query.executor.group.trim.size";
 public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
 "pinot.server.query.executor.max.init.group.holder.capacity";
+public static final String 
CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =

Review Comment:
   Addressed.






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941904398


##
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##
@@ -756,6 +757,8 @@ public static class Server {
 "pinot.server.query.executor.group.trim.size";
 public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
 "pinot.server.query.executor.max.init.group.holder.capacity";
+public static final String 
CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =

Review Comment:
   Remove `QUERY_EXECUTOR` from the var name too






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941911832


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java:
##
@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
 _aggType = aggType;
 _leafReturnFinalResult = leafReturnFinalResult;
 _resultSchema = resultSchema;
+
 int maxInitialResultHolderCapacity = 
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+Integer mseCapacity = 
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

Review Comment:
   nit: naming could be made slightly more precise (e.g. 
`mseMaxInitialResultHolderCapacity`).
   
   Also you could move this logic to a separate method in the class to keep 
this clean.






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941907710


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java:
##
@@ -87,7 +87,13 @@ public MultistageGroupByExecutor(
 _aggType = aggType;
 _leafReturnFinalResult = leafReturnFinalResult;
 _resultSchema = resultSchema;
+
 int maxInitialResultHolderCapacity = 
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+Integer mseCapacity = 
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+if (mseCapacity != null) {
+  maxInitialResultHolderCapacity = mseCapacity;

Review Comment:
   I guess the behavior we are implementing is:
   
   1. By default use the previous behavior
   2. If a user has explicitly set a hint for the mse initial capacity, or set 
a server config, then use that.
   
   Can you also call it out in the Issue Description? We'll have to update 
Pinot Docs too later.
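
   For reference, a hedged sketch of that resolution order (the method names follow the diff above, but the exact wiring and parameter types in the PR may differ):
   ```java
   // Sketch: resolve the initial result holder capacity for the MSE.
   // Falls back to the existing capacity when neither the MSE hint nor the
   // MSE server config is set, preserving the previous behavior.
   private int resolveInitialResultHolderCapacity(Map<String, String> opChainMetadata,
       @Nullable NodeHint nodeHint) {
     Integer mseCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
     if (mseCapacity != null) {
       return mseCapacity; // explicit hint or server config wins
     }
     return getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); // previous behavior
   }
   ```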






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941903211


##
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java:
##
@@ -62,6 +62,7 @@ public static class AggregateOptions {
 public static final String GROUP_TRIM_SIZE = "group_trim_size";
 
 public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"max_initial_result_holder_capacity";
+public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"mse_max_initial_result_holder_capacity";

Review Comment:
   @Jackie-Jiang : do you have any recommendation for the hint name?






Re: [PR] Set CONSUMING state priority for helix state model [pinot]

2025-02-04 Thread via GitHub


codecov-commenter commented on PR #14991:
URL: https://github.com/apache/pinot/pull/14991#issuecomment-2635078552

   ## 
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/14991?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 63.59%. Comparing base 
[(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`819a39d`)](https://app.codecov.io/gh/apache/pinot/commit/819a39d38958fd0091000d0adbd29408009ed2d5?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1669 commits behind head on master.
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #14991      +/-   ##
   ============================================
   + Coverage     61.75%   63.59%   +1.84%
   - Complexity      207     1391    +1184
   ============================================
     Files          2436     2713     +277
     Lines        133233   152128   +18895
     Branches      20636    23519    +2883
   ============================================
   + Hits          82274    96744   +14470
   - Misses        44911    48085    +3174
   - Partials       6048     7299    +1251
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration2](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-11](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-21](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `63.59% <100.00%> (+1.96%)` | :arrow_up: |
   | 
[skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `63.58% <100.00%> (+1.84%)` | :arrow_up: |
   | 
[skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `56.04% <ø> (+28.31%)` | :arrow_up: |
   | 
[temurin](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `63.59% <100.00%> (+1.84%)` | :arrow_up: |
   | 
[unittests](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `63.59% <100.00%> (+1.84%)` | :arrow_up: |
   | 
[unittests1](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `56.10% <ø> (+9.21%)` | :arrow_up: |
   | 
[unittests2](https://app.codecov.io/gh/apache/pinot/pull/14991/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `34.02% <100.00%> (+6.28%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/pinot/pull/14991?dropdown=coverage&src=pr&el=continue&ut

Re: [PR] Set CONSUMING state priority for helix state model [pinot]

2025-02-04 Thread via GitHub


somandal commented on code in PR #14991:
URL: https://github.com/apache/pinot/pull/14991#discussion_r1941854342


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java:
##
@@ -46,6 +40,10 @@ private PinotHelixSegmentOnlineOfflineStateModelGenerator() {
   public static final String DROPPED_STATE = "DROPPED";
   public static final String CONSUMING_STATE = "CONSUMING";
 
+  public static final int DEFAULT_TRANSITION_PRIORITY = 1000;
+  // Given a lower priority to let the consuming segment start last during 
server startup.
+  public static final int OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY = 1100;

Review Comment:
   Why such high values? I guess you can make the default as 1 and offline to 
consuming as 2?






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


shauryachats commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941846709


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java:
##
@@ -25,24 +25,27 @@ public class GroupIdGeneratorFactory {
   private GroupIdGeneratorFactory() {
   }
 
-  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns, int numGroupsLimit) {
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns,
+  int numGroupsLimit, int maxInitialResultHolderCapacity) {
+// Initial capacity is one more than expected to avoid rehashing if 
container is full.
+int initialCapacity = 1 + Math.min(maxInitialResultHolderCapacity, 
numGroupsLimit);

Review Comment:
   Agreed, updated.



##
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##
@@ -756,6 +757,8 @@ public static class Server {
 "pinot.server.query.executor.group.trim.size";
 public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
 "pinot.server.query.executor.max.init.group.holder.capacity";
+public static final String 
CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
+"pinot.server.query.executor.mse.max.init.group.holder.capacity";

Review Comment:
   Good point, updated.






[PR] Set CONSUMING state priority for helix state model [pinot]

2025-02-04 Thread via GitHub


xiangfu0 opened a new pull request, #14991:
URL: https://github.com/apache/pinot/pull/14991

   Set CONSUMING state priority for helix state model:
   Here we make the ONLINE state a higher priority than CONSUMING.
   
   Why:
   In normal operations this won't make any difference.
   In a large-table restart scenario (e.g. the server restart or pre-processing taking a long time, say 1 hour), if the consuming segments come up early, the Pinot server spends more CPU cycles on them.
   Instead, we should let the bootstrapping/restarting server focus on pre-processing the ONLINE segment transitions, and bring back the CONSUMING state last.
   Another thing worth mentioning: during bootstrap, the running replicas could already have consumed and committed multiple segments, which allows the bootstrapping replica to bypass CONSUMING and directly download those segments from the deep store.
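   
   For illustration, a rough sketch of how these priorities could be wired into the state model definition, assuming Helix's `StateModelDefinition.Builder` and its convention that a larger number means a lower priority (the priority values mirror the constants in this PR; the rest is illustrative):
   ```java
   import org.apache.helix.model.StateModelDefinition;

   StateModelDefinition.Builder builder =
       new StateModelDefinition.Builder("SegmentOnlineOfflineStateModel");
   builder.initialState("OFFLINE");
   builder.addState("ONLINE", 1);
   builder.addState("CONSUMING", 2);
   builder.addState("OFFLINE", 3);
   // OFFLINE->CONSUMING gets a larger number (lower priority), so it runs
   // after the ONLINE transitions during server startup.
   builder.addTransition("OFFLINE", "ONLINE", 1000);     // DEFAULT_TRANSITION_PRIORITY
   builder.addTransition("CONSUMING", "ONLINE", 1000);
   builder.addTransition("OFFLINE", "CONSUMING", 1100);  // OFFLINE_TO_CONSUMING_TRANSITION_PRIORITY
   StateModelDefinition stateModel = builder.build();
   ```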
   





Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941785723


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java:
##
@@ -25,24 +25,27 @@ public class GroupIdGeneratorFactory {
   private GroupIdGeneratorFactory() {
   }
 
-  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns, int numGroupsLimit) {
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns,
+  int numGroupsLimit, int maxInitialResultHolderCapacity) {
+// Initial capacity is one more than expected to avoid rehashing if 
container is full.
+int initialCapacity = 1 + Math.min(maxInitialResultHolderCapacity, 
numGroupsLimit);

Review Comment:
   Hash table resize would happen based on load factor though right? Adding 1 
here might not do much. (default is usually 0.75)
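   
   A quick illustration of the load-factor point (numbers are illustrative; fastutil's default load factor is 0.75):
   ```java
   import it.unimi.dsi.fastutil.HashCommon;

   public class LoadFactorSketch {
     public static void main(String[] args) {
       int expected = 10_000;
       float f = 0.75f; // fastutil's default load factor

       // fastutil sizes the backing array to the next power of two >= expected / f,
       // so `expected` inserts fit without a rehash.
       int arraySize = HashCommon.arraySize(expected, f);
       int maxFill = HashCommon.maxFill(arraySize, f); // rehash once size exceeds this

       // Prints: array size = 16384, rehash threshold = 12288. Adding 1 to
       // `expected` only changes the allocation if expected / f happens to
       // cross a power-of-two boundary.
       System.out.println("array size = " + arraySize + ", rehash threshold = " + maxFill);
     }
   }
   ```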






Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-04 Thread via GitHub


ankitsultana commented on code in PR #14981:
URL: https://github.com/apache/pinot/pull/14981#discussion_r1941785723


##
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java:
##
@@ -25,24 +25,27 @@ public class GroupIdGeneratorFactory {
   private GroupIdGeneratorFactory() {
   }
 
-  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns, int numGroupsLimit) {
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] 
keyTypes, int numKeyColumns,
+  int numGroupsLimit, int maxInitialResultHolderCapacity) {
+// Initial capacity is one more than expected to avoid rehashing if 
container is full.
+int initialCapacity = 1 + Math.min(maxInitialResultHolderCapacity, 
numGroupsLimit);

Review Comment:
   Hash table resize would happen based on load factor though right? Adding 1 
here might not do much.



##
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##
@@ -756,6 +757,8 @@ public static class Server {
 "pinot.server.query.executor.group.trim.size";
 public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
 "pinot.server.query.executor.max.init.group.holder.capacity";
+public static final String 
CONFIG_OF_QUERY_EXECUTOR_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
+"pinot.server.query.executor.mse.max.init.group.holder.capacity";

Review Comment:
   I think we could remove the `query.executor` fragment from this altogether. AFAIK `query.executor` refers to ServerQueryExecutorV1Impl and the corresponding interface, which are V1 engine constructs. That would yield:
   
   ```
   pinot.server.mse.max.init.group.holder.capacity
   ```






Re: [PR] Add broker config with default value for is_enable_group_trim hint [pinot]

2025-02-04 Thread via GitHub


codecov-commenter commented on PR #14990:
URL: https://github.com/apache/pinot/pull/14990#issuecomment-2634488309

   ## 
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/14990?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `60.0%` with `8 lines` in your changes 
missing coverage. Please review.
   > Project coverage is 56.01%. Comparing base 
[(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`fe8c273`)](https://app.codecov.io/gh/apache/pinot/commit/fe8c2739ef341e2bb23834363b2e4ffdaebd7320?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1669 commits behind head on master.
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/pinot/pull/14990?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[...el/rules/PinotAggregateExchangeNodeInsertRule.java](https://app.codecov.io/gh/apache/pinot/pull/14990?src=pr&el=tree&filepath=pinot-query-planner%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcalcite%2Frel%2Frules%2FPinotAggregateExchangeNodeInsertRule.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2FsY2l0ZS9yZWwvcnVsZXMvUGlub3RBZ2dyZWdhdGVFeGNoYW5nZU5vZGVJbnNlcnRSdWxlLmphdmE=)
 | 50.00% | [3 Missing and 5 partials :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14990?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   > :exclamation:  There is a different number of reports uploaded between 
BASE (59551e4) and HEAD (fe8c273). Click for more details.
   > 
   > HEAD has 53 uploads less than BASE
   >
   >| Flag | BASE (59551e4) | HEAD (fe8c273) |
   >|--|--|--|
   >|integration|7|0|
   >|integration2|3|0|
   >|temurin|12|1|
   >|java-21|7|1|
   >|skip-bytebuffers-true|3|0|
   >|skip-bytebuffers-false|7|1|
   >|unittests|5|1|
   >|unittests1|2|1|
   >|java-11|5|0|
   >|unittests2|3|0|
   >|integration1|2|0|
   >|custom-integration1|2|0|
   >
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #14990      +/-   ##
   ============================================
   - Coverage     61.75%   56.01%   -5.74%
   - Complexity      207      710     +503
   ============================================
     Files          2436     2122     -314
     Lines        133233   114585   -18648
     Branches      20636    18449    -2187
   ============================================
   - Hits          82274    64186   -18088
   - Misses        44911    45135     +224
   + Partials       6048     5264     -784
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration1](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration2](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-11](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-21](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `56.01% <60.00%> (-5.61%)` | :arrow_down: |
   | 
[skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/14990/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `56.01% <60.00%> (-5.74%)` | :arrow_down: |
   | 
[skip-bytebuffers-true](https:

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941507341


##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##
@@ -389,15 +390,21 @@ protected void 
replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager
   IndexLoadingConfig indexLoadingConfig)
   throws Exception {
 String segmentName = segmentDataManager.getSegmentName();
-Preconditions.checkState(segmentDataManager instanceof 
ImmutableSegmentDataManager,
-"Cannot replace CONSUMING segment: %s in table: %s", segmentName, 
_tableNameWithType);
-SegmentMetadata localMetadata = 
segmentDataManager.getSegment().getSegmentMetadata();
-if (hasSameCRC(zkMetadata, localMetadata)) {
-  _logger.info("Segment: {} has CRC: {} same as before, not replacing it", 
segmentName, localMetadata.getCrc());
-  return;
+TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+// For pauseless tables, we should replace the segment if download url is 
missing even if crc is same
+// Without this the reingestion of ERROR segments in pauseless tables fails
+// as the segment data manager is still an instance of 
RealtimeSegmentDataManager
+if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {

Review Comment:
   Yeah, I am also not happy with this. The thing I am trying to solve for is basically making segment refresh succeed for reingestion.
   
   Without this check it fails on the following line:
   ```
  Preconditions.checkState(segmentDataManager instanceof 
ImmutableSegmentDataManager,
 "Cannot replace CONSUMING segment: %s in table: %s", segmentName, 
_tableNameWithType);
```

Segment refresh is triggered whenever a segment is uploaded, which is what reingestion does.






Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941501560


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String 
segmentName) {
 return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, 
URIUtils.encode(segmentName));
   }
 
+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
+   * will call the server reIngestSegment API
+   * on one of the alive servers that are supposed to host that segment 
according to IdealState.
+   *
+   * API signature:
+   *   POST http://[serverURL]/reIngestSegment
+   *   Request body (JSON):
+   *   {
+   * "tableNameWithType": [tableName],
+   * "segmentName": [segmentName]
+   *   }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+// Step 1: Fetch the ExternalView and all segments
+ExternalView externalView = getExternalView(tableNameWithType);
+IdealState idealState = getIdealState(tableNameWithType);
+Map> segmentToInstanceCurrentStateMap = 
externalView.getRecord().getMapFields();
+Map> segmentToInstanceIdealStateMap = 
idealState.getRecord().getMapFields();
+
+// find segments in ERROR state in externalView
+List segmentsInErrorState = new ArrayList<>();
+for (Map.Entry> entry : 
segmentToInstanceCurrentStateMap.entrySet()) {
+  String segmentName = entry.getKey();
+  Map instanceStateMap = entry.getValue();
+  boolean allReplicasInError = true;
+  for (String state : instanceStateMap.values()) {
+if (!SegmentStateModel.ERROR.equals(state)) {
+  allReplicasInError = false;
+  break;
+}
+  }
+  if (allReplicasInError) {
+segmentsInErrorState.add(segmentName);
+  }
+}
+
+if (segmentsInErrorState.isEmpty()) {
+  LOGGER.info("No segments found in ERROR state for table {}", 
tableNameWithType);
+  return;
+}
+
+// filter out segments that are not ONLINE in IdealState
+for (String segmentName : segmentsInErrorState) {
+  Map instanceIdealStateMap = 
segmentToInstanceIdealStateMap.get(segmentName);
+  boolean isOnline = true;
+  for (String state : instanceIdealStateMap.values()) {
+if (!SegmentStateModel.ONLINE.equals(state)) {
+  isOnline = false;
+  break;
+}
+  }
+  if (!isOnline) {
+segmentsInErrorState.remove(segmentName);
+  }
+}
+
+// Step 2: For each segment, check the ZK metadata for conditions
+for (String segmentName : segmentsInErrorState) {
+  // Skip non-LLC segments or segments missing from the ideal state 
altogether
+  LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+  if (llcSegmentName == null) {
+continue;
+  }
+
+  SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, segmentName);
+  // We only consider segments that are in COMMITTING state
+  if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+Map instanceStateMap = 
segmentToInstanceIdealStateMap.get(segmentName);
+
+// Step 3: “No peer has that segment.” => Re-ingest from one server 
that is supposed to host it and is alive
+LOGGER.info(
+"Segment {} in table {} is COMMITTING with missing download URL 
and no peer copy. Triggering re-ingestion.",
+segmentName, tableNameWithType);
+
+// Find at least one server that should host this segment and is alive
+String aliveServer = 
findAliveServerToReIngest(instanceStateMap.keySet());
+if (aliveServer == null) {
+  LOGGER.warn("No alive server found to re-ingest segment {} in table 
{}", segmentName, tableNameWithType);
+  continue;
+}
+
+try {
+  _fileUploadDownloadClient.triggerReIngestion(aliveServer, 
tableNameWithType, segmentName);

Review Comment:
   The reason I have kept it here is so that I can utilise the correct HTTP client with all the SSL settings. What would be an appropriate class to move this to?






Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on PR #14920:
URL: https://github.com/apache/pinot/pull/14920#issuecomment-2634466525

   > When a segment is re-ingested, it should be `DONE` instead of `UPLOADED`. 
I didn't find the semaphore related logic. Is the PR description up to date?
   
   During reingestion, the segment metadata upload is triggered after the segment is built. Hence, the segment status appears as "UPLOADED" rather than "DONE".
   
   Upon upload completion, a segment refresh operation is triggered, which replaces the failed RealtimeSegmentDataManager with an ImmutableSegmentDataManager. During this replacement, the system automatically releases the associated semaphores.
   
   Once the segment is refreshed, we send a reset message so that it starts showing up as ONLINE in the EV.
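   
   Summarized as a sketch (all method names here are illustrative, not actual Pinot APIs):
   ```java
   // Sketch of the re-ingestion completion flow described above.
   abstract class ReIngestionFlowSketch {
     void completeReIngestion(String tableNameWithType, String segmentName) {
       // 1. Metadata upload happens after the segment is built, so the ZK
       //    status shows UPLOADED rather than DONE.
       uploadSegmentMetadata(tableNameWithType, segmentName);
       // 2. The upload triggers a segment refresh, replacing the failed
       //    RealtimeSegmentDataManager with an ImmutableSegmentDataManager
       //    and releasing the associated semaphores.
       refreshSegment(tableNameWithType, segmentName);
       // 3. A reset message makes the segment show up as ONLINE in the EV.
       resetSegment(tableNameWithType, segmentName);
     }

     abstract void uploadSegmentMetadata(String tableNameWithType, String segmentName);
     abstract void refreshSegment(String tableNameWithType, String segmentName);
     abstract void resetSegment(String tableNameWithType, String segmentName);
   }
   ```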





Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941464061


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamCon

[PR] Add broker config with default value for is_enable_group_trim hint [pinot]

2025-02-04 Thread via GitHub


bziobrowski opened a new pull request, #14990:
URL: https://github.com/apache/pinot/pull/14990

   This PR adds the `pinot.broker.enable.group.trim` broker configuration setting, which enables the `is_enable_group_trim` hint for all queries; this in turn enables V1-style trimming in leaf nodes and V2-style trimming in intermediate nodes.
   This default is overridden by the value passed in the hint.
   
   Examples:
   - if `pinot.broker.enable.group.trim=true` then
   ```sql
   set explainAskingServers=true;
   explain plan for 
   select   i, j, count(*) as cnt
   from testTable
   group by i, j
   order by i asc, j asc 
   limit 3
   ```
   uses group trimming and should return a plan containing the following (the collations and limit are the important pieces):
   ```
   ...
   LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3])
 PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], 
collations=[[0, 1]], limit=[3])
   PinotLogicalExchange(distribution=[hash[0, 1]])
 ...
   ```
   
   - if we explicitly disable group trimming with a hint, e.g.
   
   ```sql
   set explainAskingServers=true;
   explain plan for 
   select /*+  aggOptions(is_enable_group_trim='false') */  i, j, count(*) as 
cnt
   from testTable
   group by i, j
   order by i asc, j asc 
   limit 3
   ```
   then the execution plan doesn't show the collations and limit in `PinotLogicalAggregate`:
   ```
   ...
   LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3])
 PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])
   PinotLogicalExchange(distribution=[hash[0, 1]])
 ...
   ```
   which means that trimming is disabled.
   
   
   See https://docs.pinot.apache.org/users/user-guide-query/query-syntax/grouping-algorithm for details on the grouping algorithm.
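   
   For illustration, a small sketch of the intended precedence (the config key and hint name are from this PR; `brokerConf` and `hintOptions` are assumed to be a `PinotConfiguration` and the aggregate hint options map in scope):
   ```java
   // Sketch: an explicit hint always wins over the broker-level default.
   boolean brokerDefault = brokerConf.getProperty("pinot.broker.enable.group.trim", false);
   String hintValue = hintOptions.get("is_enable_group_trim"); // null when the hint is absent
   boolean enableGroupTrim = (hintValue != null) ? Boolean.parseBoolean(hintValue) : brokerDefault;
   ```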





Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941436572


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {

Review Comment:
   done






Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941429595


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941389686


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
+import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941388026


##
pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java:
##
@@ -100,6 +100,14 @@ public AuthProvider getAuthProvider() {
 return _authProvider;
   }
 
+  public String getProtocol() {
+return _protocol;
+  }
+
+  public Integer getControllerHttpsPort() {
+return _controllerHttpsPort;
+  }
+

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941381089


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941094499


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941044749


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
 
+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with no peer copy on any server. This method
+   * will call the server reIngestSegment API
+   * on one of the alive servers that are supposed to host that segment according to IdealState.
+   *
+   * API signature:
+   *   POST http://[serverURL]/reIngestSegment
+   *   Request body (JSON):
+   *   {
+   *     "tableNameWithType": [tableName],
+   *     "segmentName": [segmentName]
+   *   }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+    // Step 1: Fetch the ExternalView and all segments
+    ExternalView externalView = getExternalView(tableNameWithType);
+    IdealState idealState = getIdealState(tableNameWithType);
+    Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields();
+    Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields();
+
+    // find segments in ERROR state in externalView
+    List<String> segmentsInErrorState = new ArrayList<>();
+    for (Map.Entry<String, Map<String, String>> entry : segmentToInstanceCurrentStateMap.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      boolean allReplicasInError = true;

Review Comment:
   `When there are ONLINE replica, ideally we should reset the ERROR replica. 
Do we rely on validation manager for that?` yes that is already a part of 
validation manager https://github.com/apache/pinot/pull/14217/files
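
   For context, the endpoint documented in the javadoc above takes a small
   JSON body. A minimal sketch of a standalone caller, based only on that
   documented signature (the server URL, port and segment name below are
   hypothetical placeholders, not values taken from this PR):

       import java.net.URI;
       import java.net.http.HttpClient;
       import java.net.http.HttpRequest;
       import java.net.http.HttpResponse;

       public class ReIngestSegmentCallSketch {
         public static void main(String[] args) throws Exception {
           // Hypothetical server admin endpoint; the controller resolves the
           // real target server from IdealState.
           String serverUrl = "http://server-1:8097";
           String body = "{\"tableNameWithType\": \"myTable_REALTIME\", "
               + "\"segmentName\": \"myTable__0__42__20250204T0000Z\"}";
           HttpRequest request = HttpRequest.newBuilder()
               .uri(URI.create(serverUrl + "/reIngestSegment"))
               .header("Content-Type", "application/json")
               .POST(HttpRequest.BodyPublishers.ofString(body))
               .build();
           // Fire the re-ingestion request and print the server's response.
           HttpResponse<String> response = HttpClient.newHttpClient()
               .send(request, HttpResponse.BodyHandlers.ofString());
           System.out.println(response.statusCode() + ": " + response.body());
         }
       }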



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


KKcorps commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1941000154


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
+import org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static org.apache.pinot.spi.utils.Com

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940998922


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi
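
   The core contract of such a stateless manager — consume exactly the range
   [startOffset, endOffset) from the stream, then build and upload the
   immutable segment — can be sketched independently of Pinot's internals.
   The consumer interface below is a simplified stand-in for illustration
   only, not Pinot's actual PartitionGroupConsumer SPI:

       import java.util.List;

       // Simplified stand-in for a stream consumer; illustration only.
       interface StreamConsumerStub {
         /** Returns up to maxMessages messages at the given offset; may be empty on timeout. */
         List<byte[]> fetch(long offset, int maxMessages, int timeoutMs);
       }

       final class BoundedIngestionSketch {
         /** Consumes [startOffset, endOffset) and returns the final offset reached. */
         static long ingest(StreamConsumerStub consumer, long startOffset, long endOffset) {
           long offset = startOffset;
           while (offset < endOffset) {
             List<byte[]> batch = consumer.fetch(offset, 1_000, 5_000);
             for (byte[] record : batch) {
               if (offset >= endOffset) {
                 break;  // never index past the target end offset
               }
               indexRow(record);
               offset++;
             }
             // An empty batch simply retries; a real implementation would bound retries.
           }
           return offset;  // the caller would now convert, tar and upload the segment
         }

         static void indexRow(byte[] record) {
           // decode and index into the in-memory mutable segment (elided)
         }
       }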

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940993753


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@

[PR] Bump org.apache.helix:helix-core from 1.3.1 to 1.4.3 [pinot]

2025-02-04 Thread via GitHub


dependabot[bot] opened a new pull request, #14989:
URL: https://github.com/apache/pinot/pull/14989

   Bumps [org.apache.helix:helix-core](https://github.com/apache/helix) from 
1.3.1 to 1.4.3.
   
   Release notes
   Sourced from org.apache.helix:helix-core's releases (https://github.com/apache/helix/releases).

   Apache Helix 1.4.2
   Apache Release Note: https://helix.apache.org/1.4.2-docs/releasenotes/release-1.4.2.html
   What's Changed
   - Use actions/upload-artifact@v4, v2 was deprecated by @GrantPSpencer in apache/helix#2911
   - Fix new resource causing pipeline NPE when rebalance overwrites required by @GrantPSpencer in apache/helix#2914
   - Pin netty_codec to higher version to address vulnerability by @zpinto in apache/helix#2924
   - Bump express from 4.19.2 to 4.20.0 in /helix-front by @dependabot in apache/helix#2917
   - Support aggregated endpoint for customized stoppable check by @MarkGaox in apache/helix#2919
   - Fix flaky test - testMultipleWrites by @GrantPSpencer in apache/helix#2926
   - Fix flaky test - testExists in TestZookeeperAccessor by @GrantPSpencer in apache/helix#2931
   - Bump commons-io:commons-io from 2.11.0 to 2.14.0 in /helix-core by @dependabot in apache/helix#2935
   - Bump commons-io:commons-io from 2.11.0 to 2.14.0 in /meta-client by @dependabot in apache/helix#2936
   - Add test for topology migration by resource group by @zpinto in apache/helix#2933
   - Fix BestPossibleExternalViewVerifier for WAGED resource by @MarkGaox in apache/helix#2939
   - Bump org.eclipse.jetty:jetty-server from 9.4.51.v20230217 to 9.4.55.v20240627 by @dependabot in apache/helix#2947
   - Bump http-proxy-middleware from 2.0.6 to 2.0.7 in /helix-front by @dependabot in apache/helix#2952
   - Fix helix-rest memory leak by @MarkGaox in apache/helix#2960

   Full Changelog: https://github.com/apache/helix/compare/helix-1.4.1...helix-1.4.2

   Apache Helix 1.4.1
   Apache Release Note: https://helix.apache.org/1.4.1-docs/releasenotes/release-1.4.1.html
   What's Changed
   - Metaclient updater retry logic by @GrantPSpencer in apache/helix#2805
   - Fix race condition when reconnect by @xyuanlu in apache/helix#2814
   - Fix flaky updateInstance(org.apache.helix.rest.server.TestPerInstanceAccessor) by @zpinto in apache/helix#2825
   - setup cluster for ServiceDiscoveryDemo by @JoeCqupt in apache/helix#2812
   - [apache/helix] -- Added detail in the Exception message for WAGED rebalance (hard constraint) failures. by @himanshukandwal in apache/helix#2829
   - Change partitionAssignment API to handle ANY_LIVEINSTANCE by @GrantPSpencer in apache/helix#2817
   - [apache/helix] -- Package resources in JDK 1.8 (backward compatible) jar by @himanshukandwal in apache/helix#2833
   - Fix unstable tests of TestPartitionAssignmentAPI and TestPerInstanceAccessor by @junkaixue in apache/helix#2843
   - Add support for ALL_RESOURCES key to disabled partitions by

[PR] Bump com.google.cloud:libraries-bom from 26.53.0 to 26.54.0 [pinot]

2025-02-04 Thread via GitHub


dependabot[bot] opened a new pull request, #14988:
URL: https://github.com/apache/pinot/pull/14988

   Bumps 
[com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) 
from 26.53.0 to 26.54.0.
   
   Release notes
   Sourced from com.google.cloud:libraries-bom's releases (https://github.com/googleapis/java-cloud-bom/releases).

   v26.54.0
   GCP Libraries BOM 26.54.0
   Here are the differences from the previous version (26.53.0)
   New Addition
   - com.google.cloud:google-cloud-parametermanager:0.1.0
   The group ID of the following artifacts is com.google.cloud.
   Notable Changes
   google-cloud-bigquery 2.47.0 (prev: 2.46.0)
   - bigquery: Support resource tags for datasets in java client (#3647) (01e0b74)
   - bigquery: Remove ReadAPI bypass in executeSelect() (#3624) (fadd992)
   - Close bq read client (#3644) (8833c97)
   google-cloud-datastore 2.26.0 (prev: 2.25.2)
   - Add firestoreInDatastoreMode for datastore emulator (#1698) (50f106d)
   google-cloud-spanner 6.86.0 (prev: 6.85.0)
   - Add sample for asymmetric autoscaling instances (#3562) (3584b81)
   - Support graph and pipe queries in Connection API (#3586) (71c3063)
   - Always add instance-id for built-in metrics (#3612) (705b627)
   - spanner: Moved mTLSContext configurator from builder to constructor (#3605) (ac7c30b)
   google-cloud-storage 2.48.1 (prev: 2.47.0)
   - Add new Storage#moveBlob method to atomically rename an object (#2882) (c49fd08)
   - Update Signed URL default scheme to resolve from storage options host (#2880) (7ae7e39), closes #2870
   - Update StorageException translation of an ApiException to include error details (#2872) (8ad5010)
   - Update batch handling to ensure each operation has its own unique idempotency-token (#2905) (8d79b8d)
   Other libraries
   - [aiplatform] add Context Cache to v1 (87de77d)
   - [aiplatform] Add machine_spec, data_persistent_disk_spec, network_spec, euc_config, shielded_vm_config to .google.cloud.aiplatform.v1beta1.NotebookRuntime (87de77d)
   - [aiplatform] Add machine_spec, data_persistent_disk_spec, network_spec, euc_config, shielded_vm_config to message .google.cloud.aiplatform.v1.NotebookRuntime (87de77d)
   - [aiplatform] add optimized config in v1 API (87de77d)
   - [aiplatform] add per-modality token count break downs for GenAI APIs (87de77d)
   - [aiplatform] add per-modality token count break downs for Gen

[PR] Bump software.amazon.awssdk:bom from 2.30.11 to 2.30.12 [pinot]

2025-02-04 Thread via GitHub


dependabot[bot] opened a new pull request, #14987:
URL: https://github.com/apache/pinot/pull/14987

   Bumps software.amazon.awssdk:bom from 2.30.11 to 2.30.12.
   
   
   Most Recent Ignore Conditions Applied to This Pull Request
   
   | Dependency Name | Ignore Conditions |
   | --- | --- |
   | software.amazon.awssdk:bom | [< 2.29, > 2.28.12] |
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=software.amazon.awssdk:bom&package-manager=maven&previous-version=2.30.11&new-version=2.30.12)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show <dependency name> ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[PR] Bump joda-time:joda-time from 2.13.0 to 2.13.1 [pinot]

2025-02-04 Thread via GitHub


dependabot[bot] opened a new pull request, #14986:
URL: https://github.com/apache/pinot/pull/14986

   Bumps [joda-time:joda-time](https://github.com/JodaOrg/joda-time) from 
2.13.0 to 2.13.1.
   
   Release notes
   Sourced from joda-time:joda-time's releases (https://github.com/JodaOrg/joda-time/releases).

   Release v2.13.1
   See the change notes for more information: https://www.joda.org/joda-time/changes-report.html#a2.13.1
   What's Changed
   - Update time zone data to 2025agtz by @github-actions in JodaOrg/joda-time#805

   Full Changelog: https://github.com/JodaOrg/joda-time/compare/v2.13.0...v2.13.1

   Commits
   - 935d44e Release v2.13.1
   - f42d2d0 Update time zone data to 2025agtz (#805)
   - 3fcd96c Update CI fuzz version
   - 8430877 Add workflow dispatch
   - 2ce9b4c Update changes.xml
   - See full diff in compare view: https://github.com/JodaOrg/joda-time/compare/v2.13.0...v2.13.1
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=joda-time:joda-time&package-manager=maven&previous-version=2.13.0&new-version=2.13.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show <dependency name> ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[PR] Bump org.checkerframework:checker-qual from 3.48.4 to 3.49.0 [pinot]

2025-02-04 Thread via GitHub


dependabot[bot] opened a new pull request, #14985:
URL: https://github.com/apache/pinot/pull/14985

   Bumps 
[org.checkerframework:checker-qual](https://github.com/typetools/checker-framework)
 from 3.48.4 to 3.49.0.
   
   Release notes
   Sourced from org.checkerframework:checker-qual's releases (https://github.com/typetools/checker-framework/releases).

   Checker Framework 3.49.0
   Version 3.49.0 (February 3, 2025)
   User-visible changes:
   - The Optional Checker is more precise for Optional values resulting from operations on container types (e.g., List, Map, Iterable). It supports two new annotations: @NonEmpty and @UnknownNonEmpty.
   - The Signature Checker no longer supports @BinaryNameWithoutPackage because it is equivalent to @Identifier; use @Identifier instead.
   - The JavaStubifier implementation now appears in package org.checkerframework.framework.stubifier.JavaStubifier.
   Closed issues: #6935, #6936, #6939.

   Changelog
   Sourced from org.checkerframework:checker-qual's changelog (https://github.com/typetools/checker-framework/blob/master/docs/CHANGELOG.md); its 3.49.0 entry matches the release notes above.

   Commits
   - 8ae32b8 new release 3.49.0
   - 8521889 Prep for release.
   - 637c79a Improve diagnostics
   - 1f69ec2 Update dependency gradle to v8.12.1 (#6937)
   - 64e84df Fix Gradle deprecation warnings
   - b7aea1e Update to Gradle 8.12
   - b48f3b4 Fix favicon problem. (#6964)
   - de51e9f Use built in file tools rather than exec in Gradle code
   - 3531816 Move code that adds favicon to separate task
   - 441b527 Remove exec from settings in Gradle code
   - Additional commits viewable in compare view: https://github.com/typetools/checker-framework/compare/checker-framework-3.48.4...checker-framework-3.49.0
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.checkerframework:checker-qual&package-manager=maven&previous-version=3.48.4&new-version=3.49.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a 

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940974264


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940959863


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940951429


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940948236


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940875087


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940867017


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940866516


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/reingestion/utils/StatelessRealtimeSegmentDataManager.java:
##
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources.reingestion.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Simplified Segment Data Manager for ingesting data from a start offset to 
an end offset.
+ */
+public class StatelessRealtimeSegmentDataManager extends SegmentDataManager {
+
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
+  public static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
+
+  private final String _segmentName;
+  private final String _tableNameWithType;
+  private final int _partitionGroupId;
+  private final String _segmentNameStr;
+  private final SegmentZKMetadata _segmentZKMetadata;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final StreamConfi

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940863270


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
+import 
org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.sl

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940830134


##
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java:
##
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionRequest;
+import org.apache.pinot.server.api.resources.reingestion.ReIngestionResponse;
+import 
org.apache.pinot.server.api.resources.reingestion.utils.StatelessRealtimeSegmentDataManager;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.sl

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-04 Thread via GitHub


9aman commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940790579


##
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##
@@ -1274,6 +1275,47 @@ public File downloadUntarFileStreamed(URI uri, File 
dest, AuthProvider authProvi
 httpHeaders, maxStreamRateInByte);
   }
 
+  /**
+   * Invokes the server's reIngestSegment API via a POST request with JSON 
payload,
+   * using Simple HTTP APIs.
+   *
+   * POST http://[serverURL]/reIngestSegment
+   * {
+   *   "tableNameWithType": [tableName],
+   *   "segmentName": [segmentName]
+   * }
+   */
+  public void triggerReIngestion(String serverHostPort, String 
tableNameWithType, String segmentName)
+  throws IOException, URISyntaxException, HttpErrorStatusException {
+String scheme = HTTP;
+if (serverHostPort.contains(HTTPS)) {
+  scheme = HTTPS;
+  serverHostPort = serverHostPort.replace(HTTPS + "://", "");
+} else if (serverHostPort.contains(HTTP)) {

Review Comment:
   I think it's better to update the tests than put a condition here. Not sure 
whether we will run into this. 
   
   Seems fine for now though. 
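
   For illustration, a minimal sketch of the alternative discussed here: 
deriving the scheme via `java.net.URI` instead of substring checks. The class 
and method names below are hypothetical, not part of the PR:

   ```
   import java.net.URI;
   import java.net.URISyntaxException;

   public final class ServerUriUtils {
     private ServerUriUtils() {
     }

     // Derives {scheme, authority} from "https://host:8097", "http://host:8097",
     // or a bare "host:8097". Unlike String#contains checks, only the actual
     // prefix is inspected, so a host name that merely contains "http"
     // (e.g. "httpd-box:8097") is not misread as a scheme marker.
     public static String[] parseSchemeAndAuthority(String serverHostPort)
         throws URISyntaxException {
       String withScheme =
           serverHostPort.contains("://") ? serverHostPort : "http://" + serverHostPort;
       URI uri = new URI(withScheme);
       return new String[]{uri.getScheme(), uri.getAuthority()};
     }

     public static void main(String[] args) throws URISyntaxException {
       for (String s : new String[]{"https://svr-1:8097", "svr-1:8097", "httpd-box:8097"}) {
         String[] parts = parseSchemeAndAuthority(s);
         System.out.println(s + " -> scheme=" + parts[0] + ", authority=" + parts[1]);
       }
     }
   }
   ```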



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[I] Pinot Sparse Data Support [pinot]

2025-02-04 Thread via GitHub


alexch2000 opened a new issue, #14984:
URL: https://github.com/apache/pinot/issues/14984

   Hey team, we’re building a funnel observability solution using Pinot, where 
we have a large table with more than 200B rows and 300+ columns. Of these, only 
20–30 columns are commonly used, while the rest are almost always NULL 
(sparse). Based on my understanding of Pinot’s architecture, each column’s 
forward index will store some default value for every row, even if the column 
is NULL most of the time.
   What is your recommendation for dealing with a table that has hundreds of 
mostly NULL columns, especially in terms of segment sizing, ingestion 
performance, and ongoing query efficiency?
   
   Are there any roadmap items or upcoming features aimed at optimizing storage 
or performance for extremely sparse columns (e.g., skipping or compressing 
columns with mostly NULL values)?
   
   Are there any existing best practices or configurations we could apply to 
make this more efficient (e.g., using **defaultNullValue**, dictionary vs. raw 
encoding, or segment-level pruning)?
   
   > Pinot always stores column values in a [forward 
index](https://docs.pinot.apache.org/basics/indexing/forward-index). The 
forward index never stores null values but has to store a value for each row. 
Therefore, independent of the null handling configuration, Pinot always stores 
a default value for null rows in the forward index. The default value used in a 
column can be specified in the 
[schema](https://docs.pinot.apache.org/configuration-reference/schema) 
configuration by setting the defaultNullValue field spec. The defaultNullValue 
depends on the type of data.
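
   For reference, a minimal schema fragment showing where `defaultNullValue` 
goes in a field spec, per the docs quoted above. The column names here are 
hypothetical:

   ```
   {
     "dimensionFieldSpecs": [
       {
         "name": "rarely_set_attribute",
         "dataType": "STRING",
         "defaultNullValue": ""
       }
     ],
     "metricFieldSpecs": [
       {
         "name": "rarely_set_metric",
         "dataType": "LONG",
         "defaultNullValue": 0
       }
     ]
   }
   ```

   Note that, per the quoted docs, this only controls which value is 
materialized in the forward index for null rows; it does not by itself shrink 
the storage footprint of a mostly-NULL column.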


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Adding a new config skipSegmentPreprocess in table IndexingConfig [pinot]

2025-02-03 Thread via GitHub


xiangfu0 merged PR #14982:
URL: https://github.com/apache/pinot/pull/14982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[I] Regex Query for TEXT_MATCH is not working for input words with a space [pinot]

2025-02-03 Thread via GitHub


rahulkrishna-dev opened a new issue, #14983:
URL: https://github.com/apache/pinot/issues/14983

   Query being executed: 
   ```
   SELECT 
 keyword
   FROM 
 keywords_rank_v1
   WHERE 
 TEXT_MATCH(keyword, '/.*amul milk.*/')
 ORDER BY rank ASC
 LIMIT 100
   ```
   The above query is expected to return all the keywords containing `amul 
milk`, but it is not returning anything. My hypothesis is that the regex 
expression is being treated as a term query because of the space character; I 
couldn't find a way to escape the space character. Please help!
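
   A possible workaround while this is triaged, assuming the column has a 
Lucene text index: TEXT_MATCH hands the expression to Lucene, and a Lucene 
phrase query (double quotes inside the single-quoted literal) matches 
consecutive terms without a regex. The semantics differ slightly from the 
substring regex (it matches the tokenized phrase rather than raw characters), 
so this is worth verifying on your data and Pinot version:

   ```
   SELECT keyword
   FROM keywords_rank_v1
   WHERE TEXT_MATCH(keyword, '"amul milk"')
   ORDER BY rank ASC
   LIMIT 100
   ```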


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]

2025-02-03 Thread via GitHub


codecov-commenter commented on PR #14982:
URL: https://github.com/apache/pinot/pull/14982#issuecomment-2632595967

   ## 
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/14982?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `50.0%` with `3 lines` in your changes 
missing coverage. Please review.
   > Project coverage is 56.09%. Comparing base 
[(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`45dbdfb`)](https://app.codecov.io/gh/apache/pinot/commit/45dbdfb97786ecaa3aa5774004b219987fa817ca?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1668 commits behind head on master.
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/pinot/pull/14982?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[...indexsegment/immutable/ImmutableSegmentLoader.java](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&filepath=pinot-segment-local%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Flocal%2Findexsegment%2Fimmutable%2FImmutableSegmentLoader.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pbmRleHNlZ21lbnQvaW1tdXRhYmxlL0ltbXV0YWJsZVNlZ21lbnRMb2FkZXIuamF2YQ==)
 | 0.00% | [1 Missing and 1 partial :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   | 
[...local/segment/index/loader/IndexLoadingConfig.java](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&filepath=pinot-segment-local%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Flocal%2Fsegment%2Findex%2Floader%2FIndexLoadingConfig.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2luZGV4L2xvYWRlci9JbmRleExvYWRpbmdDb25maWcuamF2YQ==)
 | 0.00% | [0 Missing and 1 partial :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   > :exclamation:  There is a different number of reports uploaded between 
BASE (59551e4) and HEAD (45dbdfb).
   > 
   > HEAD has 48 uploads less than BASE
   >
   >| Flag | BASE (59551e4) | HEAD (45dbdfb) |
   >|--|--|--|
   >|integration|7|0|
   >|integration2|3|0|
   >|temurin|12|2|
   >|java-21|7|2|
   >|skip-bytebuffers-true|3|1|
   >|skip-bytebuffers-false|7|1|
   >|unittests|5|2|
   >|java-11|5|0|
   >|unittests2|3|0|
   >|integration1|2|0|
   >|custom-integration1|2|0|
   >
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff             @@
   ##           master   #14982      +/-   ##
   ============================================
   - Coverage     61.75%   56.09%    -5.66%
   - Complexity      207      710      +503
   ============================================
     Files          2436     2122      -314
     Lines        133233   114572    -18661
     Branches      20636    18444     -2192
   ============================================
   - Hits          82274    64271    -18003
   - Misses        44911    45050      +139
   + Partials       6048     5251      -797
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration1](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration2](https://app.codecov.io/gh/apache/pinot/pull/14982/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apa

Re: [PR] Pauseless Consumption #3: Disaster Recovery with Reingestion [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang commented on code in PR #14920:
URL: https://github.com/apache/pinot/pull/14920#discussion_r1940355486


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -442,6 +446,10 @@ public IdealState getIdealState(String realtimeTableName) {
 }
   }
 
+  public ExternalView getExternalView(String realtimeTableName) {
+return _helixResourceManager.getTableExternalView(realtimeTableName);
+  }
+

Review Comment:
   (minor) Let's inline this and remove this method. The behavior is not 
consistent with `getIdealState()`, which can cause confusion.



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String 
segmentName) {
 return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, 
URIUtils.encode(segmentName));
   }
 
+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
+   * will call the server reIngestSegment API
+   * on one of the alive servers that are supposed to host that segment 
according to IdealState.
+   *
+   * API signature:
+   *   POST http://[serverURL]/reIngestSegment
+   *   Request body (JSON):
+   *   {
+   * "tableNameWithType": [tableName],
+   * "segmentName": [segmentName]
+   *   }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {

Review Comment:
   (minor) When the table type is known, let's directly use the type for 
readability
   ```suggestion
 public void reIngestSegmentsWithErrorState(String realtimeTableName) {
   ```



##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -1243,7 +1243,7 @@ protected boolean buildSegmentAndReplace()
 return true;
   }
 
-  private void closeStreamConsumers() {
+  protected void closeStreamConsumers() {

Review Comment:
   (minor) Seems not needed, or do you plan to add a test?



##
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##
@@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File 
dest, AuthProvider authProvi
 httpHeaders, maxStreamRateInByte);
   }
 
+  /**
+   * Invokes the server's reIngestSegment API via a POST request with JSON 
payload,
+   * using Simple HTTP APIs.
+   *
+   * POST http://[serverURL]/reIngestSegment

Review Comment:
   Suggest just making it `POST /reIngestSegment/{segmentName}`, which is 
easier to use.



##
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##
@@ -1274,6 +1275,39 @@ public File downloadUntarFileStreamed(URI uri, File 
dest, AuthProvider authProvi
 httpHeaders, maxStreamRateInByte);
   }
 
+  /**
+   * Invokes the server's reIngestSegment API via a POST request with JSON 
payload,
+   * using Simple HTTP APIs.
+   *
+   * POST http://[serverURL]/reIngestSegment
+   * {
+   *   "tableNameWithType": [tableName],
+   *   "segmentName": [segmentName]
+   * }
+   */
+  public void triggerReIngestion(String serverHostPort, String 
tableNameWithType, String segmentName)

Review Comment:
   (minor) The table name is implicit from the LLC segment name, so we may 
simplify this API.
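   
   For context, a minimal sketch of recovering the table name from an LLC 
segment name with Pinot's `LLCSegmentName` helper (the segment name below is a 
made-up example):
   ```
   import org.apache.pinot.common.utils.LLCSegmentName;
   
   public class LlcNameSketch {
     public static void main(String[] args) {
       // LLC segment names encode table, partition id, sequence number and
       // creation time, e.g. "myTable__0__42__20250203T0000Z".
       LLCSegmentName llcSegmentName = new LLCSegmentName("myTable__0__42__20250203T0000Z");
       System.out.println(llcSegmentName.getTableName());      // myTable
       System.out.println(llcSegmentName.getSequenceNumber()); // 42
     }
   }
   ```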



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -2096,6 +2104,131 @@ URI createSegmentPath(String rawTableName, String 
segmentName) {
 return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, 
URIUtils.encode(segmentName));
   }
 
+  /**
+   * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
+   * will call the server reIngestSegment API
+   * on one of the alive servers that are supposed to host that segment 
according to IdealState.
+   *
+   * API signature:
+   *   POST http://[serverURL]/reIngestSegment
+   *   Request body (JSON):
+   *   {
+   * "tableNameWithType": [tableName],
+   * "segmentName": [segmentName]
+   *   }
+   *
+   * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+   */
+  public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+// Step 1: Fetch the ExternalView and all segments
+ExternalView externalView = getExternalView(tableNameWithType);
+IdealState idealState = getIdealState(tableNameWithType);
+Map<String, Map<String, String>> segmentToInstanceCurrentStateMap = externalView.getRecord().getMapFields();
+Map<String, Map<String, String>> segmentToInstanceIdealStateMap = idealState.getRecord().getMapFields();
+
+// find segments in ERROR state in externalView
+List<String> segmentsInErrorState = new ArrayList<>();
+for (Map.E

Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]

2025-02-03 Thread via GitHub


xiangfu0 commented on code in PR #14982:
URL: https://github.com/apache/pinot/pull/14982#discussion_r1940327893


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##
@@ -337,6 +340,14 @@ public void setErrorOnColumnBuildFailure(boolean 
errorOnColumnBuildFailure) {
 _errorOnColumnBuildFailure = errorOnColumnBuildFailure;
   }
 
+  public void setEnablePreProcess(boolean enablePreProcess) {
+_enableSegmentPreprocess = enablePreProcess;
+  }
+
+  public boolean isEnablePreProcess() {
+return _enableSegmentPreprocess;
+  }
+

Review Comment:
   It will be set in the `init()` method, so we cannot make it final unless we 
explicitly set this field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang commented on code in PR #14982:
URL: https://github.com/apache/pinot/pull/14982#discussion_r1940316175


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##
@@ -337,6 +340,14 @@ public void setErrorOnColumnBuildFailure(boolean 
errorOnColumnBuildFailure) {
 _errorOnColumnBuildFailure = errorOnColumnBuildFailure;
   }
 
+  public void setEnablePreProcess(boolean enablePreProcess) {
+_enableSegmentPreprocess = enablePreProcess;
+  }
+
+  public boolean isEnablePreProcess() {
+return _enableSegmentPreprocess;
+  }
+

Review Comment:
   Don't add them, and make the field final. `IndexLoadingConfig` shouldn't be 
modified.



##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##
@@ -83,6 +84,7 @@ public class IndexLoadingConfig {
   private boolean _enableDefaultStarTree;
   private Map _indexConfigsByColName = new 
HashMap<>();
 
+

Review Comment:
   (nit) Revert



##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##
@@ -56,6 +56,7 @@ public class IndexLoadingConfig {
   private final InstanceDataManagerConfig _instanceDataManagerConfig;
   private final TableConfig _tableConfig;
   private final Schema _schema;
+  private boolean _enableSegmentPreprocess = true;

Review Comment:
   Suggest renaming it to `_skipSegmentPreprocess` so that the default is 
`false`. Adding a default-true flag is error-prone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[PR] Adding a new config enableSegmentPreprocess in table IndexingConfig [pinot]

2025-02-03 Thread via GitHub


xiangfu0 opened a new pull request, #14982:
URL: https://github.com/apache/pinot/pull/14982

   This config, `enableSegmentPreprocess`, allows the table to skip 
preprocessing during server startup or reload.
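   
   A hedged sketch of where the flag would sit in a table config (field 
placement is assumed from the PR title; the merge notice elsewhere in this 
digest shows the config was ultimately named `skipSegmentPreprocess`):
   ```
   {
     "tableName": "myTable",
     "tableType": "OFFLINE",
     "tableIndexConfig": {
       "enableSegmentPreprocess": false
     }
   }
   ```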


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang commented on PR #14979:
URL: https://github.com/apache/pinot/pull/14979#issuecomment-2632446882

   @dependabot ignore this dependency


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [I] Server replica allowed to overwrite deep store segment after segment was already committed [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang commented on issue #14786:
URL: https://github.com/apache/pinot/issues/14786#issuecomment-2632457409

   @KKcorps should we simply change the if condition to 
`segmentMetadata.getStatus() != 
CommonConstants.Segment.Realtime.Status.IN_PROGRESS`? Can you help submit a fix?
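   
   To make the suggestion concrete, a hedged sketch (only the status comparison 
comes from the comment above; the method name and return-value shape are 
illustrative):
   ```
   import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
   import org.apache.pinot.spi.utils.CommonConstants;
   
   final class CommitGuardSketch {
     // Once a segment is no longer IN_PROGRESS it has been committed, so a
     // late replica should not overwrite the deep-store copy.
     static boolean shouldSkipDeepStoreUpload(SegmentZKMetadata segmentMetadata) {
       return segmentMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS;
     }
   }
   ```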


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]

2025-02-03 Thread via GitHub


dependabot[bot] closed pull request #14979: Bump 
jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1
URL: https://github.com/apache/pinot/pull/14979


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump jakarta.validation:jakarta.validation-api from 2.0.2 to 3.1.1 [pinot]

2025-02-03 Thread via GitHub


dependabot[bot] commented on PR #14979:
URL: https://github.com/apache/pinot/pull/14979#issuecomment-2632446931

   OK, I won't notify you about jakarta.validation:jakarta.validation-api 
again, unless you re-open this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-03 Thread via GitHub


codecov-commenter commented on PR #14981:
URL: https://github.com/apache/pinot/pull/14981#issuecomment-2632408914

   ## 
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/14981?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `71.1%` with `13 lines` in your changes 
missing coverage. Please review.
   > Project coverage is 63.55%. Comparing base 
[(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`cfe1760`)](https://app.codecov.io/gh/apache/pinot/commit/cfe1760159e1f4ecdb704743fb037890fdb17adc?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 1668 commits behind head on master.
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/pinot/pull/14981?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[...va/org/apache/pinot/query/runtime/QueryRunner.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2FQueryRunner.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh)
 | 50.00% | [2 Missing and 3 partials :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   | 
[...ry/runtime/operator/MultistageGroupByExecutor.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2FMultistageGroupByExecutor.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NdWx0aXN0YWdlR3JvdXBCeUV4ZWN1dG9yLmphdmE=)
 | 54.54% | [2 Missing and 3 partials :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   | 
[...e/operator/groupby/OneLongKeyGroupIdGenerator.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2Fgroupby%2FOneLongKeyGroupIdGenerator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9ncm91cGJ5L09uZUxvbmdLZXlHcm91cElkR2VuZXJhdG9yLmphdmE=)
 | 0.00% | [2 Missing :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   | 
[...time/operator/groupby/GroupIdGeneratorFactory.java](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&filepath=pinot-query-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fquery%2Fruntime%2Foperator%2Fgroupby%2FGroupIdGeneratorFactory.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9ncm91cGJ5L0dyb3VwSWRHZW5lcmF0b3JGYWN0b3J5LmphdmE=)
 | 87.50% | [1 Missing :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/14981?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff             @@
   ##           master   #14981      +/-   ##
   ============================================
   + Coverage     61.75%   63.55%    +1.80%
   - Complexity      207     1389     +1182
   ============================================
     Files          2436     2713      +277
     Lines        133233   152187    +18954
     Branches      20636    23532     +2896
   ============================================
   + Hits          82274    96724    +14450
   - Misses        44911    48152     +3241
   - Partials       6048     7311     +1263
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pinot/pull/14981/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | C

[PR] Introducing MSE result holder config to minimize rehashing for high cardinality group by [pinot]

2025-02-03 Thread via GitHub


shauryachats opened a new pull request, #14981:
URL: https://github.com/apache/pinot/pull/14981

   A new configuration to control the size of result holders for MSE is 
necessary to avoid resizing and rehashing operations in use cases where 
grouping is needed on high-cardinality columns (e.g., UUIDs).
   
   A simple query where it is necessary is:
   ```
   SELECT
     count(*)
   FROM
     table_A
   WHERE (
     user_uuid NOT IN (
       SELECT
         user_uuid
       FROM
         table_B
     )
   )
   LIMIT
     100 option(useMultistageEngine=true, timeoutMs=12, useColocatedJoin = true, maxRowsInJoin = 4000)
   ```
   
   where a group-by step occurs on `user_uuid` for `table_B` before the 
colocated join with `table_A`, which has a high cardinality.
   
   More details in the following issue: 
https://github.com/apache/pinot/issues/14685
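   
   As background on why a size hint helps (a generic Java illustration, not the 
PR's actual code): a hash-based result holder that starts small must repeatedly 
resize and rehash as distinct group keys accumulate, which is wasted work when 
the cardinality is known to be large up front:
   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   
   public class PresizeSketch {
     public static void main(String[] args) {
       int expectedGroups = 1_000_000;  // hypothetical cardinality hint
       // Default-sized map: starts at 16 buckets and rehashes on the order of
       // 17 times while growing to hold a million keys.
       Map<String, Long> defaultSized = new HashMap<>();
       // Presized map: capacity chosen so the load-factor threshold is never
       // crossed and no rehash happens at all.
       Map<String, Long> presized = new HashMap<>((int) (expectedGroups / 0.75f) + 1);
       for (int i = 0; i < expectedGroups; i++) {
         String key = UUID.randomUUID().toString();  // stands in for user_uuid
         defaultSized.merge(key, 1L, Long::sum);
         presized.merge(key, 1L, Long::sum);
       }
     }
   }
   ```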


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump com.mchange:c3p0 from 0.10.1 to 0.10.2 [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang merged PR #14978:
URL: https://github.com/apache/pinot/pull/14978


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]

2025-02-03 Thread via GitHub


tibrewalpratik17 merged PR #14945:
URL: https://github.com/apache/pinot/pull/14945


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump software.amazon.awssdk:bom from 2.30.10 to 2.30.11 [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang merged PR #14977:
URL: https://github.com/apache/pinot/pull/14977


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Bump org.apache.httpcomponents.client5:httpclient5 from 5.3.1 to 5.4.2 [pinot]

2025-02-03 Thread via GitHub


Jackie-Jiang merged PR #14975:
URL: https://github.com/apache/pinot/pull/14975


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]

2025-02-03 Thread via GitHub


ankitsultana commented on code in PR #14945:
URL: https://github.com/apache/pinot/pull/14945#discussion_r1940034557


##
pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
##
@@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
 Preconditions.checkNotNull(timeColumn, "Time column not set. Set via 
time_col=");
 Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via 
time_unit=");
 Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via 
value=");
+Map<String, String> queryOptions = new HashMap<>();
+if (request.getNumGroupsLimit() > 0) {
+  queryOptions.put("numGroupsLimit", 
Integer.toString(request.getNumGroupsLimit()));

Review Comment:
   Good point. Can address in a follow-up. This module is mostly a throwaway; 
we'll open source the full M3 Plugin after time series support is marked as GA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]

2025-02-03 Thread via GitHub


tibrewalpratik17 commented on code in PR #14945:
URL: https://github.com/apache/pinot/pull/14945#discussion_r1940030515


##
pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
##
@@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
 Preconditions.checkNotNull(timeColumn, "Time column not set. Set via 
time_col=");
 Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via 
time_unit=");
 Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via 
value=");
+Map<String, String> queryOptions = new HashMap<>();
+if (request.getNumGroupsLimit() > 0) {
+  queryOptions.put("numGroupsLimit", 
Integer.toString(request.getNumGroupsLimit()));

Review Comment:
   nit: can put "numGroupsLimit" in a constant class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] [timeseries] Add Support for limit and numGroupsLimit [pinot]

2025-02-03 Thread via GitHub


ankitsultana commented on code in PR #14945:
URL: https://github.com/apache/pinot/pull/14945#discussion_r1939936596


##
pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
##
@@ -78,7 +80,8 @@ public BaseTimeSeriesPlanNode 
planQuery(RangeTimeSeriesRequest request) {
   switch (command) {
 case "fetch":
-  List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
-  currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, 
children, aggInfo, groupByColumns);
+  currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, 
children, aggInfo, groupByColumns,

Review Comment:
   Yeah, you just need to set the `limit` and `numGroupsLimit` in the 
`LeafTimeSeriesPlanNode` of the returned plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


