Copilot commented on code in PR #405: URL: https://github.com/apache/atlas/pull/405#discussion_r2217148711
########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsUniqueKeyHandler.java: ########## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.rdbms; + +import org.janusgraph.graphdb.relations.RelationIdentifier; + +public class RdbmsUniqueKeyHandler { + private static final String SQL_INSERT_UNIQUE_VERTEX_KEY = "INSERT INTO janus_unique_vertex_key (id, vertex_id, key_name, val) VALUES (NEXTVAL('janus_unique_vertex_key_seq'), ?, ?, ?)"; + private static final String SQL_DELETE_UNIQUE_VERTEX_KEY = "DELETE FROM janus_unique_vertex_key WHERE key_name = ? AND val = ?"; + private static final String SQL_INSERT_UNIQUE_VERTEX_TYPE_KEY = "INSERT INTO janus_unique_vertex_type_key (id, vertex_id, type_name, key_name, val) VALUES (NEXTVAL('janus_unique_vertex_type_key_seq'), ?, ?, ?, ?)"; + private static final String SQL_DELETE_UNIQUE_VERTEX_TYPE_KEY = "DELETE FROM janus_unique_vertex_type_key WHERE type_name = ? AND key_name = ? AND val = ?"; + private static final String SQL_DELETE_UNIQUE_VERTEX_KEY_BY_VERTEX_ID = "DELETE FROM janus_unique_vertex_key WHERE vertex_id = ?"; + private static final String SQL_DELETE_UNIQUE_VERTEX_TYPE_KEY_BY_VERTEX_ID = "DELETE FROM janus_unique_vertex_type_key WHERE vertex_id = ?"; + + private static final String SQL_INSERT_UNIQUE_EDGE_KEY = "INSERT INTO janus_unique_edge_key (id, edge_id, key_name, val) VALUES (NEXTVAL('janus_unique_edge_key_seq'), ?, ?, ?)"; + private static final String SQL_DELETE_UNIQUE_EDGE_KEY = "DELETE FROM janus_unique_edge_key WHERE key_name = ? AND val = ?"; + private static final String SQL_INSERT_UNIQUE_EDGE_TYPE_KEY = "INSERT INTO janus_unique_edge_type_key (id, edge_id, type_name, key_name, val) VALUES (NEXTVAL('janus_unique_edge_type_key_seq'), ?, ?, ?, ?)"; + private static final String SQL_DELETE_UNIQUE_EDGE_TYPE_KEY = "DELETE FROM janus_unique_edge_type_key WHERE type_name = ? AND key_name = ? AND val = ?"; + private static final String SQL_DELETE_UNIQUE_EDGE_KEY_BY_EDGE_ID = "DELETE FROM janus_unique_edge_key WHERE edge_id = ?"; + private static final String SQL_DELETE_UNIQUE_EDGE_TYPE_KEY_BY_EDGE_ID = "DELETE FROM janus_unique_edge_type_key WHERE edge_id = ?"; + + public void addUniqueKey(String keyName, Object value, Object elementId, boolean isVertex) { + RdbmsTransaction trx = RdbmsTransaction.getActiveTransaction(); + + if (trx != null) { + trx.getEntityManager().createNativeQuery(isVertex ? SQL_INSERT_UNIQUE_VERTEX_KEY : SQL_INSERT_UNIQUE_EDGE_KEY) + .setParameter(1, getNumberId(elementId)) + .setParameter(2, keyName) + .setParameter(3, value) + .executeUpdate(); + } else { + throw new IllegalStateException("No active transaction found to add unique key: keyName=" + keyName + ", value=" + value); + } + } + + public void removeUniqueKey(String keyName, Object value, Object elementId, boolean isVertex) { Review Comment: The SQL DELETE statements are not using the elementId parameter correctly. Lines 56-57 pass getNumberId(elementId) as parameter 1, but the SQL expects keyName and value for the WHERE clause, not elementId. ########## repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.audit.rdbms; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository; +import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao; +import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; +import org.apache.commons.configuration.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Singleton; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Singleton +@Component +@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true) +@Order(0) +public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditRepository { + @Override + public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { + // TODO: is V1 support needed anymore? + } + + @Override + public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n) throws AtlasException { + // TODO: is V1 support needed anymore? + return Collections.emptyList(); + } + + @Override + public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction()) { + DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); + + for (int i = 0; i < events.size(); i++) { + EntityAuditEventV2 event = events.get(i); + DbEntityAudit dbEvent = toDbEntityAudit(event); + + dbEvent.setEventIndex(i); + + dao.create(dbEvent); + } + + trx.commit(); + } catch (Exception excp) { + throw new AtlasBaseException("Error while persisting audit events", excp); + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction()) { + DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); + + List<DbEntityAudit> dbEvents = dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction.ordinal(), getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount); + + return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); + } catch (Exception excp) { + throw new AtlasBaseException("Error while retrieving audit events", excp); + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction()) { + DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); + + List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, auditAction == null ? null : auditAction.ordinal(), offset, limit); + + return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); + } catch (Exception excp) { + throw new AtlasBaseException("Error while retrieving audit events", excp); + } + } + + @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + // TODO: Review Comment: Method getEntitiesWithTagChanges contains only a TODO comment and returns an empty set. This incomplete implementation may cause issues if this functionality is required by the audit system. ########## repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java: ########## @@ -229,17 +231,56 @@ public static <T extends AtlasElement> void setProperty(T element, String proper LOG.debug("==> setProperty({}, {}, {})", toString(element), propertyName, value); } + AtlasUniqueKeyHandler uniqueKeyHandler = getGraphInstance().getUniqueKeyHandler(); + if (!isEncoded) { propertyName = encodePropertyKey(propertyName); } if (value == null) { + if (uniqueKeyHandler != null) { + if (GraphBackedSearchIndexer.isTypeUniqueIndexKey(propertyName)) { + uniqueKeyHandler.removeTypeUniqueKey(getProperty(element, TYPE_NAME_PROPERTY_KEY, String.class), propertyName, getProperty(element, propertyName, Object.class), element.getId(), element instanceof AtlasVertex); + } else if (GraphBackedSearchIndexer.isGlobalUniqueIndexKey(propertyName)) { + uniqueKeyHandler.removeUniqueKey(propertyName, getProperty(element, propertyName, String.class), element.getId(), element instanceof AtlasVertex); Review Comment: [nitpick] Multiple calls to getProperty() for the same element and property could be optimized by storing the result in a variable, especially since this is called within conditional blocks that may execute frequently. ```suggestion String typeName = getProperty(element, TYPE_NAME_PROPERTY_KEY, String.class); Object existingValue = getProperty(element, propertyName, Object.class); uniqueKeyHandler.removeTypeUniqueKey(typeName, propertyName, existingValue, element.getId(), element instanceof AtlasVertex); } else if (GraphBackedSearchIndexer.isGlobalUniqueIndexKey(propertyName)) { String existingValueStr = getProperty(element, propertyName, String.class); uniqueKeyHandler.removeUniqueKey(propertyName, existingValueStr, element.getId(), element instanceof AtlasVertex); ``` ########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusColumnDao.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.janusgraph.diskstorage.rdbms.dao; + +import org.eclipse.persistence.queries.ScrollableCursor; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.rdbms.JanusColumnValue; +import org.janusgraph.diskstorage.rdbms.RdbmsStore; +import org.janusgraph.diskstorage.rdbms.RdbmsTransaction; +import org.janusgraph.diskstorage.rdbms.entity.JanusColumn; +import org.janusgraph.diskstorage.util.RecordIterator; +import org.janusgraph.diskstorage.util.StaticArrayBuffer; +import org.janusgraph.diskstorage.util.StaticArrayEntry; + +import javax.persistence.NoResultException; + +import java.util.ArrayList; +import java.util.List; + +/** + * DAO to access Column entities stored in RDBMS + * + */ +public class JanusColumnDao extends BaseDao<JanusColumn> { + private final RdbmsStore store; + + public JanusColumnDao(RdbmsTransaction trx, RdbmsStore store) { + super(trx); + + this.store = store; + } + + public void addOrUpdate(long keyId, byte[] name, byte[] val) { + try { + em.createNativeQuery("INSERT INTO janus_column (id, key_id, name, val) VALUES (NEXTVAL('janus_column_seq'), ?, ?, ?) ON CONFLICT (key_id, name) DO UPDATE SET val = EXCLUDED.val") Review Comment: [nitpick] Using native SQL queries with hardcoded table names reduces portability and makes the code database-specific. Consider using JPA named queries or criteria API for better database abstraction. ########## repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.audit.rdbms; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository; +import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao; +import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; +import org.apache.commons.configuration.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Singleton; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Singleton +@Component +@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true) +@Order(0) +public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditRepository { + @Override + public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { + // TODO: is V1 support needed anymore? Review Comment: [nitpick] V1 audit methods contain TODO comments and empty implementations. If V1 support is not needed, these methods should either be properly implemented or throw UnsupportedOperationException to make the intent clear. ########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStoreManager.java: ########## @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.rdbms; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.BaseTransactionConfig; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.StoreMetaData; +import org.janusgraph.diskstorage.common.AbstractStoreManager; +import org.janusgraph.diskstorage.configuration.ConfigNamespace; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRange; +import org.janusgraph.diskstorage.keycolumnvalue.StandardStoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.rdbms.dao.DaoManager; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Storage Manager for RDBMS + * + */ +@PreInitializeConfigOptions +public class RdbmsStoreManager extends AbstractStoreManager implements KeyColumnValueStoreManager { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsStoreManager.class); + + private static final String NAME = "rdbms"; + + public static final ConfigNamespace RDBMS_NS = new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, NAME, "RDBMS configuration options"); + public static final ConfigNamespace JPA_CONFIG_NS = new ConfigNamespace(RDBMS_NS, "jpa", "JPA configurations", true); + + private static RdbmsStoreManager sInstance; + + private final StandardStoreFeatures features; + private final Map<String, RdbmsStore> stores; + private final DaoManager daoManager; + + public RdbmsStoreManager(Configuration config) { + super(config); + + features = new StandardStoreFeatures.Builder() + .orderedScan(true) + .unorderedScan(true) + .multiQuery(true) + .transactional(true) + .keyConsistent(GraphDatabaseConfiguration.buildGraphConfiguration()) + .keyOrdered(true) + .batchMutation(true) + .build(); + stores = new HashMap<>(); + daoManager = new DaoManager(config.getSubset(JPA_CONFIG_NS)); + + sInstance = this;; Review Comment: Double semicolon should be a single semicolon. ```suggestion sInstance = this; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@atlas.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org