This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4b2d87336a Add additional index on task table (#14470)
4b2d87336a is described below
commit 4b2d87336a56eafe6c28fc93f1040e737013671e
Author: Pranav <[email protected]>
AuthorDate: Thu Jun 29 15:32:43 2023 -0700
Add additional index on task table (#14470)
---
.../druid/metadata/SQLMetadataConnector.java | 104 ++++++++++++++++++++-
.../druid/metadata/SQLMetadataConnectorTest.java | 57 +++++++++++
2 files changed, 159 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 108128e09c..5c8c9d39c2 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -19,10 +19,12 @@
package org.apache.druid.metadata;
+import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.druid.java.util.common.ISE;
@@ -51,8 +53,10 @@ import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
public abstract class SQLMetadataConnector implements MetadataStorageConnector
{
@@ -377,10 +381,22 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
+ " PRIMARY KEY (id)\n"
+ ")",
tableName, getPayloadType(), getCollation()
- ),
- StringUtils.format("CREATE INDEX idx_%1$s_active_created_date ON
%1$s(active, created_date)", tableName)
+ )
)
);
+ final Set<String> createdIndexSet = getIndexOnTable(tableName);
+ createIndex(
+ tableName,
+ StringUtils.format("idx_%1$s_active_created_date", tableName),
+ ImmutableList.of("active", "created_date"),
+ createdIndexSet
+ );
+ createIndex(
+ tableName,
+ StringUtils.format("idx_%1$s_datasource_active", tableName),
+ ImmutableList.of("datasource", "active"),
+ createdIndexSet
+ );
}
private void alterEntryTable(final String tableName)
@@ -830,4 +846,88 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
log.warn(e, "Exception while deleting records from table");
}
}
+
+ /**
+ * Get the Set of the index on given table
+ *
+ * @param tableName name of the table to fetch the index map
+ * @return Set of the uppercase index names, returns empty set if table does
not exist
+ */
+ public Set<String> getIndexOnTable(String tableName)
+ {
+ Set<String> res = new HashSet<>();
+ try {
+ retryWithHandle(new HandleCallback<Void>()
+ {
+ @Override
+ public Void withHandle(Handle handle) throws Exception
+ {
+ DatabaseMetaData databaseMetaData =
handle.getConnection().getMetaData();
+ // Fetch the index for given table
+ ResultSet resultSet = databaseMetaData.getIndexInfo(
+ null,
+ null,
+ StringUtils.toUpperCase(tableName),
+ false,
+ false
+ );
+ while (resultSet.next()) {
+ String indexName = resultSet.getString("INDEX_NAME");
+ if (org.apache.commons.lang.StringUtils.isNotBlank(indexName)) {
+ res.add(StringUtils.toUpperCase(indexName));
+ }
+ }
+ return null;
+ }
+ });
+ }
+ catch (Exception e) {
+ log.error(e, "Exception while listing the index on table %s ",
tableName);
+ }
+ return ImmutableSet.copyOf(res);
+ }
+
+ /**
+ * create index on the table with retry if not already exist, to be called
after createTable
+ *
+ * @param tableName Name of the table to create index on
+ * @param indexName case-insensitive string index name, it helps to
check the existing index on table
+ * @param indexCols List of columns to be indexed on
+ * @param createdIndexSet
+ */
+ public void createIndex(
+ final String tableName,
+ final String indexName,
+ final List<String> indexCols,
+ final Set<String> createdIndexSet
+ )
+ {
+ try {
+ retryWithHandle(
+ new HandleCallback<Void>()
+ {
+ @Override
+ public Void withHandle(Handle handle)
+ {
+ if
(!createdIndexSet.contains(StringUtils.toUpperCase(indexName))) {
+ String indexSQL = StringUtils.format(
+ "CREATE INDEX %1$s ON %2$s(%3$s)",
+ indexName,
+ tableName,
+ Joiner.on(",").join(indexCols)
+ );
+ log.info("Creating Index on Table [%s], sql: [%s] ",
tableName, indexSQL);
+ handle.execute(indexSQL);
+ } else {
+ log.info("Index [%s] on Table [%s] already exists", indexName,
tableName);
+ }
+ return null;
+ }
+ }
+ );
+ }
+ catch (Exception e) {
+ log.error(e, StringUtils.format("Exception while creating index on table
[%s]", tableName));
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
index d9df47920d..4dba35ca84 100644
---
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.metadata;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
@@ -41,8 +43,11 @@ import java.sql.SQLTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class SQLMetadataConnectorTest
@@ -107,6 +112,58 @@ public class SQLMetadataConnectorTest
}
}
+ @Test
+ public void testIndexCreationOnTaskTable()
+ {
+ final String entryType = tablesConfig.getTaskEntryType();
+ String entryTableName = tablesConfig.getEntryTable(entryType);
+ connector.createTaskTables();
+ Set<String> createdIndexSet = connector.getIndexOnTable(entryTableName);
+ Set<String> expectedIndexSet = Sets.newHashSet(
+ StringUtils.format("idx_%1$s_active_created_date", entryTableName),
+ StringUtils.format("idx_%1$s_datasource_active", entryTableName)
+ ).stream().map(StringUtils::toUpperCase).collect(Collectors.toSet());
+
+ for (String expectedIndex : expectedIndexSet) {
+ Assert.assertTrue(
+ StringUtils.format("Failed to find the expected Index %s on entry
table", expectedIndex),
+ createdIndexSet.contains(expectedIndex)
+ );
+ }
+ connector.createTaskTables();
+ dropTable(entryTableName);
+ }
+
+ @Test
+ public void testCreateIndexOnNoTable()
+ {
+ String tableName = "noTable";
+ try {
+ connector.createIndex(
+ tableName,
+ "some_string",
+ Lists.newArrayList("a", "b"),
+ new HashSet<>()
+ );
+ }
+ catch (Exception e) {
+ Assert.fail("Index creation should never throw an exception");
+ }
+ }
+
+ @Test
+ public void testGeIndexOnNoTable()
+ {
+ String tableName = "noTable";
+ try {
+ Set<String> res = connector.getIndexOnTable(tableName);
+ Assert.assertEquals(0, res.size());
+ }
+ catch (Exception e) {
+ Assert.fail("getIndexOnTable should never throw an exception");
+ }
+ }
+
@Test
public void testInsertOrUpdate()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]