This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new bd5dff745a PHOENIX-7440 TableSnapshotReadsMapReduceIT fails with HBase 2.6.1 (#2014)
bd5dff745a is described below
commit bd5dff745afa8005256ea78340be584a75a60f43
Author: Istvan Toth <[email protected]>
AuthorDate: Tue Oct 22 08:38:20 2024 +0200
PHOENIX-7440 TableSnapshotReadsMapReduceIT fails with HBase 2.6.1 (#2014)
---
.../end2end/TableSnapshotReadsMapReduceIT.java | 62 +++++++++++++---------
1 file changed, 38 insertions(+), 24 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 571294789f..6f4f43a8ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -33,6 +33,8 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -152,8 +154,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
@Test
public void testMapReduceSnapshotsMultiRegion() throws Exception {
+ String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD1 asc";
PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
- SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+ SNAPSHOT_NAME, tableName, tmpDir, inputQuery);
configureJob(job, tableName, null, null, true);
}
@@ -282,6 +285,17 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
ResultSet rs = DriverManager.getConnection(getUrl(),
props).createStatement().executeQuery(inputQuery);
+ if(shouldSplit) {
+ //Records may not be processed in the same order as the query runs,
+ //make sure everything is ordered by Field1 ASC
+ Collections.sort(result, new Comparator<List<Object>>(){
+ @Override
+ public int compare(List<Object> o1, List<Object> o2) {
+ return ((String)o1.get(0)).compareTo((String)o2.get(0));
+ }
+ });
+ }
+
for (List<Object> r : result) {
assertTrue("No data stored in the table!", rs.next());
int i = 0;
@@ -302,12 +316,12 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
private void upsertData(Connection conn, String stockTableName) throws
SQLException {
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT,
stockTableName));
- upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
- upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+ upsertRecord(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
+ upsertRecord(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
conn.commit();
}
- private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
+ private void upsertRecord(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
int i = 1;
stmt.setString(i++, name);
stmt.setInt(i++, year);
@@ -319,36 +333,36 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
private void upsertData(String tableName) throws SQLException {
Connection conn = DriverManager.getConnection(getUrl());
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT,
tableName));
- upsertData(stmt, "AAAA", "JHHD", 37);
- upsertData(stmt, "BBBB", "JSHJ", 224);
- upsertData(stmt, "CCCC", "SSDD", 15);
- upsertData(stmt, "PPPP", "AJDG", 53);
- upsertData(stmt, "SSSS", "HSDG", 59);
- upsertData(stmt, "XXXX", "HDPP", 22);
+ upsertRecord(stmt, "AAAA", "JHHD", 37);
+ upsertRecord(stmt, "BBBB", "JSHJ", 224);
+ upsertRecord(stmt, "CCCC", "SSDD", 15);
+ upsertRecord(stmt, "PPPP", "AJDG", 53);
+ upsertRecord(stmt, "SSSS", "HSDG", 59);
+ upsertRecord(stmt, "XXXX", "HDPP", 22);
conn.commit();
}
private void upsertDataBeforeSplit(String tableName) throws SQLException {
Connection conn = DriverManager.getConnection(getUrl());
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT,
tableName));
- upsertData(stmt, "CCCC", "SSDD", RANDOM.nextInt());
+ upsertRecord(stmt, "CCCC", "SSDD", RANDOM.nextInt());
for (int i = 0; i < 100; i++) {
- upsertData(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt());
- upsertData(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt());
- upsertData(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt());
- upsertData(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt());
- upsertData(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt());
- upsertData(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt());
- upsertData(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt());
- upsertData(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt());
- upsertData(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt());
- upsertData(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt());
- upsertData(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt());
+ upsertRecord(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt());
}
conn.commit();
}
- private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
+ private void upsertRecord(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
stmt.setString(1, field1);
stmt.setString(2, field2);
stmt.setInt(3, field3);
@@ -383,7 +397,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
// upsert data after snapshot
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT,
tableName));
- upsertData(stmt, "DDDD", "SNFB", 45);
+ upsertRecord(stmt, "DDDD", "SNFB", 45);
conn.commit();
if (isSnapshotRestoreDoneExternally) {
//Performing snapshot restore which will be used during scans