rahulLiving commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r3007789991


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat designed for PhoenixSyncTableTool that generates splits based 
on HBase region
+ * boundaries. Filters out already-processed mapper regions using checkpoint 
data, enabling
+ * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke 
the mapper once per
+ * split (region).
+ */
+public class PhoenixSyncTableInputFormat extends PhoenixInputFormat {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
+
+  public PhoenixSyncTableInputFormat() {
+    super();
+  }
+
+  /**
+   * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one 
dummy record per split.
+   * <p>
+   * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader 
- it extracts region
+   * boundaries from the InputSplit and delegates all scanning to the 
PhoenixSyncTableRegionScanner
+   * coprocessor. Using PhoenixNoOpSingleRecordReader ensures that {@code 
map()} is called exactly
+   * once per region no matter what scan looks like, avoiding the overhead of 
the default
+   * PhoenixRecordReader which would call {@code map()} for every row of scan.
+   * @param split Input Split
+   * @return A PhoenixNoOpSingleRecordReader instance
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public RecordReader createRecordReader(InputSplit split, TaskAttemptContext 
context) {
+    return new PhoenixNoOpSingleRecordReader();
+  }
+
+  /**
+   * Generates InputSplits for the Phoenix sync table job, splits are done 
based on region boundary
+   * and then filter out already-completed regions using sync table checkpoint 
table.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
+    String targetZkQuorum = 
PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
+    Long fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf);
+    Long toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf);
+    List<InputSplit> allSplits = super.getSplits(context);

Review Comment:
   The cast can be done where it is actually used, i.e. in `filterCompletedSplits`:
   ```
    List<PhoenixInputSplit> typedSplits = new ArrayList<>(allSplits.size());
       for (InputSplit split : allSplits) {
         typedSplits.add((PhoenixInputSplit) split);
       }
   ```    
   
   Did you mean something like this? It would need an extra iteration over the splits.
   I think a direct cast such as `List<PhoenixInputSplit> typedSplits = (List<PhoenixInputSplit>) allSplits;` shouldn't be done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to