Copilot commented on code in PR #389: URL: https://github.com/apache/atlas/pull/389#discussion_r2158054101
########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/DaoManager.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.janusgraph.diskstorage.rdbms.dao; + +import org.eclipse.persistence.config.PersistenceUnitProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import javax.persistence.spi.PersistenceProvider; +import javax.persistence.spi.PersistenceProviderResolver; +import javax.persistence.spi.PersistenceProviderResolverHolder; + +import java.util.HashMap; +import java.util.Map; + +/** + * DAO manager that initializes JPA layer + * + * Sample properties to initialize JPA + * storage.backend=rdbms + * storage.rdbms.jpa.javax.persistence.jdbc.dialect=org.eclipse.persistence.platform.database.PostgreSQLPlatform + * storage.rdbms.jpa.javax.persistence.jdbc.driver=org.postgresql.Driver + * storage.rdbms.jpa.javax.persistence.jdbc.url=jdbc:postgresql://dbhost/dbname + * storage.rdbms.jpa.javax.persistence.jdbc.user=janus + * storage.rdbms.jpa.javax.persistence.jdbc.password=janusR0cks! + * storage.rdbms.jpa.javax.persistence.schema-generation.database.action=create + * storage.rdbms.jpa.javax.persistence.schema-generation.create-database-schemas=true + * storage.rdbms.jpa.javax.persistence.schema-generation.create-source=metadata + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +public class DaoManager { + private static final Logger LOG = LoggerFactory.getLogger(DaoManager.class); + + private final EntityManagerFactory emFactory; + + /** + * + * @param jpaConfig + */ + public DaoManager(Map<String, Object> jpaConfig) { + Map<String, String> config = new HashMap<>(); + + if (jpaConfig != null) { + for (Map.Entry<String, Object> entry : jpaConfig.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + if (value != null) { + config.put(key, value.toString()); + } + } + } + + config.put(PersistenceUnitProperties.ECLIPSELINK_PERSISTENCE_XML, "META-INF/janus-persistence.xml"); + + LOG.warn("DaoManager: config={}", config); + + PersistenceProviderResolver resolver = PersistenceProviderResolverHolder.getPersistenceProviderResolver(); + + for (PersistenceProvider provider : resolver.getPersistenceProviders()) { + LOG.warn("PersistenceProvider: " + provider.toString()); + + EntityManagerFactory emf = provider.createEntityManagerFactory("janusPU", config); + if (emf != null) { + break; + } + } + + emFactory = Persistence.createEntityManagerFactory("janusPU", config); Review Comment: The loop iterating over PersistenceProviders assigns the result to a local variable but never stores it in the class member. Consider saving the non-null factory in the member variable and breaking the loop immediately to avoid redundant factory creation. ```suggestion EntityManagerFactory tempFactory = null; for (PersistenceProvider provider : resolver.getPersistenceProviders()) { LOG.warn("PersistenceProvider: " + provider.toString()); tempFactory = provider.createEntityManagerFactory("janusPU", config); if (tempFactory != null) { emFactory = tempFactory; break; } } if (emFactory == null) { emFactory = Persistence.createEntityManagerFactory("janusPU", config); } ``` ########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStore.java: ########## @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.rdbms; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.EntryMetaData; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator; +import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.rdbms.dao.DaoManager; +import org.janusgraph.diskstorage.rdbms.dao.JanusColumnDao; +import org.janusgraph.diskstorage.rdbms.dao.JanusKeyDao; +import org.janusgraph.diskstorage.rdbms.dao.JanusStoreDao; +import org.janusgraph.diskstorage.rdbms.entity.JanusKey; +import org.janusgraph.diskstorage.rdbms.entity.JanusStore; +import org.janusgraph.diskstorage.util.StaticArrayEntry; +import org.janusgraph.diskstorage.util.StaticArrayEntryList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * KeyColumnValue store backed by RDBMS + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +public class RdbmsStore implements KeyColumnValueStore { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsStore.class); + + private final String name; + private final DaoManager daoManager; + private final EntryMetaData[] entryMetaData; + private Long storeId; + + public RdbmsStore(String name, RdbmsStoreManager storeManager) { + LOG.info("RdbmsStore(name={})", name); + + this.name = name; + this.daoManager = storeManager.getDaoManager(); + this.entryMetaData = storeManager.getMetaDataSchema(name); + this.storeId = null; + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getSlice(name={}, query={}, trx={})", name, query, trx); + + final EntryList ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + Long keyId = getKeyIdOrCreate(toBytes(query.getKey()), trx); + byte[] sliceStart = toBytes(query.getSliceStart()); + byte[] sliceEnd = toBytes(query.getSliceEnd()); + List<JanusColumnValue> entries = dao.getColumns(keyId, sliceStart, sliceEnd, query.getLimit()); + + if (entries != null && !entries.isEmpty()) { + ret = StaticArrayEntryList.ofStaticBuffer(entries, toEntry); + } else { + ret = EntryList.EMPTY_LIST; + } + } else { + ret = EntryList.EMPTY_LIST; + } + + LOG.debug("<== RdbmsStore.getSlice(name={}, query={}, trx={}): ret={}", name, query, trx, ret.size()); + + return ret; + } + + @Override + public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={})", name, keys.size(), query, trx); + + final Map<StaticBuffer, EntryList> ret; + + if (isStorePresent(trx)) { + ret = new TreeMap<>(); + + for (StaticBuffer key : keys) { + ret.put(key, getSlice(new KeySliceQuery(key, query), trx)); + } + } else { + ret = Collections.emptyMap(); + } + + LOG.debug("<== RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={})", name, keys.size(), query, trx); + + return ret; + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx); + + byte[] keyName = toBytes(key); + Long keyId = getKeyIdOrCreate(keyName, trx); + JanusColumnDao columnDao = new JanusColumnDao((RdbmsTransaction) trx, this); + + for (StaticBuffer column : deletions) { + byte[] columnName = toBytes(column); + + columnDao.remove(keyId, columnName); + } + + for (Entry entry : additions) { + columnDao.addOrUpdate(keyId, toBytes(entry.getColumn()), toBytes(entry.getValue())); + } + + LOG.debug("<== RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx); + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction trx) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx); + + final KeyIterator ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + + ret = dao.getKeysByKeyAndColumnRange(this.storeId, toBytes(query.getKeyStart()), toBytes(query.getKeyEnd()), toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit()); + } else { + ret = JanusColumnDao.EMPTY_KEY_ITERATOR; + } + + LOG.debug("<== RdbmsStore.debug(name={}, query={}, trx={})", name, query, trx); + + return ret; + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx); + + final KeyIterator ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + + ret = dao.getKeysByColumnRange(this.storeId, toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit()); + } else { + ret = JanusColumnDao.EMPTY_KEY_ITERATOR; + } + + LOG.debug("<== RdbmsStore.debug(name={}, query={}, trx={})", name, query, trx); + + return ret; + } + + @Override + public KeySlicesIterator getKeys(MultiSlicesQuery query, StoreTransaction trx) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void close() throws BackendException { + LOG.debug("==> RdbmsStore.close(name={})", name); + + LOG.debug("<== RdbmsStore.close(name={})", name); + } + + private boolean isStorePresent(StoreTransaction trx) { + if (this.storeId == null) { + JanusStoreDao storeDao = new JanusStoreDao((RdbmsTransaction) trx); + + this.storeId = storeDao.getIdByName(name); + + return this.storeId != null; + } + + return true; + } + + private static byte[] toBytes(StaticBuffer val) { + return val == null ? null : val.as(StaticBuffer.ARRAY_FACTORY); + } + + private Long getStoreIdOrCreate(StoreTransaction trx) { + Long ret = this.storeId; + + while (ret == null) { Review Comment: The 'while (ret == null)' loop in getStoreIdOrCreate may lead to an infinite loop if store creation continuously fails. Consider adding a maximum retry count or a timeout mechanism to prevent potential hanging. ```suggestion int retryCount = 0; final int maxRetries = 5; // Maximum number of retries while (ret == null && retryCount < maxRetries) { ``` ########## graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStore.java: ########## @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.rdbms; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.EntryMetaData; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator; +import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.rdbms.dao.DaoManager; +import org.janusgraph.diskstorage.rdbms.dao.JanusColumnDao; +import org.janusgraph.diskstorage.rdbms.dao.JanusKeyDao; +import org.janusgraph.diskstorage.rdbms.dao.JanusStoreDao; +import org.janusgraph.diskstorage.rdbms.entity.JanusKey; +import org.janusgraph.diskstorage.rdbms.entity.JanusStore; +import org.janusgraph.diskstorage.util.StaticArrayEntry; +import org.janusgraph.diskstorage.util.StaticArrayEntryList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * KeyColumnValue store backed by RDBMS + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +public class RdbmsStore implements KeyColumnValueStore { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsStore.class); + + private final String name; + private final DaoManager daoManager; + private final EntryMetaData[] entryMetaData; + private Long storeId; + + public RdbmsStore(String name, RdbmsStoreManager storeManager) { + LOG.info("RdbmsStore(name={})", name); + + this.name = name; + this.daoManager = storeManager.getDaoManager(); + this.entryMetaData = storeManager.getMetaDataSchema(name); + this.storeId = null; + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getSlice(name={}, query={}, trx={})", name, query, trx); + + final EntryList ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + Long keyId = getKeyIdOrCreate(toBytes(query.getKey()), trx); + byte[] sliceStart = toBytes(query.getSliceStart()); + byte[] sliceEnd = toBytes(query.getSliceEnd()); + List<JanusColumnValue> entries = dao.getColumns(keyId, sliceStart, sliceEnd, query.getLimit()); + + if (entries != null && !entries.isEmpty()) { + ret = StaticArrayEntryList.ofStaticBuffer(entries, toEntry); + } else { + ret = EntryList.EMPTY_LIST; + } + } else { + ret = EntryList.EMPTY_LIST; + } + + LOG.debug("<== RdbmsStore.getSlice(name={}, query={}, trx={}): ret={}", name, query, trx, ret.size()); + + return ret; + } + + @Override + public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={})", name, keys.size(), query, trx); + + final Map<StaticBuffer, EntryList> ret; + + if (isStorePresent(trx)) { + ret = new TreeMap<>(); + + for (StaticBuffer key : keys) { + ret.put(key, getSlice(new KeySliceQuery(key, query), trx)); + } + } else { + ret = Collections.emptyMap(); + } + + LOG.debug("<== RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={})", name, keys.size(), query, trx); + + return ret; + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx); + + byte[] keyName = toBytes(key); + Long keyId = getKeyIdOrCreate(keyName, trx); + JanusColumnDao columnDao = new JanusColumnDao((RdbmsTransaction) trx, this); + + for (StaticBuffer column : deletions) { + byte[] columnName = toBytes(column); + + columnDao.remove(keyId, columnName); + } + + for (Entry entry : additions) { + columnDao.addOrUpdate(keyId, toBytes(entry.getColumn()), toBytes(entry.getValue())); + } + + LOG.debug("<== RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx); + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction trx) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx); + + final KeyIterator ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + + ret = dao.getKeysByKeyAndColumnRange(this.storeId, toBytes(query.getKeyStart()), toBytes(query.getKeyEnd()), toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit()); + } else { + ret = JanusColumnDao.EMPTY_KEY_ITERATOR; + } + + LOG.debug("<== RdbmsStore.debug(name={}, query={}, trx={})", name, query, trx); + + return ret; + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction trx) throws BackendException { + LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx); + + final KeyIterator ret; + + if (isStorePresent(trx)) { + JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this); + + ret = dao.getKeysByColumnRange(this.storeId, toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit()); + } else { + ret = JanusColumnDao.EMPTY_KEY_ITERATOR; + } + + LOG.debug("<== RdbmsStore.debug(name={}, query={}, trx={})", name, query, trx); + + return ret; + } + + @Override + public KeySlicesIterator getKeys(MultiSlicesQuery query, StoreTransaction trx) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void close() throws BackendException { + LOG.debug("==> RdbmsStore.close(name={})", name); + + LOG.debug("<== RdbmsStore.close(name={})", name); + } + + private boolean isStorePresent(StoreTransaction trx) { + if (this.storeId == null) { + JanusStoreDao storeDao = new JanusStoreDao((RdbmsTransaction) trx); + + this.storeId = storeDao.getIdByName(name); + + return this.storeId != null; + } + + return true; + } + + private static byte[] toBytes(StaticBuffer val) { + return val == null ? null : val.as(StaticBuffer.ARRAY_FACTORY); + } + + private Long getStoreIdOrCreate(StoreTransaction trx) { + Long ret = this.storeId; + + while (ret == null) { + JanusStoreDao dao = new JanusStoreDao((RdbmsTransaction) trx); + + ret = dao.getIdByName(name); + + if (ret == null) { + RdbmsTransaction trx2 = new RdbmsTransaction(trx.getConfiguration(), daoManager); + JanusStoreDao dao2 = new JanusStoreDao(trx2); + + try { + LOG.debug("Creating store={}", name); + + dao2.create(new JanusStore(name)); + + trx2.commit(); + + ret = dao.getIdByName(name); + + this.storeId = ret; + + LOG.debug("Created store={}: id={}", name, ret); + } catch (Exception excp) { + LOG.warn("Failed to create store={}", name, excp); + } finally { + try { + trx2.close(); + } catch (Exception excp) { + // ignore + } + } + } + } + + return ret; + } + + private Long getKeyIdOrCreate(byte[] key, StoreTransaction trx) { + Long ret = null; + JanusKeyDao dao = new JanusKeyDao((RdbmsTransaction) trx); + Long storeId = getStoreIdOrCreate(trx); + + while (ret == null) { Review Comment: Similar to the store creation, the getKeyIdOrCreate method uses a loop that might run indefinitely if a key cannot be created. Introducing a retry limit or error handling logic could mitigate the risk of an infinite loop. ```suggestion final int MAX_RETRIES = 5; // Maximum number of retry attempts int attempt = 0; // Counter to track retry attempts Long ret = null; JanusKeyDao dao = new JanusKeyDao((RdbmsTransaction) trx); Long storeId = getStoreIdOrCreate(trx); while (ret == null && attempt < MAX_RETRIES) { attempt++; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@atlas.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org