This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit e5ba2e989fad4e3f1de6734a400502cb586d5e8a
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Tue Jan 29 17:14:02 2019 -0800

    PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps

    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../phoenix/end2end/IndexBuildTimestampIT.java     | 246 ++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  37 ++
 .../phoenix/end2end/TableDDLPermissionsIT.java     |   8 -
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java |   6 -
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 138 +++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  | 433 ++++++++++-----------
 .../phoenix/mapreduce/PhoenixInputFormat.java      |   3 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |   4 +-
 .../PhoenixServerBuildIndexInputFormat.java        | 111 ++++++
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 241 +++++++-----
 .../index/PhoenixServerBuildIndexMapper.java       |  75 ++++
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  25 ++
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  27 ++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  16 +-
 14 files changed, 1032 insertions(+), 338 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
new file mode 100644
index 0000000..7efba07
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT { + private final boolean localIndex; + private final boolean async; + private final boolean view; + private final String tableDDLOptions; + + public IndexBuildTimestampIT(boolean mutable, boolean localIndex, + boolean async, boolean view) { + this.localIndex = localIndex; + this.async = async; + this.view = view; + StringBuilder optionBuilder = new StringBuilder(); + if (!mutable) { + optionBuilder.append(" IMMUTABLE_ROWS=true "); + } + optionBuilder.append(" SPLIT ON(1,2)"); + this.tableDDLOptions = optionBuilder.toString(); + } + + @BeforeClass + public static void setup() throws Exception { + IndexToolIT.setup(); + } + + @Parameters( + name = "mutable={0},localIndex={1},async={2},view={3}") + public static Collection<Object[]> data() { + List<Object[]> list = Lists.newArrayListWithExpectedSize(8); + boolean[] Booleans = new boolean[]{false, true}; + for (boolean mutable : Booleans) { + for (boolean localIndex : Booleans) { + for (boolean async : Booleans) { + for (boolean view : Booleans) { + list.add(new Object[]{mutable, localIndex, async, view}); + } + } + } + } + return list; + } + + public static void assertExplainPlan(Connection conn, boolean localIndex, String selectSql, + String dataTableFullName, String indexTableFullName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + IndexToolIT.assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName); + } + + private class MyClock extends EnvironmentEdge { + long initialTime; + long delta; + + public MyClock(long delta) { + initialTime = System.currentTimeMillis() + delta; + this.delta = delta; + } + + @Override + public long currentTime() { + return System.currentTimeMillis() + delta; + } + + public long initialTime() { + return initialTime; + } + } + + private void populateTable(String tableName, MyClock clock1, MyClock clock2) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val varchar(10), ts timestamp)" + tableDDLOptions); + + EnvironmentEdgeManager.injectEdge(clock1); + 
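// Editor's sketch (not part of the patch): the MyClock pattern above works because
// Phoenix resolves "now" through EnvironmentEdgeManager, so a test can pin the cell
// timestamps of each batch of mutations into a disjoint time range. A minimal,
// self-contained version of the same technique:
//
//   import org.apache.phoenix.util.EnvironmentEdge;
//   import org.apache.phoenix.util.EnvironmentEdgeManager;
//
//   EnvironmentEdge skewed = new EnvironmentEdge() {
//       @Override
//       public long currentTime() {
//           // shift every timestamp Phoenix generates by a fixed delta
//           return System.currentTimeMillis() + 100000L;
//       }
//   };
//   EnvironmentEdgeManager.injectEdge(skewed); // mutations now carry skewed timestamps
//   try {
//       // ... run upserts whose cells should land in the skewed time range ...
//   } finally {
//       EnvironmentEdgeManager.reset(); // always restore the real clock
//   }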
conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())"); + conn.commit(); + + EnvironmentEdgeManager.injectEdge(clock2); + conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())"); + conn.commit(); + conn.close(); + + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(clock1.initialTime())); + conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName); + assertFalse(rs.next()); + conn.close(); + + props.setProperty("CurrentSCN", Long.toString(clock2.initialTime())); + conn = DriverManager.getConnection(getUrl(), props); + rs = conn.createStatement().executeQuery("select * from " + tableName); + + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertNotNull(rs.getDate(3)); + + assertFalse(rs.next()); + conn.close(); + + props.setProperty("CurrentSCN", Long.toString(clock2.currentTime())); + conn = DriverManager.getConnection(getUrl(), props); + rs = conn.createStatement().executeQuery("select * from " + tableName); + + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertNotNull(rs.getDate(3)); + + assertTrue(rs.next()); + assertEquals("bbb", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertNotNull(rs.getDate(3)); + assertFalse(rs.next()); + conn.close(); + } + + @Test + public void testCellTimestamp() throws Exception { + EnvironmentEdgeManager.reset(); + MyClock clock1 = new MyClock(100000); + MyClock clock2 = new MyClock(200000); + String dataTableName = generateUniqueName(); + populateTable(dataTableName, clock1, clock2); + + MyClock clock3 = new MyClock(300000); + EnvironmentEdgeManager.injectEdge(clock3); + + Connection conn = DriverManager.getConnection(getUrl()); + + String viewName = null; + if (view) { + viewName = generateUniqueName(); + conn.createStatement().execute("CREATE VIEW "+ viewName + " AS SELECT * FROM " + + dataTableName); + } + String indexName = generateUniqueName(); + conn.createStatement().execute("CREATE "+ (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " on " + + (view ? viewName : dataTableName) + " (val) include (ts)" + (async ? "ASYNC" : "")); + + conn.close(); + + if (async) { + // run the index MR job. + IndexToolIT.runIndexTool(true, false, null, (view ? viewName : dataTableName), indexName); + } + + // Verify the index timestamps via Phoenix + String selectSql = String.format("SELECT * FROM %s WHERE val = 'abc'", (view ? viewName : dataTableName)); + conn = DriverManager.getConnection(getUrl()); + // assert we are pulling from index table + assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName)); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertTrue (rs.next()); + assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock2.initialTime() && + rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock1.initialTime()); + + selectSql = + String.format("SELECT * FROM %s WHERE val = 'bcd'", (view ? viewName : dataTableName)); + // assert we are pulling from index table + assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? 
"_IDX_" + dataTableName : indexName)); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue (rs.next()); + assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock3.initialTime() && + rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock2.initialTime() + ); + assertFalse (rs.next()); + + // Verify the index timestamps via HBase + PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(pIndexTable.getPhysicalName().getBytes()); + + Scan scan = new Scan(); + scan.setTimeRange(clock3.initialTime(), clock3.currentTime()); + ResultScanner scanner = table.getScanner(scan); + assertTrue(scanner.next() == null); + + + scan = new Scan(); + scan.setTimeRange(clock2.initialTime(), clock3.initialTime()); + scanner = table.getScanner(scan); + assertTrue(scanner.next() != null); + + + scan = new Scan(); + scan.setTimeRange(clock1.initialTime(), clock2.initialTime()); + scanner = table.getScanner(scan); + assertTrue(scanner.next() != null); + conn.close(); + EnvironmentEdgeManager.reset(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 9d6f881..aaf9509 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -47,11 +47,17 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; +import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper; +import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper; + import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -67,6 +73,7 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + @RunWith(Parameterized.class) @Category(NeedsOwnMiniClusterTest.class) public class IndexToolIT extends ParallelStatsEnabledIT { @@ -488,6 +495,32 @@ public class IndexToolIT extends ParallelStatsEnabledIT { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]); } + private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName, + String dataTableName, String indexTableName, String tenantId) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + if (tenantId != null) { + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + try (Connection conn = + DriverManager.getConnection(getUrl(), props)) { + PTable dataTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, dataTableName)); + PTable indexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName)); + boolean transactional = 
dataTable.isTransactional(); + boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType()); + + if (directApi) { + if ((localIndex || !transactional) && !useSnapshot) { + assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class); + } else { + assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class); + } + } + else { + assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class); + } + } + } + public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, String dataTableName, String indexTableName, String... additionalArgs) throws Exception { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs); @@ -505,6 +538,10 @@ public class IndexToolIT extends ParallelStatsEnabledIT { List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs)); cmdArgList.addAll(Arrays.asList(additionalArgs)); int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()])); + + if (expectedStatus == 0) { + verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId); + } assertEquals(expectedStatus, status); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java index 8666bb8..f4b4b64 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java @@ -197,14 +197,7 @@ public class TableDDLPermissionsIT extends BasePermissionsIT { // we should be able to read the data from another index as well to which we have not given any access to // this user - verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser); verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser); - verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser); - verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser); - - // data table user should be able to read new index - verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1); - verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1); verifyAllowed(readTable(phoenixTableName), regularUser1); verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1); @@ -213,7 +206,6 @@ public class TableDDLPermissionsIT extends BasePermissionsIT { verifyAllowed(dropView(viewName1), regularUser1); verifyAllowed(dropView(viewName2), regularUser1); verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1); - verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1); verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1); verifyAllowed(dropTable(phoenixTableName), regularUser1); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 1c18667..ab05c16 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -140,12 +140,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { assertEquals("k1", rs.getString(1)); assertEquals("v2", rs.getString(2)); assertFalse(rs.next()); - - TestPhoenixIndexRpcSchedulerFactory.reset(); - createIndex(conn, indexName + "_1"); - // Verify that that index queue is not used since 
running upsert select on server side has been disabled - // See PHOENIX-4171 - Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java new file mode 100644 index 0000000..7d1c1b4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.*; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.StringUtil; + +import com.google.common.collect.Lists; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + + +/** + * Class that compiles plan to generate initial data values after a DDL command for + * index table. 
+ */ +public class ServerBuildIndexCompiler { + private final PhoenixConnection connection; + private final String tableName; + private PTable dataTable; + private QueryPlan plan; + + private class RowCountMutationPlan extends BaseMutationPlan { + private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) { + super(context, operation); + } + @Override + public MutationState execute() throws SQLException { + connection.getMutationState().commitDDLFence(dataTable); + Tuple tuple = plan.iterator().next(); + long rowCount = 0; + if (tuple != null) { + Cell kv = tuple.getValue(0); + ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); + // A single Cell will be returned with the count(*) - we decode that here + rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); + } + // The contract is to return a MutationState that contains the number of rows modified. In this + // case, it's the number of rows in the data table which corresponds to the number of index + // rows that were added. + return new MutationState(0, 0, connection, rowCount); + } + + @Override + public QueryPlan getQueryPlan() { + return plan; + } + }; + + public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) { + this.connection = connection; + this.tableName = tableName; + } + + public MutationPlan compile(PTable index) throws SQLException { + try (final PhoenixStatement statement = new PhoenixStatement(connection)) { + String query = "SELECT count(*) FROM " + tableName; + this.plan = statement.compileQuery(query); + TableRef tableRef = plan.getTableRef(); + Scan scan = plan.getContext().getScan(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + dataTable = tableRef.getTable(); + if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) { + throw new IllegalArgumentException( + "ServerBuildIndexCompiler does not support global indexes on transactional tables"); + } + IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection()); + // Set the scan attributes that UngroupedAggregateRegionObserver will switch on. + // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and + // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index + // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the + // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver + // that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute + // that will be passed as a mutation attribute for the scanned mutations that will be applied on + // the index table possibly remotely + if (index.getIndexType() == PTable.IndexType.LOCAL) { + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr)); + } else { + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + } + // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). + // However, in this case, we need to project all of the data columns that contribute to the index. 
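// Editor's sketch (not part of the patch): the optimization being skipped at this
// point is the usual count(*) narrowing, which would keep only one cell per row,
// roughly:
//
//   import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
//
//   scan.setFilter(new FirstKeyOnlyFilter()); // enough when only counting rows
//
// That filter would drop the very cells the loop below projects via
// scan.addColumn()/scan.addFamily(), so the index build has to scan them all.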
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); + for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { + if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + scan.addFamily(columnRef.getFamily()); + } else { + scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + } + } + + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } + + // Go through MutationPlan abstraction so that we can create local indexes + // with a connectionless connection (which makes testing easier). + return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index bc2523d..d94d187 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -119,7 +119,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** - * + * * Class that builds index row key from data row key and current state of * row and caches any covered columns. Client-side serializes into byte array using * @link #serialize(PTable, ImmutableBytesWritable)} @@ -127,15 +127,15 @@ import com.google.common.collect.Sets; * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD} * Mutation attribute or as a separate RPC call using * {@link org.apache.phoenix.cache.ServerCacheClient}) - * - * + * + * * @since 2.1.0 */ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private static final int EXPRESSION_NOT_PRESENT = -1; private static final int ESTIMATED_EXPRESSION_SIZE = 8; - + public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) { throw new IllegalArgumentException(); @@ -143,7 +143,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { IndexMaintainer maintainer = new IndexMaintainer(dataTable, index, connection); return maintainer; } - + private static boolean sendIndexMaintainer(PTable index) { PIndexState indexState = index.getIndexState(); return ! 
( PIndexState.DISABLE == indexState || PIndexState.PENDING_ACTIVE == indexState ); @@ -157,7 +157,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } }); } - + public static Iterator<PTable> maintainedGlobalIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override @@ -166,7 +166,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } }); } - + public static Iterator<PTable> maintainedLocalIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override @@ -175,7 +175,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } }); } - + /** * For client-side to serialize all IndexMaintainers for a given table * @param dataTable data table @@ -183,17 +183,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) { List<PTable> indexes = dataTable.getIndexes(); - serialize(dataTable, ptr, indexes, connection); + serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection); } - /** - * For client-side to serialize all IndexMaintainers for a given table - * @param dataTable data table - * @param ptr bytes pointer to hold returned serialized value - * @param indexes indexes to serialize - */ - public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, - List<PTable> indexes, PhoenixConnection connection) { + public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr, + List<PTable> indexes, PhoenixConnection connection) { Iterator<PTable> indexesItr; boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional(); if (onlyLocalIndexes) { @@ -201,15 +195,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else { indexesItr = maintainedIndexes(indexes.iterator()); } - if (!indexesItr.hasNext()) { + + serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection); + } + /** + * For client-side to serialize all IndexMaintainers for a given table + * @param dataTable data table + * @param ptr bytes pointer to hold returned serialized value + * @param indexes indexes to serialize + */ + public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, + List<PTable> indexes, PhoenixConnection connection) { + if (indexes.isEmpty()) { ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); return; } - int nIndexes = 0; - while (indexesItr.hasNext()) { - nIndexes++; - indexesItr.next(); - } + int nIndexes = indexes.size(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(stream); try { @@ -217,21 +218,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); // Write out data row key schema once, since it's the same for all index maintainers dataTable.getRowKeySchema().write(output); - indexesItr = onlyLocalIndexes - ? 
maintainedLocalIndexes(indexes.iterator()) - : maintainedIndexes(indexes.iterator()); - while (indexesItr.hasNext()) { - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); - byte[] protoBytes = proto.toByteArray(); - WritableUtils.writeVInt(output, protoBytes.length); - output.write(protoBytes); + for (PTable index : indexes) { + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, connection)); + byte[] protoBytes = proto.toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible } ptr.set(stream.toByteArray(), 0, stream.size()); } - + /** * For client-side to append serialized IndexMaintainers of keyValueIndexes * @param dataTable data table @@ -239,7 +237,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * @param keyValueIndexes indexes to serialize */ public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr, - List<PTable> keyValueIndexes, PhoenixConnection connection) { + List<PTable> keyValueIndexes, PhoenixConnection connection) { int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr); int nIndexes = nMutableIndexes + keyValueIndexes.size(); int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer @@ -273,53 +271,55 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size()); } - + public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr, - KeyValueBuilder builder, boolean useProtoForIndexMaintainer) { + KeyValueBuilder builder, boolean useProtoForIndexMaintainer) { return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer); } - + public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) { return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer); } private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { - ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); - DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); - try { - int size = WritableUtils.readVInt(input); - boolean isDataTableSalted = size < 0; - size = Math.abs(size); - RowKeySchema rowKeySchema = new RowKeySchema(); - rowKeySchema.readFields(input); - maintainers = Lists.newArrayListWithExpectedSize(size); - for (int i = 0; i < size; i++) { - if (useProtoForIndexMaintainer) { - int protoSize = WritableUtils.readVInt(input); - byte[] b = new byte[protoSize]; - input.readFully(b); - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); - maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); - } else { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (length > 0) { + ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); + DataInput input = new DataInputStream(stream); + 
try { + int size = WritableUtils.readVInt(input); + boolean isDataTableSalted = size < 0; + size = Math.abs(size); + RowKeySchema rowKeySchema = new RowKeySchema(); + rowKeySchema.readFields(input); + maintainers = Lists.newArrayListWithExpectedSize(size); + for (int i = 0; i < size; i++) { + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible } - } catch (IOException e) { - throw new RuntimeException(e); // Impossible } return maintainers; } - + private byte[] viewIndexId; private boolean isMultiTenant; // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column private List<Expression> indexedExpressions; // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; - + // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -336,15 +336,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - + private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; - - /**** START: New member variables added in 4.10 *****/ + + /**** START: New member variables added in 4.10 *****/ private QualifierEncodingScheme encodingScheme; private ImmutableStorageScheme immutableStorageScheme; /* @@ -366,7 +366,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); @@ -374,11 +374,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; this.encodingScheme = index.getEncodingScheme(); - + // null check for b/w compatibility this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme(); this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme(); - + byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. @@ -390,16 +390,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // number of expressions that are indexed that are not present in the row key of the data table int indexedExpressionCount = 0; for (int i = indexPosOffset; i<index.getPKColumns().size();i++) { - PColumn indexColumn = index.getPKColumns().get(i); - String indexColumnName = indexColumn.getName().getString(); + PColumn indexColumn = index.getPKColumns().get(i); + String indexColumnName = indexColumn.getName().getString(); String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); - if (SchemaUtil.isPKColumn(dataColumn)) + if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { - // This column must be an expression + // This column must be an expression } catch (Exception e) { throw new IllegalArgumentException(e); } @@ -457,7 +457,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); - + IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { PColumn indexColumn = index.getPKColumns().get(i); @@ -470,30 +470,30 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data column that corresponds to this index column - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - boolean isPKColumn = SchemaUtil.isPKColumn(column); - if (isPKColumn) { - int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0); - this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); - indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); - } else { - indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; - try { - // Surround constant with cast so that we can still know the original type. Otherwise, if we lose the type, - // (for example when VARCHAR becomes CHAR), it can lead to problems in the type translation we do between data tables and indexes. - if (column.isNullable() && ExpressionUtil.isConstant(expression)) { - expression = CoerceExpression.create(expression, indexColumn.getDataType()); - } + // get the column of the data column that corresponds to this index column + PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + boolean isPKColumn = SchemaUtil.isPKColumn(column); + if (isPKColumn) { + int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 
0 : 1) - (this.isMultiTenant ? 1 : 0); + this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); + indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); + } else { + indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; + try { + // Surround constant with cast so that we can still know the original type. Otherwise, if we lose the type, + // (for example when VARCHAR becomes CHAR), it can lead to problems in the type translation we do between data tables and indexes. + if (column.isNullable() && ExpressionUtil.isConstant(expression)) { + expression = CoerceExpression.create(expression, indexColumn.getDataType()); + } this.indexedExpressions.add(expression); indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString())); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } - } + } } else { - indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; + indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; this.indexedExpressions.add(expression); KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() { @Override @@ -523,7 +523,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { PColumn dataColumn = cf == null ? dataTable.getColumnForColumnQualifier(null, cq) : dataTable.getColumnFamily(cf) - .getPColumnForColumnQualifier(cq); + .getPColumnForColumnQualifier(cq); indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() .getString(), dataColumn.getName().getString())); } catch (ColumnNotFoundException | ColumnFamilyNotFoundException @@ -549,7 +549,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (dataColumn != null) { byte[] dataColumnCq = dataColumn.getColumnQualifierBytes(); byte[] indexColumnCq = indexColumn.getColumnQualifierBytes(); - this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); } } @@ -557,14 +557,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); initCachedState(); } - + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0; int prefixKeyLength = prependRegionStartKey ? (regionStartKey.length != 0 ? regionStartKey.length - : regionEndKey.length) : 0; + : regionEndKey.length) : 0; TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedIndexRowKeyBytes + (prependRegionStartKey ? 
prefixKeyLength : 0)); DataOutput output = new DataOutputStream(stream); try { @@ -588,11 +588,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // Skip data table salt byte int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength(); dataRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset); - + if (viewIndexId != null) { output.write(viewIndexId); } - + if (isMultiTenant) { dataRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset); output.write(ptr.get(), ptr.getOffset(), ptr.getLength()); @@ -601,7 +601,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } dataPosOffset++; } - + // Write index row key for (int i = dataPosOffset; i < dataRowKeySchema.getFieldCount(); i++) { Boolean hasValue=dataRowKeySchema.next(ptr, i, maxRowKeyOffset); @@ -617,7 +617,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataRowKeyLocator[0][pos] = 0; dataRowKeyLocator[1][pos] = 0; } - } + } } BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet(); Iterator<Expression> expressionIterator = indexedExpressions.iterator(); @@ -626,11 +626,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { boolean isNullable; SortOrder dataSortOrder; if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) { - Expression expression = expressionIterator.next(); - dataColumnType = expression.getDataType(); - dataSortOrder = expression.getSortOrder(); + Expression expression = expressionIterator.next(); + dataColumnType = expression.getDataType(); + dataSortOrder = expression.getSortOrder(); isNullable = expression.isNullable(); - expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr); + expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr); } else { Field field = dataRowKeySchema.getField(dataPkPosition[i]); @@ -788,7 +788,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } } } - + /* * return the view index id from the index row key */ @@ -801,7 +801,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private volatile RowKeySchema indexRowKeySchema; - + // We have enough information to generate the index row key schema private RowKeySchema generateIndexRowKeySchema() { int nIndexedColumns = getIndexPkColumnCount() + (isMultiTenant ? 1 : 0) + (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (viewIndexId != null ? 
1 : 0) - getNumViewConstants(); @@ -840,7 +840,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public SortOrder getSortOrder() { return SortOrder.getDefault(); } - + }, false, SortOrder.getDefault()); } if (isMultiTenant) { @@ -848,7 +848,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { builder.addField(field, field.isNullable(), field.getSortOrder()); nIndexedColumns--; } - + Field[] indexFields = new Field[nIndexedColumns]; BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet(); // Add Field for all data row pk columns @@ -858,9 +858,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // same for all rows in this index) if (!viewConstantColumnBitSet.get(i)) { int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset); - indexFields[pos] = + indexFields[pos] = dataRowKeySchema.getField(i); - } + } } BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet(); Iterator<Expression> expressionItr = indexedExpressions.iterator(); @@ -916,12 +916,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public SortOrder getSortOrder() { return sortOrder; } - + }, true, sortOrder); } return builder.build(); } - + private int getNumViewConstants() { BitSet bitSet = this.rowKeyMetaData.getViewConstantColumnBitSet(); int num = 0; @@ -942,7 +942,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return indexRowKeySchema; } - + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts); Put put = null; @@ -957,9 +957,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, - // set the value to the empty column name - dataEmptyKeyValueRef.getQualifierWritable())); + this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, + QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); @@ -993,22 +992,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public boolean isNullable() { return false; } - + @Override public SortOrder getSortOrder() { return null; } - + @Override public Integer getScale() { return null; } - + @Override public Integer getMaxLength() { return null; } - + @Override public PDataType getDataType() { return null; @@ -1022,7 +1021,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { colValues[indexArrayPos] = new LiteralExpression(value); } } - + List<Expression> children = Arrays.asList(colValues); // we use SingleCellConstructorExpression to serialize multiple columns into a single byte[] SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children); @@ -1058,38 +1057,38 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private DeleteType getDeleteTypeOrNull(Collection<? 
extends Cell> pendingUpdates) { return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs); } - + private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates, int nCFs) { int nDeleteCF = 0; int nDeleteVersionCF = 0; for (Cell kv : pendingUpdates) { - if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { nDeleteVersionCF++; } - else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() - // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - || TransactionUtil.isDeleteFamily(kv)) { - nDeleteCF++; - } + else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() + // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor + || TransactionUtil.isDeleteFamily(kv)) { + nDeleteCF++; + } } // This is what a delete looks like on the server side for mutable indexing... // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not - DeleteType deleteType = null; + DeleteType deleteType = null; if (nDeleteVersionCF > 0 && nDeleteVersionCF >= nCFs) { - deleteType = DeleteType.SINGLE_VERSION; + deleteType = DeleteType.SINGLE_VERSION; } else { - int nDelete = nDeleteCF + nDeleteVersionCF; - if (nDelete>0 && nDelete >= nCFs) { - deleteType = DeleteType.ALL_VERSIONS; - } - } + int nDelete = nDeleteCF + nDeleteVersionCF; + if (nDelete>0 && nDelete >= nCFs) { + deleteType = DeleteType.ALL_VERSIONS; + } + } return deleteType; } - + public boolean isRowDeleted(Collection<? extends Cell> pendingUpdates) { return getDeleteTypeOrNull(pendingUpdates) != null; } - + public boolean isRowDeleted(Mutation m) { if (m.getFamilyCellMap().size() < this.nDataCFs) { return false; @@ -1106,42 +1105,42 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (pendingUpdates.isEmpty()) { return false; } - Map<ColumnReference,Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); + Map<ColumnReference,Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); for (Cell kv : pendingUpdates) { newState.put(new ColumnReference(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)), kv); } for (ColumnReference ref : indexedColumns) { - Cell newValue = newState.get(ref); - if (newValue != null) { // Indexed column has potentially changed - ImmutableBytesWritable oldValue = oldState.getLatestValue(ref, ts); + Cell newValue = newState.get(ref); + if (newValue != null) { // Indexed column has potentially changed + ImmutableBytesWritable oldValue = oldState.getLatestValue(ref, ts); boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY)); - boolean oldValueSetAsNull = oldValue == null || oldValue.getLength() == 0; - //If the new column value has to be set as null and the older value is null too, - //then just skip to the next indexed column. 
- if (newValueSetAsNull && oldValueSetAsNull) { - continue; - } - if (oldValueSetAsNull || newValueSetAsNull) { - return true; - } - // If the old value is different than the new value, the index row needs to be deleted - if (Bytes.compareTo(oldValue.get(), oldValue.getOffset(), oldValue.getLength(), - newValue.getValueArray(), newValue.getValueOffset(), newValue.getValueLength()) != 0) { - return true; - } - } + boolean oldValueSetAsNull = oldValue == null || oldValue.getLength() == 0; + //If the new column value has to be set as null and the older value is null too, + //then just skip to the next indexed column. + if (newValueSetAsNull && oldValueSetAsNull) { + continue; + } + if (oldValueSetAsNull || newValueSetAsNull) { + return true; + } + // If the old value is different than the new value, the index row needs to be deleted + if (Bytes.compareTo(oldValue.get(), oldValue.getOffset(), oldValue.getLength(), + newValue.getValueArray(), newValue.getValueOffset(), newValue.getValueLength()) != 0) { + return true; + } + } } return false; } - - /** + + /** * Used for immutable indexes that only index PK column values. In that case, we can handle a data row deletion, * since we can build the corresponding index row key. */ public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException { return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null); } - + public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, ts); // Delete the entire row if any of the indexed columns changed @@ -1149,7 +1148,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire row byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary(); Delete delete = new Delete(indexRowKey); - + for (ColumnReference ref : getCoveredColumns()) { ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well @@ -1175,12 +1174,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier()); if (dataTableColRefs.contains(ref)) { if (delete == null) { - delete = new Delete(indexRowKey); + delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); } ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { delete.addColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { delete.addColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); @@ -1190,11 +1189,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return delete; } - + public byte[] getIndexTableName() { return indexTableName; } - + public Set<ColumnReference> getCoveredColumns() { return coveredColumnsMap.keySet(); } @@ -1202,14 +1201,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public Set<ColumnReference> getAllColumns() { return allColumns; } - + public ImmutableBytesPtr getEmptyKeyValueFamily() { // Since the metadata of an index table will never change, // we can infer this based on the family of the first covered column // If if there are no covered columns, we know it's our default name return emptyKeyValueCFPtr; } - + @Deprecated // Only called by code older than our 4.10 release @Override public void readFields(DataInput input) throws IOException { @@ -1263,16 +1262,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { //TODO remove this in the next major release boolean isNewClient = false; if (len < 0) { - isNewClient = true; - len=Math.abs(len); + isNewClient = true; + len=Math.abs(len); } byte [] emptyKeyValueCF = new byte[len]; input.readFully(emptyKeyValueCF, 0, len); emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueCF); - + if (isNewClient) { int numIndexedExpressions = WritableUtils.readVInt(input); - indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); + indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); expression.readFields(input); @@ -1287,27 +1286,27 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ColumnReference colRef = colReferenceIter.next(); final PDataType dataType = dataTypeIter.next(); indexedExpressions.add(new KeyValueColumnExpression(new PDatum() { - + @Override public boolean isNullable() { return true; } - + @Override public SortOrder getSortOrder() { return SortOrder.getDefault(); } - + @Override public Integer getScale() { return null; } - + @Override public Integer getMaxLength() { return null; } - + @Override public PDataType getDataType() { return dataType; @@ -1315,7 +1314,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { }, colRef.getFamily(), colRef.getQualifier())); } } - + rowKeyMetaData = newRowKeyMetaData(); rowKeyMetaData.readFields(input); int nDataCFs = WritableUtils.readVInt(input); @@ -1330,8 +1329,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.encodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; initCachedState(); } - - + + public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException { IndexMaintainer maintainer = new IndexMaintainer(dataTableRowKeySchema, isDataTableSalted); maintainer.nIndexSaltBuckets = proto.getSaltBuckets(); @@ 
-1365,7 +1364,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, maintainer.isMultiTenant); try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) { DataInput input = new DataInputStream(stream); - maintainer.rowKeyMetaData.readFields(input); + maintainer.rowKeyMetaData.readFields(input); } maintainer.nDataCFs = proto.getNumDataTableColFamilies(); maintainer.indexWALDisabled = proto.getIndexWalDisabled(); @@ -1380,7 +1379,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme()); maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme()); maintainer.isLocalIndex = proto.getIsLocalIndex(); - + List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList(); List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList(); maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size()); @@ -1390,7 +1389,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ColumnReference dataTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier( ).toByteArray()); ColumnReference indexTableColRef; if (encodedColumnNames) { - ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); + ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); indexTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( ).toByteArray()); } else { byte[] cq = IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), dataTableColRef.getQualifier()); @@ -1402,7 +1401,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { maintainer.initCachedState(); return maintainer; } - + @Deprecated // Only called by code older than our 4.10 release @Override public void write(DataOutput output) throws IOException { @@ -1436,20 +1435,20 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // when indexedColumnTypes is removed, remove this WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength()); output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength()); - + WritableUtils.writeVInt(output, indexedExpressions.size()); for (Expression expression : indexedExpressions) { WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); expression.write(output); } - + rowKeyMetaData.write(output); // Encode indexWALDisabled in nDataCFs WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1)); // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? 
-1 : 1)); } - + public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer maintainer) throws IOException { ServerCachingProtos.IndexMaintainer.Builder builder = ServerCachingProtos.IndexMaintainer.newBuilder(); builder.setSaltBuckets(maintainer.nIndexSaltBuckets); @@ -1555,12 +1554,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { size += estimatedExpressionSize; return size; } - + private int estimateIndexRowKeyByteSize(int indexColByteSize) { int estimatedIndexRowKeyBytes = indexColByteSize + dataRowKeySchema.getEstimatedValueLength() + (nIndexSaltBuckets == 0 || isLocalIndex || this.isDataTableSalted ? 0 : SaltingUtil.NUM_SALTING_BYTES); return estimatedIndexRowKeyBytes; } - + /** * Init calculated state reading/creating */ @@ -1571,12 +1570,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); for (Expression expression : indexedExpressions) { - KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { + KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { - indexedColumnTypes.add(expression.getDataType()); - } + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { + indexedColumnTypes.add(expression.getDataType()); + } return null; } }; @@ -1590,7 +1589,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { allColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES)); } } - + int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); int nIndexPkColumns = getIndexPkColumnCount(); dataPkPosition = new int[nIndexPkColumns]; @@ -1605,7 +1604,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { numViewConstantColumns++; } } - + // Calculate the max number of trailing nulls that we should get rid of after building the index row key. // We only get rid of nulls for variable length types, so we have to be careful to consider the type of the // index table, not the data type of the data table @@ -1635,15 +1634,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private int getIndexPkColumnCount() { return getIndexPkColumnCount(dataRowKeySchema, indexedExpressions.size(), isDataTableSalted, isMultiTenant); } - + private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { return rowKeySchema.getFieldCount() + numIndexExpressions - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); } - + private RowKeyMetaData newRowKeyMetaData() { return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : new IntSizedRowKeyMetaData(); } - + private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, numIndexExpressions, isDataTableSalted, isMultiTenant); return indexPkColumnCount < 0xFF ? 
i.new ByteSizeRowKeyMetaData() : i.new IntSizedRowKeyMetaData(); @@ -1659,26 +1658,26 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { output.write(b); } } - + private abstract class RowKeyMetaData implements Writable { private BitSet descIndexColumnBitSet; private BitSet viewConstantColumnBitSet; - + private RowKeyMetaData() { } - + private RowKeyMetaData(int nIndexedColumns) { descIndexColumnBitSet = BitSet.withCapacity(nIndexedColumns); viewConstantColumnBitSet = BitSet.withCapacity(dataRowKeySchema.getMaxFields()); // Size based on number of data PK columns - } - + } + protected int getByteSize() { return BitSet.getByteSize(getIndexPkColumnCount()) * 3 + BitSet.getByteSize(dataRowKeySchema.getMaxFields()); } - + protected abstract int getIndexPkPosition(int dataPkPosition); protected abstract int setIndexPkPosition(int dataPkPosition, int indexPkPosition); - + @Override public void readFields(DataInput input) throws IOException { int length = getIndexPkColumnCount(); @@ -1686,7 +1685,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int vclength = dataRowKeySchema.getMaxFields(); viewConstantColumnBitSet = BitSet.read(input, vclength); } - + @Override public void write(DataOutput output) throws IOException { int length = getIndexPkColumnCount(); @@ -1703,12 +1702,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return viewConstantColumnBitSet; } } - + private static int BYTE_OFFSET = 127; - + private class ByteSizeRowKeyMetaData extends RowKeyMetaData { private byte[] indexPkPosition; - + private ByteSizeRowKeyMetaData() { } @@ -1716,7 +1715,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { super(nIndexedColumns); this.indexPkPosition = new byte[nIndexedColumns]; } - + @Override protected int getIndexPkPosition(int dataPkPosition) { // Use offset for byte so that we can get full range of 0 - 255 @@ -1748,10 +1747,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { input.readFully(indexPkPosition); } } - + private class IntSizedRowKeyMetaData extends RowKeyMetaData { private int[] indexPkPosition; - + private IntSizedRowKeyMetaData() { } @@ -1759,7 +1758,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { super(nIndexedColumns); this.indexPkPosition = new int[nIndexedColumns]; } - + @Override protected int getIndexPkPosition(int dataPkPosition) { return this.indexPkPosition[dataPkPosition]; @@ -1769,7 +1768,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { protected int setIndexPkPosition(int dataPkPosition, int indexPkPosition) { return this.indexPkPosition[dataPkPosition] = indexPkPosition; } - + @Override public void write(DataOutput output) throws IOException { super.write(output); @@ -1814,7 +1813,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } @Override public byte[] getRowKey() { - return rowKey; + return rowKey; } }; } @@ -1822,15 +1821,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public byte[] getDataEmptyKeyValueCF() { return dataEmptyKeyValueCF; } - + public boolean isLocalIndex() { return isLocalIndex; } - + public boolean isImmutableRows() { return immutableRows; } - + public Set<ColumnReference> getIndexedColumns() { return indexedColumns; } @@ -1849,22 +1848,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return super.visitEnter(node); 
} - + public Map<String, UDFParseNode> getUdfParseNodes() { return udfParseNodes; } } - + public byte[] getEmptyKeyValueQualifier() { return dataEmptyKeyValueRef.getQualifier(); } - + public Set<Pair<String, String>> getIndexedColumnInfo() { return indexedColumnsInfo; } - + public ImmutableStorageScheme getIndexStorageScheme() { return immutableStorageScheme; } - + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 6093edd..11c412c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -71,7 +71,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr @Override public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - final Configuration configuration = context.getConfiguration(); final QueryPlan queryPlan = getQueryPlan(context,configuration); @SuppressWarnings("unchecked") @@ -163,7 +162,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr * @throws IOException * @throws SQLException */ - private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) + protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException { Preconditions.checkNotNull(context); try { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 58c048b..b7e1373 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -60,8 +60,8 @@ import com.google.common.collect.Lists; public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> { private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class); - private final Configuration configuration; - private final QueryPlan queryPlan; + protected final Configuration configuration; + protected final QueryPlan queryPlan; private NullWritable key = NullWritable.get(); private T value = null; private Class<T> inputClass; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java new file mode 100644 index 0000000..f8ec393 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.compile.*; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.*; +import org.apache.phoenix.util.*; + +import com.google.common.base.Preconditions; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; + +/** + * {@link InputFormat} implementation from Phoenix for building index + * + */ +public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat { + QueryPlan queryPlan = null; + + private static final Log LOG = LogFactory.getLog(PhoenixServerBuildIndexInputFormat.class); + + /** + * instantiated by framework + */ + public PhoenixServerBuildIndexInputFormat() { + } + + @Override + protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) + throws IOException { + Preconditions.checkNotNull(context); + if (queryPlan != null) { + return queryPlan; + } + final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + final Properties overridingProps = new Properties(); + if(txnScnValue==null && currentScnValue!=null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); + overridingProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, currentScnValue); + } + if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){ + overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + String dataTableFullName = getIndexToolDataTableName(configuration); + String indexTableFullName = getIndexToolIndexTableName(configuration); + + try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) { + PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); + Long scn = (currentScnValue != null) ? 
Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis(); + PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName); + ServerBuildIndexCompiler compiler = + new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName); + MutationPlan plan = compiler.compile(indexTable); + Scan scan = plan.getContext().getScan(); + + try { + scan.setTimeRange(0, scn); + } catch (IOException e) { + throw new SQLException(e); + } + queryPlan = plan.getQueryPlan(); + // Since we can't set an SCN on transactional connections, set the TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver + if (txnScnValue != null) { + scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue))); + } + + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); + return queryPlan; + } catch (Exception exception) { + LOG.error(String.format("Failed to get the query plan with error [%s]", + exception.getMessage())); + throw new RuntimeException(exception); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 355cc53..a665a91 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -28,7 +28,10 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -74,6 +77,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; +import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat; import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames; import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; import org.apache.phoenix.mapreduce.util.ConnectionUtil; @@ -106,6 +110,22 @@ public class IndexTool extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class); + private String schemaName; + private String dataTable; + private String indexTable; + private boolean isPartialBuild; + private String qDataTable; + private String qIndexTable; + private boolean useDirectApi; + private boolean useSnapshot; + private boolean isLocalIndexBuild; + private PTable pIndexTable; + private PTable pDataTable; + private String tenantId; + private Job job; + + + private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)"); private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, @@ -247,18 +267,30 @@ public class IndexTool extends Configured implements Tool { } - public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild, - boolean useSnapshot, String tenantId) throws Exception { + public Job getJob() throws Exception { if (isPartialBuild) { - return configureJobForPartialBuild(schemaName, dataTable, tenantId); + return configureJobForPartialBuild(); } else { - return
configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId); + long maxTimeRange = pIndexTable.getTimeStamp() + 1; + // this is set to ensure index tables remain consistent post population. + + if (pDataTable.isTransactional()) { + configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, + Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); + } + configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, + Long.toString(maxTimeRange)); + if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) { + return configureJobForAsyncIndex(); + } + else { + //Local and non-transactional global indexes to be built on the server side + return configureJobForServerBuildIndex(); + } } } - - private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception { - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); + + private Job configureJobForPartialBuild() throws Exception { connection = ConnectionUtil.getInputConnection(configuration); long minDisableTimestamp = HConstants.LATEST_TIMESTAMP; PTable indexWithMinDisableTimestamp = null; @@ -266,7 +298,7 @@ public class IndexTool extends Configured implements Tool { //Get Indexes in building state, minDisabledTimestamp List<String> disableIndexes = new ArrayList<String>(); List<PTable> disabledPIndexes = new ArrayList<PTable>(); - for (PTable index : pdataTable.getIndexes()) { + for (PTable index : pDataTable.getIndexes()) { if (index.getIndexState().equals(PIndexState.BUILDING)) { disableIndexes.add(index.getTableName().getString()); disabledPIndexes.add(index); @@ -299,10 +331,10 @@ public class IndexTool extends Configured implements Tool { //serialize index maintainer in job conf with Base64 TODO: Need to find better way to serialize them in conf.
List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size()); for (PTable index : disabledPIndexes) { - maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class))); + maintainers.add(index.getIndexMaintainer(pDataTable, connection.unwrap(PhoenixConnection.class))); } ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); - IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); + IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); if (tenantId != null) { PhoenixConfigurationUtil.setTenantId(configuration, tenantId); @@ -313,15 +345,15 @@ public class IndexTool extends Configured implements Tool { scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp); scan.setRaw(true); scan.setCacheBlocks(false); - if (pdataTable.isTransactional()) { - long maxTimeRange = pdataTable.getTimeStamp() + 1; + if (pDataTable.isTransactional()) { + long maxTimeRange = pDataTable.getTimeStamp() + 1; scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))))); } - String physicalTableName=pdataTable.getPhysicalName().getString(); - final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString()); + String physicalTableName=pDataTable.getPhysicalName().getString(); + final String jobName = String.format("Phoenix Indexes build for " + pDataTable.getName().toString()); PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable); PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName); @@ -338,7 +370,7 @@ public class IndexTool extends Configured implements Tool { null, job); TableMapReduceUtil.initCredentials(job); TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName)); - return configureSubmittableJobUsingDirectApi(job, true); + return configureSubmittableJobUsingDirectApi(job); } private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException { @@ -368,39 +400,15 @@ public class IndexTool extends Configured implements Tool { } - private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId) - throws Exception { - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final String qIndexTable; - if (schemaName != null && !schemaName.isEmpty()) { - qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); - } else { - qIndexTable = indexTable; - } - final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); - - final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); - - long maxTimeRange = pindexTable.getTimeStamp() + 1; - // this is set to ensure index tables remains consistent post population. 
+ private Job configureJobForAsyncIndex() - if (pdataTable.isTransactional()) { - configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, - Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); - } - configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, - Long.toString(maxTimeRange)); - - // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is - // computed from the qDataTable name. - String physicalIndexTable = pindexTable.getPhysicalName().getString(); - + throws Exception { + String physicalIndexTable = pIndexTable.getPhysicalName().getString(); final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); final PostIndexDDLCompiler ddlCompiler = - new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable)); - ddlCompiler.compile(pindexTable); - + new PostIndexDDLCompiler(pConnection, new TableRef(pDataTable)); + ddlCompiler.compile(pIndexTable); final List<String> indexColumns = ddlCompiler.getIndexColumnNames(); final String selectQuery = ddlCompiler.getSelectQuery(); final String upsertQuery = @@ -409,6 +417,7 @@ public class IndexTool extends Configured implements Tool { configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery); PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable); PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); + PhoenixConfigurationUtil.setUpsertColumnNames(configuration, indexColumns.toArray(new String[indexColumns.size()])); if (tenantId != null) { @@ -418,25 +427,22 @@ public class IndexTool extends Configured implements Tool { PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - if (outputPath != null) { - fs = outputPath.getFileSystem(configuration); - fs.delete(outputPath, true); - FileOutputFormat.setOutputPath(job, outputPath); - } + FileOutputFormat.setOutputPath(job, outputPath); if (!useSnapshot) { - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, - selectQuery); + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery); } else { HBaseAdmin admin = null; String snapshotName; try { admin = pConnection.getQueryServices().getAdmin(); - String pdataTableName = pdataTable.getName().getString(); + String pdataTableName = pDataTable.getName().getString(); snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString(); admin.snapshot(snapshotName, TableName.valueOf(pdataTableName)); } finally { @@ -451,17 +457,47 @@ public class IndexTool extends Configured implements Tool { // set input for map reduce job using hbase snapshots PhoenixMapReduceUtil - .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery); + .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery); } TableMapReduceUtil.initCredentials(job); if (useDirectApi) { - return configureSubmittableJobUsingDirectApi(job, false); + job.setMapperClass(PhoenixIndexImportDirectMapper.class); + return configureSubmittableJobUsingDirectApi(job); } else { return
configureRunnableJobUsingBulkLoad(job, outputPath); - } - + } + + private Job configureJobForServerBuildIndex() + throws Exception { + + PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable); + PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); + + String physicalIndexTable = pIndexTable.getPhysicalName().getString(); + + PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable); + PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); + + final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); + final Job job = Job.getInstance(configuration, jobName); + job.setJarByClass(IndexTool.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + FileOutputFormat.setOutputPath(job, outputPath); + + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class, + qDataTable, ""); + + TableMapReduceUtil.initCredentials(job); + job.setMapperClass(PhoenixServerBuildIndexMapper.class); + return configureSubmittableJobUsingDirectApi(job); } /** @@ -495,12 +531,9 @@ public class IndexTool extends Configured implements Tool { * @return * @throws Exception */ - private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild) + private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception { - if (!isPartialRebuild) { - //Don't configure mapper for partial build as it is configured already - job.setMapperClass(PhoenixIndexImportDirectMapper.class); - } + job.setReducerClass(PhoenixIndexImportDirectReducer.class); Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); @@ -519,6 +552,10 @@ public class IndexTool extends Configured implements Tool { } + public Job getJob() { + return job; + } + @Override public int run(String[] args) throws Exception { Connection connection = null; @@ -531,64 +568,75 @@ public class IndexTool extends Configured implements Tool { printHelpAndExit(e.getMessage(), getOptions()); } final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); - final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); - final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); - final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); - final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); - String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); - boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); - boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); - byte[][] splitKeysBeforeJob = null; - boolean isLocalIndexBuild = false; - PTable pindexTable = null; - String tenantId = null; + tenantId = null; if (useTenantId) { tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } connection = ConnectionUtil.getInputConnection(configuration); + schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + 
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); + indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); + qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); + pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); + useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); + String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); + boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); + + byte[][] splitKeysBeforeJob = null; + isLocalIndexBuild = false; + pIndexTable = null; + + connection = ConnectionUtil.getInputConnection(configuration); + if (indexTable != null) { if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) { throw new IllegalArgumentException(String.format( " %s is not an index table for %s for this connection", indexTable, qDataTable)); } - pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() + pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable); + + if (schemaName != null && !schemaName.isEmpty()) { + qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); + } else { + qIndexTable = indexTable; + } htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(pindexTable.getPhysicalName().getBytes()); - if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { + .getTable(pIndexTable.getPhysicalName().getBytes()); + + if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) { isLocalIndexBuild = true; splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); } // presplit the index table boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()); - boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables - if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType()) + boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables + if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType()) && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) { String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt()); int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt); String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt()); double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt); LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate)); - splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate); + + splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration); } } - - PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); Path outputPath = null; FileSystem fs = null; if (basePath != null) { - outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null - ? pdataTable.getPhysicalName().getString() : pindexTable.getPhysicalName().getString()); + outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null + ? 
pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString()); fs = outputPath.getFileSystem(configuration); fs.delete(outputPath, true); } - - Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, - useDirectApi, isPartialBuild, useSnapshot, tenantId); + + job = new JobFactory(connection, configuration, outputPath).getJob(); + if (!isForeground && useDirectApi) { LOG.info("Running Index Build in Background - Submit async and exit"); job.submit(); @@ -634,32 +682,29 @@ public class IndexTool extends Configured implements Tool { } } - - private void splitIndexTable(PhoenixConnection pConnection, String qDataTable, - PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate) + private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration) throws SQLException, IOException, IllegalArgumentException, InterruptedException { - final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable); int numRegions; try (HTable hDataTable = (HTable) pConnection.getQueryServices() - .getTable(pdataTable.getPhysicalName().getBytes())) { + .getTable(pDataTable.getPhysicalName().getBytes())) { numRegions = hDataTable.getRegionLocator().getStartKeys().length; if (autosplit && !(numRegions > autosplitNumRegions)) { LOG.info(String.format( "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s", - pindexTable.getPhysicalName(), numRegions, autosplitNumRegions)); + pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions)); return; // do nothing if # of regions is too low } } // build a tablesample query to fetch index column values from the data table - DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable); + DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable); String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate); List<String> dataColNames = colNames.getDataColNames(); final String dataSampleQuery = QueryUtil.constructSelectStatement(qTableSample, dataColNames, null, Hint.NO_INDEX, true); - IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection); + IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection); ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable(); try (final PhoenixResultSet rs = pConnection.createStatement().executeQuery(dataSampleQuery) @@ -683,7 +728,7 @@ public class IndexTool extends Configured implements Tool { splitPoints[splitIdx++] = b.getRightBoundExclusive(); } // drop table and recreate with appropriate splits - TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes()); + TableName indexTN = TableName.valueOf(pIndexTable.getPhysicalName().getBytes()); HTableDescriptor descriptor = admin.getTableDescriptor(indexTN); admin.disableTable(indexTN); admin.deleteTable(indexTN); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java new file mode 100644 index 0000000..34bcc9b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mapper that does not do much, as region servers actually build the index from the data table regions directly + */ +public class PhoenixServerBuildIndexMapper extends + Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixServerBuildIndexMapper.class); + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + // Make sure progress is reported to Application Master.
+ context.progress(); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), new IntWritable(0)); + super.cleanup(context); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 301f18b..6788e5f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -120,6 +120,10 @@ public final class PhoenixConfigurationUtil { public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name"; + public static final String INDEX_TOOL_DATA_TABLE_NAME = "phoenix.mr.index_tool.data.table.name"; + + public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name"; + public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table"; public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size"; @@ -543,6 +547,16 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); return configuration.get(SCRUTINY_INDEX_TABLE_NAME); } + public static void setIndexToolDataTableName(Configuration configuration, String qDataTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qDataTableName); + configuration.set(INDEX_TOOL_DATA_TABLE_NAME, qDataTableName); + } + + public static String getIndexToolDataTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INDEX_TOOL_DATA_TABLE_NAME); + } public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) { Preconditions.checkNotNull(configuration); @@ -555,6 +569,17 @@ public final class PhoenixConfigurationUtil { return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE)); } + public static void setIndexToolIndexTableName(Configuration configuration, String qIndexTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qIndexTableName); + configuration.set(INDEX_TOOL_INDEX_TABLE_NAME, qIndexTableName); + } + + public static String getIndexToolIndexTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME); + } + public static void setScrutinySourceTable(Configuration configuration, SourceTable sourceTable) { Preconditions.checkNotNull(configuration); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index 3462177..bab6cee 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.util; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.phoenix.mapreduce.PhoenixInputFormat; @@ -70,6 +71,23 @@ public final class PhoenixMapReduceUtil { /** * * @param job + * @param 
inputClass DBWritable class + * @param inputFormatClass InputFormat class + * @param tableName Input table name + * @param inputQuery Select query + */ + + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, + final Class<? extends InputFormat> inputFormatClass, + final String tableName, final String inputQuery) { + final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName); + PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery); + PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); + } + + /** + * + * @param job * @param inputClass DBWritable class * @param snapshotName The name of a snapshot (of a table) to read from * @param tableName Input table name @@ -135,6 +153,15 @@ public final class PhoenixMapReduceUtil { return configuration; } + private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass, + final Class<? extends InputFormat> inputFormatClass, final String tableName){ + job.setInputFormatClass(inputFormatClass); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + return configuration; + } + /** * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from * @param job MapReduce Job diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 765cedd..48bbf67 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -152,6 +152,7 @@ import org.apache.phoenix.compile.PostDDLCompiler; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.compile.PostLocalIndexDDLCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.ServerBuildIndexCompiler; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementNormalizer; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -1352,16 +1353,17 @@ public class MetaDataClient { } private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException { - MutationPlan mutationPlan; if (index.getIndexType() == IndexType.LOCAL) { PostLocalIndexDDLCompiler compiler = new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef)); - mutationPlan = compiler.compile(index); - } else { + return compiler.compile(index); + } else if (dataTableRef.getTable().isTransactional()){ PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef); - mutationPlan = compiler.compile(index); + return compiler.compile(index); + } else { + ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef)); + return compiler.compile(index); } - return mutationPlan; } private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException { @@ -1697,6 +1699,10 @@ public class MetaDataClient { if (connection.getSCN() != null) { return buildIndexAtTimeStamp(table, statement.getTable()); } + + String dataTableFullName = SchemaUtil.getTableName( + tableRef.getTable().getSchemaName().getString(), + tableRef.getTable().getTableName().getString()); return buildIndex(table, tableRef); }
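
A note on the IndexMaintainer change in this patch: the delete path decides whether an UPSERT invalidates an existing index row by comparing the old and new value of every indexed column, treating absent and zero-length values as null. A minimal standalone sketch of that decision, using plain byte[] maps in place of Phoenix's ValueGetter/KeyValue machinery (all names below are illustrative, not Phoenix API):

    import java.util.Arrays;
    import java.util.Map;

    // Illustrative only: the real check is IndexMaintainer.hasIndexedColumnChanged,
    // which reads old state through a ValueGetter and new state from pending KeyValues.
    final class IndexedColumnChangeSketch {
        static boolean hasIndexedColumnChanged(Map<String, byte[]> oldRow,
                                               Map<String, byte[]> newRow) {
            for (Map.Entry<String, byte[]> update : newRow.entrySet()) {
                byte[] oldValue = oldRow.get(update.getKey());
                byte[] newValue = update.getValue();
                boolean oldNull = oldValue == null || oldValue.length == 0;
                boolean newNull = newValue == null || newValue.length == 0;
                if (oldNull && newNull) {
                    continue;    // null stays null: this column cannot change the index row
                }
                if (oldNull || newNull) {
                    return true; // null-to-value or value-to-null transition
                }
                if (!Arrays.equals(oldValue, newValue)) {
                    return true; // value changed: the old index row must be deleted
                }
            }
            return false;
        }
    }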
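
The timestamp fix itself reduces to bounding the build scan: PhoenixServerBuildIndexInputFormat compiles the server-side build plan and then restricts the plan's Scan to cells written strictly before the SCN (the index table's creation timestamp plus one), so rebuilt index mutations inherit the data rows' timestamps instead of picking up later writes. The same bound expressed against the plain HBase client API (the helper class is illustrative; scan.setTimeRange(0, scn) is exactly the call made in getQueryPlan()):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;

    final class BoundedScanSketch {
        // HBase time ranges are [min, max): cells with timestamp >= scn remain
        // invisible to the index build scan.
        static Scan boundedAt(long scn) throws IOException {
            Scan scan = new Scan();
            scan.setTimeRange(0, scn);
            return scan;
        }
    }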
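
The new PhoenixMapReduceUtil.setInput overload also makes the server-side build wirable by hand. A sketch that mirrors configureJobForServerBuildIndex() under stated assumptions: table names and the job name are placeholders, and the reducer/output wiring done by configureSubmittableJobUsingDirectApi is omitted for brevity:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
    import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
    import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

    final class ServerBuildJobSketch {
        static Job configure() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Tell the input format which tables the build targets.
            PhoenixConfigurationUtil.setIndexToolDataTableName(conf, "MY_SCHEMA.MY_TABLE");
            PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, "MY_SCHEMA.MY_INDEX");

            Job job = Job.getInstance(conf, "server-side build of MY_SCHEMA.MY_INDEX");
            job.setJarByClass(ServerBuildJobSketch.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);

            // New overload: the InputFormat derives the scan from the compiled
            // server-build plan, so the query string is intentionally empty.
            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
                    PhoenixServerBuildIndexInputFormat.class, "MY_SCHEMA.MY_TABLE", "");
            job.setMapperClass(PhoenixServerBuildIndexMapper.class);
            return job;
        }
    }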
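
Finally, how the tool might be launched once this change is in: with the direct API and no snapshot, a non-transactional global index (or any local index) now takes the server-build path automatically. A sketch using the standard Hadoop ToolRunner; --schema and --data-table match the options declared in IndexTool above, while the remaining flag names are assumptions about the rest of its option set, so check getOptions() for the authoritative list:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    final class RunIndexToolSketch {
        public static void main(String[] args) throws Exception {
            int exit = ToolRunner.run(HBaseConfiguration.create(), new IndexTool(),
                    new String[] {
                            "--schema", "MY_SCHEMA",                // placeholder schema
                            "--data-table", "MY_TABLE",             // placeholder data table
                            "--index-table", "MY_INDEX",            // assumed flag; placeholder index
                            "--direct",                             // assumed flag for the direct API
                            "--output-path", "/tmp/MY_INDEX_BUILD"  // assumed flag; scratch dir
                    });
            System.exit(exit);
        }
    }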