singhpk234 commented on code in PR #1287: URL: https://github.com/apache/polaris/pull/1287#discussion_r2042966608
########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/models/ModelEntity.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc.models; + +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; + +public class ModelEntity { + // the id of the catalog associated to that entity. NULL_ID i.e. 
0 if this entity is top-level + // like + // a catalog + private long catalogId; + + // the id of the entity which was resolved + private long id; + + // the id of the parent of this entity, use 0 for a top-level entity whose parent is the account + private long parentId; + + // the type of the entity when it was resolved + private int typeCode; + + // the name that this entity had when it was resolved + private String name; + + // the version that this entity had when it was resolved + private int entityVersion; + + // the type of the entity when it was resolved + private int subTypeCode; + + // timestamp when this entity was created + private long createTimestamp; + + // when this entity was dropped. Null if was never dropped + private long dropTimestamp; + + // when did we start purging this entity. When not null, un-drop is no longer possible + private long purgeTimestamp; + + // when should we start purging this entity + private long toPurgeTimestamp; + + // last time this entity was updated + private long lastUpdateTimestamp; + + // properties, serialized as a JSON string + private String properties; + + // internal properties, serialized as a JSON string + private String internalProperties; + + // current version for that entity, will be monotonically incremented + private int grantRecordsVersion; + + public long getId() { + return id; + } + + public long getParentId() { + return parentId; + } + + public int getTypeCode() { + return typeCode; + } + + public String getName() { + return name; + } + + public int getEntityVersion() { + return entityVersion; + } + + public long getCatalogId() { + return catalogId; + } + + public int getSubTypeCode() { + return subTypeCode; + } + + public long getCreateTimestamp() { + return createTimestamp; + } + + public long getDropTimestamp() { + return dropTimestamp; + } + + public long getPurgeTimestamp() { + return purgeTimestamp; + } + + public long getToPurgeTimestamp() { + return toPurgeTimestamp; + } + + public long 
getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public String getProperties() { + return properties != null ? properties : "{}"; + } + + public String getInternalProperties() { + return internalProperties != null ? internalProperties : "{}"; + } + + public int getGrantRecordsVersion() { + return grantRecordsVersion; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private final ModelEntity entity; + + private Builder() { + entity = new ModelEntity(); + } + + public Builder catalogId(long catalogId) { + entity.catalogId = catalogId; + return this; + } + + public Builder id(long id) { + entity.id = id; + return this; + } + + public Builder parentId(long parentId) { + entity.parentId = parentId; + return this; + } + + public Builder typeCode(int typeCode) { + entity.typeCode = typeCode; + return this; + } + + public Builder name(String name) { + entity.name = name; + return this; + } + + public Builder entityVersion(int entityVersion) { + entity.entityVersion = entityVersion; + return this; + } + + public Builder subTypeCode(int subTypeCode) { + entity.subTypeCode = subTypeCode; + return this; + } + + public Builder createTimestamp(long createTimestamp) { + entity.createTimestamp = createTimestamp; + return this; + } + + public Builder dropTimestamp(long dropTimestamp) { + entity.dropTimestamp = dropTimestamp; + return this; + } + + public Builder purgeTimestamp(long purgeTimestamp) { + entity.purgeTimestamp = purgeTimestamp; + return this; + } + + public Builder toPurgeTimestamp(long toPurgeTimestamp) { + entity.toPurgeTimestamp = toPurgeTimestamp; + return this; + } + + public Builder lastUpdateTimestamp(long lastUpdateTimestamp) { + entity.lastUpdateTimestamp = lastUpdateTimestamp; + return this; + } + + public Builder properties(String properties) { + entity.properties = properties; + return this; + } + + public Builder internalProperties(String internalProperties) { + 
entity.internalProperties = internalProperties; + return this; + } + + public Builder grantRecordsVersion(int grantRecordsVersion) { + entity.grantRecordsVersion = grantRecordsVersion; + return this; + } + + public ModelEntity build() { + return entity; + } + } + + public static ModelEntity fromEntity(PolarisBaseEntity entity) { + return ModelEntity.builder() + .catalogId(entity.getCatalogId()) + .id(entity.getId()) + .parentId(entity.getParentId()) + .typeCode(entity.getTypeCode()) + .name(entity.getName()) + .entityVersion(entity.getEntityVersion()) + .subTypeCode(entity.getSubTypeCode()) + .createTimestamp(entity.getCreateTimestamp()) + .dropTimestamp(entity.getDropTimestamp()) + .purgeTimestamp(entity.getPurgeTimestamp()) + .toPurgeTimestamp(entity.getToPurgeTimestamp()) + .lastUpdateTimestamp(entity.getLastUpdateTimestamp()) + .properties(entity.getProperties()) + .internalProperties(entity.getInternalProperties()) + .grantRecordsVersion(entity.getGrantRecordsVersion()) + .build(); + } + + public static PolarisBaseEntity toEntity(ModelEntity model) { + if (model == null) { + return null; + } + + PolarisEntityType entityType = PolarisEntityType.fromCode(model.getTypeCode()); + PolarisEntitySubType subType = PolarisEntitySubType.fromCode(model.getSubTypeCode()); + + if (entityType == null) { + throw new IllegalArgumentException("Invalid entity type: " + model.getTypeCode()); + } + + if (subType == null) { + throw new IllegalArgumentException("Invalid entity subtype: " + model.getSubTypeCode()); + } + + var entity = + new PolarisBaseEntity( Review Comment: We can't do this, as we need to use the setters below to set things which are not part of PolarisBaseEntity. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java: ########## @@ -0,0 +1,619 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.EntityNameLookupRecord; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityId; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisGrantRecord; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.BaseMetaStoreManager; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.EntityAlreadyExistsException; +import org.apache.polaris.core.persistence.IntegrationPersistence; +import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; +import 
org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPersistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBasePersistenceImpl.class); + + private final DatasourceOperations datasourceOperations; + private final PrincipalSecretsGenerator secretsGenerator; + private final PolarisStorageIntegrationProvider storageIntegrationProvider; + + public JdbcBasePersistenceImpl( + DatasourceOperations databaseOperations, + PrincipalSecretsGenerator secretsGenerator, + PolarisStorageIntegrationProvider storageIntegrationProvider) { + this.datasourceOperations = databaseOperations; + this.secretsGenerator = secretsGenerator; + this.storageIntegrationProvider = storageIntegrationProvider; + } + + @Override + public long generateNewId(@Nonnull PolarisCallContext callCtx) { + return IdGenerator.idGenerator.nextId(); + } + + @Override + public void writeEntity( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged, + PolarisBaseEntity originalEntity) { + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + String query; + if (originalEntity == null) { + query = JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, ModelEntity.class); + } else { + Map<String, Object> params = new HashMap<>(); + params.put("id", originalEntity.getId()); + params.put("catalog_id", 
originalEntity.getCatalogId()); + params.put("entity_version", originalEntity.getEntityVersion()); + query = JdbcCrudQueryGenerator.generateUpdateQuery(modelEntity, params, ModelEntity.class); + } + int rowsUpdated = datasourceOperations.executeUpdate(query); + if (rowsUpdated == -1 && originalEntity == null) { + // constraint validation. + throw new EntityAlreadyExistsException(entity); + } else if (rowsUpdated == 0 && originalEntity != null) { + // concurrently row got updated, as version mismatched. + throw new RetryOnConcurrencyException("CAS failed"); + } + } + + @Override + public void writeEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull List<PolarisBaseEntity> entities, + List<PolarisBaseEntity> originalEntities) { + try { + datasourceOperations.runWithinTransaction( + statement -> { + for (int i = 0; i < entities.size(); i++) { + PolarisBaseEntity entity = entities.get(i); + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + + // first, check if the entity has already been created, in which case we will simply + // return it. + PolarisBaseEntity entityFound = + lookupEntity( + callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode()); + if (entityFound != null) { + // probably the client retried, simply return it + // TODO: Check correctness of returning entityFound vs entity here. It may have + // already + // been updated after the creation. 
+ continue; + } + // lookup by name + EntityNameLookupRecord exists = + lookupEntityIdAndSubTypeByName( + callCtx, + entity.getCatalogId(), + entity.getParentId(), + entity.getTypeCode(), + entity.getName()); + if (exists != null) { + throw new EntityAlreadyExistsException(entity); + } + String query; + if (originalEntities == null || originalEntities.get(i) == null) { + query = JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, ModelEntity.class); + } else { + // CAS + Map<String, Object> params = new HashMap<>(); + params.put("id", originalEntities.get(i).getId()); + params.put("catalog_id", originalEntities.get(i).getCatalogId()); + params.put("entity_version", originalEntities.get(i).getEntityVersion()); + query = + JdbcCrudQueryGenerator.generateUpdateQuery( + modelEntity, params, ModelEntity.class); + } + int rowsUpdated = datasourceOperations.executeUpdate(query); + boolean isUpdate = (originalEntities != null && originalEntities.get(i) != null); + if (rowsUpdated == -1 && !isUpdate) { + // constrain validation exception. 
+ throw new EntityAlreadyExistsException(entity); + } else if (rowsUpdated == 0 && isUpdate) { + throw new RetryOnConcurrencyException("CAS failed"); + } + } + return true; + }); + } catch (Exception e) { + LOGGER.error("Error executing transaction {}", e.getMessage()); + throw e; + } + } + + @Override + public void writeToGrantRecords( + @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { + ModelGrantRecord modelGrantRecord = ModelGrantRecord.fromGrantRecord(grantRec); + String query = + JdbcCrudQueryGenerator.generateInsertQuery(modelGrantRecord, ModelGrantRecord.class); + datasourceOperations.executeUpdate(query); + } + + @Override + public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + Map<String, Object> params = new HashMap<>(); + params.put("id", modelEntity.getId()); + params.put("catalog_id", modelEntity.getCatalogId()); + datasourceOperations.executeUpdate( + JdbcCrudQueryGenerator.generateDeleteQuery(params, ModelEntity.class)); + } + + @Override + public void deleteFromGrantRecords( + @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { + ModelGrantRecord modelGrantRecord = ModelGrantRecord.fromGrantRecord(grantRec); + String query = + JdbcCrudQueryGenerator.generateDeleteQuery(modelGrantRecord, ModelGrantRecord.class); + datasourceOperations.executeUpdate(query); + } + + @Override + public void deleteAllEntityGrantRecords( + @Nonnull PolarisCallContext callCtx, + PolarisEntityCore entity, + @Nonnull List<PolarisGrantRecord> grantsOnGrantee, + @Nonnull List<PolarisGrantRecord> grantsOnSecurable) { + // generate where clause + StringBuilder granteeCondition = new StringBuilder("(grantee_id, grantee_catalog_id) IN ("); + granteeCondition.append("(" + entity.getId() + ", " + entity.getCatalogId() + ")"); + granteeCondition.append(","); + // extra , removed + 
granteeCondition.deleteCharAt(granteeCondition.length() - 1); + granteeCondition.append(")"); + + StringBuilder securableCondition = + new StringBuilder("(securable_catalog_id, securable_id) IN ("); + + String in = "(" + entity.getCatalogId() + ", " + entity.getId() + ")"; + securableCondition.append(in); + securableCondition.append(","); + + // extra , removed + securableCondition.deleteCharAt(securableCondition.length() - 1); + securableCondition.append(")"); + + String whereClause = " WHERE " + granteeCondition + " OR " + securableCondition; + datasourceOperations.executeUpdate( + JdbcCrudQueryGenerator.generateDeleteQuery(ModelGrantRecord.class, whereClause)); + } + + @Override + public void deleteAll(@Nonnull PolarisCallContext callCtx) { + datasourceOperations.executeUpdate(JdbcCrudQueryGenerator.generateDeleteAll(ModelEntity.class)); + datasourceOperations.executeUpdate( + JdbcCrudQueryGenerator.generateDeleteAll(ModelGrantRecord.class)); + datasourceOperations.executeUpdate(JdbcCrudQueryGenerator.generateDeleteAll(ModelEntity.class)); + } + + @Override + public PolarisBaseEntity lookupEntity( + @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, int typeCode) { + Map<String, Object> params = new HashMap<>(); + params.put("catalog_id", catalogId); + params.put("id", entityId); + params.put("type_code", typeCode); + String query = + JdbcCrudQueryGenerator.generateSelectQuery( + ModelEntity.class, params, null, null, "last_update_timestamp"); Review Comment: Actually, we don't; let me remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
