dimas-b commented on code in PR #1287: URL: https://github.com/apache/polaris/pull/1287#discussion_r2047580589
########## extension/persistence/relational-jdbc/src/test/java/org/apache/polaris/extension/persistence/impl/relational/jdbc/JdbcAtomicMetastoreManagerTest.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.impl.relational.jdbc; + +import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.sql.SQLException; +import java.time.ZoneId; +import javax.sql.DataSource; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; +import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest; +import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; +import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.extension.persistence.relational.jdbc.JdbcBasePersistenceImpl; +import org.h2.jdbcx.JdbcConnectionPool; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class JdbcAtomicMetastoreManagerTest extends BasePolarisMetaStoreManagerTest { + + public static DataSource createH2DataSource() { + return JdbcConnectionPool.create("jdbc:h2:file:./build/test_data/polaris/db", "sa", ""); + } + + @Override + protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { + PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); + DatasourceOperations datasourceOperations = new DatasourceOperations(createH2DataSource()); + try { + datasourceOperations.executeScript("h2/schema-v1-h2.sql"); + } catch (SQLException e) { + throw new RuntimeException(String.format("Error executing h2 script: %s", e.getMessage()), e); + } + + JdbcBasePersistenceImpl basePersistence = + new JdbcBasePersistenceImpl(datasourceOperations, RANDOM_SECRETS, Mockito.mock(), "REALM"); + return new PolarisTestMetaStoreManager( + new AtomicOperationMetaStoreManager(), + new PolarisCallContext( + basePersistence, + diagServices, + new PolarisConfigurationStore() {}, + timeSource.withZone(ZoneId.systemDefault()))); + } + + @Override + @Test + protected void testPolicyMapping() { + assertThrows(UnsupportedOperationException.class, super::testPolicyMapping); Review Comment: Is this functionality planned to be added later? Please add a comment to explain this assertion. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import jakarta.annotation.Nonnull; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + + private static final String ALREADY_EXISTS_STATE_POSTGRES = "42P07"; + private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; + + private final DataSource datasource; + + public DatasourceOperations(DataSource datasource) { + this.datasource = datasource; + } + + public void executeScript(String scriptFilePath) throws SQLException { + ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + BufferedReader reader = + new BufferedReader( + new InputStreamReader( + Objects.requireNonNull(classLoader.getResourceAsStream(scriptFilePath)), UTF_8)); + StringBuilder sqlBuffer = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("--")) { // Ignore empty lines and comments + sqlBuffer.append(line).append("\n"); + if (line.endsWith(";")) { // Execute statement when semicolon is found + String sql = sqlBuffer.toString().trim(); + try { + int rowsUpdated = statement.executeUpdate(sql); + LOGGER.debug("Query {} executed {} rows affected", sql, rowsUpdated); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", sql, e); + // re:throw this as unhandled exception + throw new RuntimeException(e); + } + sqlBuffer.setLength(0); // Clear the buffer for the next statement + } + } + } + } catch (IOException e) { + LOGGER.error("Error reading the script file", e); + throw new RuntimeException(e); + } catch (SQLException e) { + LOGGER.error("Error executing the script file", e); + throw e; + } + } + + public <T, R> List<R> executeSelect( + @Nonnull String query, + @Nonnull Class<T> targetClass, + @Nonnull Function<T, R> transformer, + Predicate<R> entityFilter, + int limit) + throws SQLException { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet s = statement.executeQuery(query)) { + return ResultSetToObjectConverter.collect(s, targetClass, transformer, entityFilter, limit); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public int executeUpdate(String query) throws SQLException { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + return statement.executeUpdate(query); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + throw e; + } + } + + public int executeUpdate(String query, Statement statement) throws SQLException { + LOGGER.debug("Executing query {} within transaction", query); + try { + return statement.executeUpdate(query); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + throw e; + } + } + + public void runWithinTransaction(TransactionCallback callback) throws SQLException { + Connection connection = null; + try { + connection = borrowConnection(); + connection.setAutoCommit(false); // Disable auto-commit to start a transaction + + boolean result; + try (Statement statement = connection.createStatement()) { + result = callback.execute(statement); + } + + if (result) { + connection.commit(); // Commit the transaction if successful + } else { + connection.rollback(); // Rollback the transaction if not successful + } + + } catch (SQLException e) { + if (connection != null) { + try { + connection.rollback(); // Rollback on exception + } catch (SQLException ex) { + LOGGER.error("Error rolling back transaction", ex); + throw e; + } + } + LOGGER.error("Caught Error while executing transaction", e); + throw e; + } finally { + if (connection != null) { + try { + connection.setAutoCommit(true); // Restore auto-commit Review Comment: If we get an exception that is not a `SQLException`, this will cause the Tx to commit, right? Is that intended? ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcCrudQueryGenerator.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; + +public class JdbcCrudQueryGenerator { + + private static final Pattern CAMEL_CASE_PATTERN = + Pattern.compile("(?<=[a-z0-9])[A-Z]|(?<=[A-Z])[A-Z](?=[a-z])"); + + public static String generateSelectQuery( + Class<?> entityClass, String filter, Integer limit, Integer offset, String orderBy) { + String tableName = getTableName(entityClass); + List<String> fields = new ArrayList<>(); + + for (Field field : entityClass.getDeclaredFields()) { + fields.add(camelToSnake(field.getName())); + } + + String columns = String.join(", ", fields); + StringBuilder query = + new StringBuilder("SELECT ").append(columns).append(" FROM ").append(tableName); + if (filter != null && !filter.isEmpty()) { + query.append(" WHERE ").append(String.join(" AND ", filter)); + } + return query.toString(); + } + + public static String generateSelectQuery( + Class<?> entityClass, + Map<String, Object> whereClause, + Integer limit, + Integer offset, + String orderBy) { + String tableName = getTableName(entityClass); + List<String> fields = new ArrayList<>(); + + for (Field field : entityClass.getDeclaredFields()) { + fields.add(camelToSnake(field.getName())); + } + + String columns = String.join(", ", fields); + StringBuilder query = + new StringBuilder("SELECT ").append(columns).append(" FROM ").append(tableName); + + if (whereClause != null && !whereClause.isEmpty()) { + query.append(generateWhereClause(whereClause)); + } + + if (orderBy != null && !orderBy.isEmpty()) { + query.append(" ORDER BY ").append(orderBy); + } + + if (limit != null) { + query.append(" LIMIT ").append(limit); + } + + if (offset != null && limit != null) { // Offset only makes sense with limit. + query.append(" OFFSET ").append(offset); + } + + return query.toString(); + } + + public static String generateInsertQuery(Object object, Class<?> entityClass) { + String tableName = getTableName(entityClass); + if (object == null || tableName.isEmpty()) { + return null; // Or throw an exception + } + + Class<?> objectClass = object.getClass(); + Field[] fields = objectClass.getDeclaredFields(); + List<String> columnNames = new ArrayList<>(); + List<String> values = new ArrayList<>(); + + for (Field field : fields) { + field.setAccessible(true); // Allow access to private fields + try { + Object value = field.get(object); + if (value != null) { // Only include non-null fields + columnNames.add(camelToSnake(field.getName())); Review Comment: I mean rather than introspecting classes via java reflection, we could call a method like `Map<String, String> ModelEntity.sqlParameters()` ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Objects; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + + /** Already exists error * */ + private static final String ALREADY_EXISTS_SQL_CODE = "42P07"; + + /** Integrity constraint * */ + private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505"; + + private final DataSource datasource; + + public DatasourceOperations(DataSource datasource) { + this.datasource = datasource; + } + + public void executeScript(String scriptFilePath) { + ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + BufferedReader reader = + new BufferedReader( + new InputStreamReader( + Objects.requireNonNull(classLoader.getResourceAsStream(scriptFilePath)), UTF_8)); + StringBuilder sqlBuffer = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("--")) { // Ignore empty lines and comments + sqlBuffer.append(line).append("\n"); + if (line.endsWith(";")) { // Execute statement when semicolon is found + String sql = sqlBuffer.toString().trim(); + try { + int rowsUpdated = statement.executeUpdate(sql); + LOGGER.debug("Query {} executed {} rows affected", sql, rowsUpdated); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", sql, e); + // re:throw this as unhandled exception + throw new RuntimeException(e); + } + sqlBuffer.setLength(0); // Clear the buffer for the next statement + } + } + } + } catch (IOException e) { + LOGGER.error("Error reading the script file", e); + throw new RuntimeException(e); + } catch (SQLException e) { + LOGGER.error("Error executing the script file", e); + throw new RuntimeException(e); + } + } + + public <T> List<T> executeSelect(String query, Class<T> targetClass) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet s = statement.executeQuery(query)) { + List<T> results = ResultSetToObjectConverter.convert(s, targetClass); + return results.isEmpty() ? null : results; + } catch (Exception e) { + LOGGER.error("Error executing query {}", query, e); + throw new RuntimeException(e); + } + } + + public int executeUpdate(String query) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + return statement.executeUpdate(query); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + return handleException(e); + } + } + + public int executeUpdate(String query, Statement statement) { + LOGGER.debug("Executing query {} within transaction", query); + try { + return statement.executeUpdate(query); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + return handleException(e); + } + } + + public void runWithinTransaction(TransactionCallback callback) { + Connection connection = null; + try { + connection = borrowConnection(); + connection.setAutoCommit(false); // Disable auto-commit to start a transaction + + boolean result; + try (Statement statement = connection.createStatement()) { + result = callback.execute(statement); + } + + if (result) { + connection.commit(); // Commit the transaction if successful + } else { + connection.rollback(); // Rollback the transaction if not successful + } + + } catch (SQLException e) { + if (connection != null) { + try { + connection.rollback(); // Rollback on exception + } catch (SQLException ex) { + LOGGER.error("Error rolling back transaction", ex); + } + } + LOGGER.error("Caught Error while executing transaction", e); + handleException(e); + } finally { + if (connection != null) { + try { + connection.setAutoCommit(true); // Restore auto-commit + connection.close(); Review Comment: `connection` is an `AutoCloseable`... Why not do `try(Connection connection = ... ) { come code; }`? ########## extension/persistence/relational-jdbc/src/main/resources/h2/schema-v1-h2.sql: ########## @@ -0,0 +1,93 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; +DROP TABLE IF EXISTS entities; +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties TEXT NOT NULL DEFAULT '{}', + internal_properties TEXT NOT NULL DEFAULT '{}', + grant_records_version INT NOT NULL, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +-- TODO: create indexes based on all query pattern. +CREATE INDEX IF NOT EXISTS idx_entities ON entities (catalog_id, id); Review Comment: Should this include `realm_id`? Otherwise we're risking inefficient lookup in case of many realms, I guess. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/IdGenerator.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.UUID; + +public class IdGenerator { + private IdGenerator() {} + + public static final IdGenerator idGenerator = new IdGenerator(); + + private static final long LONG_MAX_ID = 0x7fffffffffffffffL; + + public long nextId() { + // Make sure this is a positive number. + // conflicting ids don't get accepted and is enforced by table constraints. + return generateSecureRandomUUID().getLeastSignificantBits() & LONG_MAX_ID; Review Comment: Using a part of a random UUID here is inefficient. We could add inefficient code as it does not prevent correct operation, indeed, but why do that when we can use a more efficient approach which is equivalent in code complexity? Why not just pull a `long` from `SecureRandom`? ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/models/ModelPrincipalAuthenticationData.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc.models; + +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; + +public class ModelPrincipalAuthenticationData { + // the id of the principal + private long principalId; + + // the client id for that principal + private String principalClientId; + + // Hash of mainSecret + private String mainSecretHash; + + // Hash of secondarySecret + private String secondarySecretHash; + + private String secretSalt; + + public long getPrincipalId() { + return principalId; + } + + public String getPrincipalClientId() { + return principalClientId; + } + + public String getSecretSalt() { + return secretSalt; + } + + public String getMainSecretHash() { + return mainSecretHash; + } + + public String getSecondarySecretHash() { + return secondarySecretHash; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { Review Comment: Why not use `@PolarisImmutable` (on the outer class)? ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java: ########## @@ -0,0 +1,748 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.EntityNameLookupRecord; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityId; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisGrantRecord; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.BaseMetaStoreManager; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.EntityAlreadyExistsException; +import org.apache.polaris.core.persistence.IntegrationPersistence; +import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; +import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPersistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBasePersistenceImpl.class); + + private final DatasourceOperations datasourceOperations; + private final PrincipalSecretsGenerator secretsGenerator; + private final PolarisStorageIntegrationProvider storageIntegrationProvider; + private final String realmId; + + public JdbcBasePersistenceImpl( + DatasourceOperations databaseOperations, + PrincipalSecretsGenerator secretsGenerator, + PolarisStorageIntegrationProvider storageIntegrationProvider, + String realmId) { + this.datasourceOperations = databaseOperations; + this.secretsGenerator = secretsGenerator; + this.storageIntegrationProvider = storageIntegrationProvider; + this.realmId = realmId; + } + + @Override + public long generateNewId(@Nonnull PolarisCallContext callCtx) { + return IdGenerator.idGenerator.nextId(); + } + + @Override + public void writeEntity( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged, + PolarisBaseEntity originalEntity) { + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + String query; + if (originalEntity == null) { + query = JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, realmId); + } else { + Map<String, Object> params = + Map.of( + "id", + originalEntity.getId(), + "catalog_id", + originalEntity.getCatalogId(), + "entity_version", + originalEntity.getEntityVersion(), + "realm_id", + realmId); + query = JdbcCrudQueryGenerator.generateUpdateQuery(modelEntity, params, ModelEntity.class); + } + try { + int rowsUpdated = datasourceOperations.executeUpdate(query); + if (rowsUpdated == 0 && originalEntity != null) { + throw new RetryOnConcurrencyException( + "Entity '%s' id '%s' concurrently modified; expected version %s", + entity.getName(), entity.getId(), originalEntity.getEntityVersion()); + } + } catch (SQLException e) { + if (originalEntity == null + && (datasourceOperations.isConstraintViolation(e) + || datasourceOperations.isAlreadyExistsException(e))) { + throw new EntityAlreadyExistsException(entity); + } else { + throw new RuntimeException( + String.format("Failed to write the entities due to %s", e.getMessage())); + } + } + } + + @Override + public void writeEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull List<PolarisBaseEntity> entities, + List<PolarisBaseEntity> originalEntities) { + try { + datasourceOperations.runWithinTransaction( + statement -> { + for (int i = 0; i < entities.size(); i++) { + PolarisBaseEntity entity = entities.get(i); + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + + // first, check if the entity has already been created, in which case we will simply + // return it. + PolarisBaseEntity entityFound = + lookupEntity( + callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode()); + if (entityFound != null) { + // probably the client retried, simply return it + // TODO: Check correctness of returning entityFound vs entity here. It may have + // already been updated after the creation. + continue; + } + // lookup by name + EntityNameLookupRecord exists = + lookupEntityIdAndSubTypeByName( + callCtx, + entity.getCatalogId(), + entity.getParentId(), + entity.getTypeCode(), + entity.getName()); + if (exists != null) { + throw new EntityAlreadyExistsException(entity); + } + String query; + if (originalEntities == null || originalEntities.get(i) == null) { + query = JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, realmId); + } else { + Map<String, Object> params = + Map.of( + "id", + originalEntities.get(i).getId(), + "catalog_id", + originalEntities.get(i).getCatalogId(), + "entity_version", + originalEntities.get(i).getEntityVersion(), + "realm_id", + realmId); + query = + JdbcCrudQueryGenerator.generateUpdateQuery( + modelEntity, params, ModelEntity.class); + } + boolean isUpdate = (originalEntities != null && originalEntities.get(i) != null); Review Comment: nit: from the code readability POV, I believe it would be nicer to keep all alternatives under one `if` that controls the condition for behaviour differences... i.e. move update handling to line 173 and insert handling to line 159. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
