tkhurana commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r2984085036


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java:
##########
@@ -223,4 +233,100 @@ public static void setTenantId(final Job job, final 
String tenantId) {
     PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
   }
 
+  /**
+   * Validates that start and end times are in the past and start < end.
+   * @param startTime Start timestamp in millis (nullable, defaults to 0)
+   * @param endTime   End timestamp in millis (nullable, defaults to current 
time)
+   * @param tableName Table name for error messages
+   * @throws IllegalArgumentException if time range is invalid
+   */
+  public static void validateTimeRange(Long startTime, Long endTime, String 
tableName) {
+    long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+    long st = (startTime == null) ? 0L : startTime;
+    long et = (endTime == null) ? currentTime : endTime;
+
+    if (et > currentTime || st >= et) {
+      throw new IllegalArgumentException(String.format(
+        "%s %s: start and end times must be in the past "
+          + "and start < end. Start: %d, End: %d, Current: %d",
+        INVALID_TIME_RANGE_EXCEPTION_MESSAGE, tableName, st, et, currentTime));
+    }
+  }
+
+  /**
+   * Validates that the end time doesn't exceed the max lookback age 
configured in Phoenix.
+   * @param configuration Hadoop configuration
+   * @param endTime       End timestamp in millis
+   * @param tableName     Table name for error messages
+   * @throws IllegalArgumentException if endTime is before min allowed 
timestamp
+   */
+  public static void validateMaxLookbackAge(Configuration configuration, Long 
endTime,
+    String tableName) {
+    long maxLookBackAge = 
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+    if (maxLookBackAge > 0) {
+      long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - 
maxLookBackAge;
+      if (endTime < minTimestamp) {
+        throw new IllegalArgumentException(String.format(
+          "Table %s can't look back past the configured max lookback age: %d 
ms. "
+            + "End time: %d, Min allowed timestamp: %d",
+          tableName, maxLookBackAge, endTime, minTimestamp));
+      }
+    }
+  }
+
+  /**
+   * Validates that a table is suitable for MR operations. Checks table 
existence, type, and state.
+   * @param connection         Phoenix connection
+   * @param qualifiedTableName Qualified table name
+   * @param allowViews         Whether to allow VIEW tables
+   * @param allowIndexes       Whether to allow INDEX tables
+   * @return PTable instance
+   * @throws SQLException             if connection fails
+   * @throws IllegalArgumentException if validation fails
+   */
+  public static PTable validateTableForMRJob(Connection connection, String 
qualifiedTableName,
+    boolean allowViews, boolean allowIndexes) throws SQLException {
+    PTable pTable = 
connection.unwrap(PhoenixConnection.class).getTableNoCache(qualifiedTableName);
+
+    if (pTable == null) {
+      throw new IllegalArgumentException(
+        String.format("Table %s does not exist", qualifiedTableName));
+    } else if (!allowViews && pTable.getType() == PTableType.VIEW) {
+      throw new IllegalArgumentException(
+        String.format("Cannot run MR job on VIEW table %s", 
qualifiedTableName));
+    } else if (!allowIndexes && pTable.getType() == PTableType.INDEX) {
+      throw new IllegalArgumentException(
+        String.format("Cannot run MR job on INDEX table %s directly", 
qualifiedTableName));
+    }
+
+    return pTable;
+  }
+
+  /**
+   * Configures a Configuration object with ZooKeeper settings from a ZK 
quorum string.
+   * @param baseConf Base configuration to create from (typically job 
configuration)
+   * @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" 
Example:
+   *                 "zk1,zk2,zk3:2181:/hbase"

Review Comment:
   This is actually not the only format for zk quorum. There are other valid 
formats also where the port number is specified separately for each server. 
There is actually a very useful API in Hbase called 
`HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum)` 
We should use that as that also works for zk registry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to