Github user karanmehta93 commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/309#discussion_r203927054
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
---
@@ -157,6 +192,192 @@ public static void setOutput(final Job job, final
String tableName,final String
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
}
+ /**
+ * Generate a query plan for a MapReduce job query.
+ * @param configuration The MapReduce job configuration
+ * @return Query plan for the MapReduce job
+ * @throws SQLException If the plan cannot be generated
+ */
+ public static QueryPlan getQueryPlan(final Configuration configuration)
+ throws SQLException {
+ return getQueryPlan(configuration, false);
+ }
+
+ /**
+ * Generate a query plan for a MapReduce job query
+ * @param configuration The MapReduce job configuration
+ * @param isTargetConnection Whether the query plan is for the target
HBase cluster
+ * @return Query plan for the MapReduce job
+ * @throws SQLException If the plan cannot be generated
+ */
+ public static QueryPlan getQueryPlan(final Configuration configuration,
+ boolean isTargetConnection) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final String txnScnValue =
configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ final String currentScnValue =
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final Properties overridingProps = new Properties();
+ if(txnScnValue==null && currentScnValue!=null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
currentScnValue);
+ }
+ final Connection connection;
+ final String selectStatement;
+ if (isTargetConnection) {
+ String targetTable =
PhoenixConfigurationUtil.getInputTargetTableName(configuration);
+ if (!Strings.isNullOrEmpty(targetTable)) {
+ // different table on same cluster
+ connection =
ConnectionUtil.getInputConnection(configuration, overridingProps);
+ selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration, true);
+ } else {
+ // same table on different cluster
+ connection =
+
ConnectionUtil.getTargetInputConnection(configuration, overridingProps);
+ selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
+ }
+ } else {
+ connection = ConnectionUtil.getInputConnection(configuration,
overridingProps);
+ selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
+ }
+ Preconditions.checkNotNull(selectStatement);
+ final Statement statement = connection.createStatement();
+ final PhoenixStatement pstmt =
statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary
indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+ // since we can't set a scn on connections with txn set TX_SCN
attribute so that the max time range is set by BaseScannerRegionObserver
+ if (txnScnValue!=null) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
Bytes.toBytes(Long.valueOf(txnScnValue)));
+ }
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+ return queryPlan;
+ }
+
+ /**
+ * Generates the input splits for a MapReduce job.
+ * @param qplan Query plan for the job
+ * @param splits The key range splits for the job
+ * @param config The job configuration
+ * @return Input splits for the job
+ * @throws IOException If the region information for the splits cannot
be retrieved
+ */
+ public static List<InputSplit> generateSplits(final QueryPlan qplan,
+ final List<KeyRange> splits, Configuration config) throws
IOException {
+ Preconditions.checkNotNull(qplan);
+ Preconditions.checkNotNull(splits);
+
+ // Get the RegionSizeCalculator
+ org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(config);
--- End diff --
Any particular reason why you removed the try-with-resources block here?
I added it to fix a memory leak I had found:
https://issues.apache.org/jira/browse/PHOENIX-4489
---