stoty commented on code in PR #1552:
URL: https://github.com/apache/phoenix/pull/1552#discussion_r1124380739
##########
phoenix-core/src/it/java/org/apache/phoenix/PhoenixMasterObserverIT.java:
##########
@@ -0,0 +1,107 @@
+package org.apache.phoenix;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PhoenixMasterObserverIT extends ParallelStatsDisabledIT {
+
+ String create_table =
+ "CREATE TABLE IF NOT EXISTS %s(ID VARCHAR NOT NULL PRIMARY KEY,
VAL1 INTEGER, VAL2 INTEGER) SALT_BUCKETS=4";
+ String indexName = generateUniqueName();
+ String create_index = "CREATE INDEX " + indexName + " ON %s(VAL1 DESC)
INCLUDE (VAL2)";
+ String upsertStatement = "UPSERT INTO %s VALUES(?, ?, ?)";
+ String deleteTableName = generateUniqueName();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
+ serverProps.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
Review Comment:
There is no way to automatically load a Master coprocessor on Phoenix
startup, right?
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,114 @@
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class PhoenixMasterObserver implements MasterObserver,
MasterCoprocessor {
+
    /**
     * Exposes this coprocessor as a {@link MasterObserver} so the HBase master
     * invokes its observer hooks (e.g. {@code preMergeRegions}).
     *
     * @return an Optional wrapping this instance (never empty)
     */
    @Override
    public Optional<MasterObserver> getMasterObserver() {
        return Optional.of(this);
    }
+
+ @Override
+ public void preMergeRegions(final
ObserverContext<MasterCoprocessorEnvironment> ctx, RegionInfo[] regionsToMerge)
throws IOException {
+ try {
+ if (blockMergeForSaltedTables(ctx, regionsToMerge)) {
+ //different salt so block merge
+ throw new IOException("Don't merge regions with different salt
bits");
+ }
+ } catch (SQLException e) {
+ throw new IOException("SQLException while fetching data from
phoenix", e);
+ }
+ }
+
+ private boolean
blockMergeForSaltedTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ RegionInfo[] regionsToMerge) throws SQLException {
+ TableName table = regionsToMerge[0].getTable();
+ int saltBuckets = getTableSalt(ctx, table.toString());
+ if(saltBuckets > 0 ) {
+ System.out.println("Number of buckets="+saltBuckets);
+ return !regionsHaveSameSalt(regionsToMerge);
+ }
+ // table is not salted so don't block merge
+ return false;
+ }
+
+ private int getTableSalt(ObserverContext<MasterCoprocessorEnvironment>
ctx, String table) throws SQLException {
+ int saltBucket = 0;
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(table);
+ String tableName = SchemaUtil.getTableNameFromFullName(table);
+ String sql =
+ "SELECT salt_buckets FROM system.catalog WHERE table_name = ? "
+ + "AND (table_schem = ? OR table_schem IS NULL)";
+
+ if (schemaName == null || schemaName.isEmpty()) {
+ sql = sql + " and table_schem is null";
+ } else {
+ sql = sql + String.format(" and table_schem=%s", schemaName);
+ }
+
+ Configuration conf = ctx.getEnvironment().getConfiguration();
+ PreparedStatement stmt;
+ ResultSet resultSet;
+ try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
Review Comment:
I wonder if we should get the salted status via a PTable object from the
CQSI cache instead.
Wouldn't that be faster?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]