strangepleasures commented on issue #646: JENA-1785: A newly created node can remain invisible after commit URL: https://github.com/apache/jena/pull/646#issuecomment-562954480 A version based on your branch but with only one volatile field ``` /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.jena.tdb2.store.nodetable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.lib.Cache; import org.apache.jena.atlas.lib.CacheFactory; import org.apache.jena.atlas.lib.Pair; import org.apache.jena.atlas.logging.Log; import org.apache.jena.dboe.transaction.txn.Transaction; import org.apache.jena.dboe.transaction.txn.TransactionListener; import org.apache.jena.graph.Node; import org.apache.jena.tdb2.TDBException; import org.apache.jena.tdb2.params.StoreParams; import org.apache.jena.tdb2.store.NodeId; /** * Cache wrapper around a NodeTable. Assumes all access goes through this * wrapper. Read-cache - write caching is done via the object file used by the * base NodeTable. */ public class NodeTableCache implements NodeTable, TransactionListener { // These caches are updated together. // See synchronization in _retrieveNodeByNodeId and _idForNode // The cache is assumed to be single operation-thread-safe. // The buffering is for updates. Only the updating thread will see changes due to new nodes // Case 1: Not in main "not-present" // Add to local "not-present", flush down. // Case 2: In main "not-present" // May be goes into local cache. // Write back updates "not-present" // Depends on "not-rpesent" used to protect the underlying "not-present" private ThreadBufferingCache<Node, NodeId> node2id_Cache = null; private ThreadBufferingCache<NodeId, Node> id2node_Cache = null; // A small cache of "known unknowns" to speed up searching for impossible things. // Cache update needed on NodeTable changes because a node may become "known" private Cache<Node, Object> notPresent = null; private NodeTable baseTable; private final Object lock = new Object(); private volatile Thread writingThread; public static NodeTable create(NodeTable nodeTable, StoreParams params) { int nodeToIdCacheSize = params.getNode2NodeIdCacheSize(); int idToNodeCacheSize = params.getNodeId2NodeCacheSize(); if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 ) return nodeTable; return create(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, params.getNodeMissCacheSize()); } private static NodeTable create(NodeTable nodeTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) { if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 ) return nodeTable; return new NodeTableCache(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, nodeMissesCacheSize); } private NodeTableCache(NodeTable baseTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) { this.baseTable = baseTable; if ( nodeToIdCacheSize > 0 ) node2id_Cache = createCache("nodeToId", nodeToIdCacheSize, 1000); if ( idToNodeCacheSize > 0 ) id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000); if ( nodeMissesCacheSize > 0 ) notPresent = CacheFactory.createCache(nodeMissesCacheSize); } private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) { Cache<Key, Value> cache = CacheFactory.createCache(mainCachesize); return new ThreadBufferingCache<>(label, cache, bufferSize); } // ---- Cache access, no going to underlying table. public Node getNodeForNodeIdCache(NodeId id) { return id2node_Cache.getIfPresent(id); } public NodeId getNodeIdForNodeCache(Node node) { return node2id_Cache.getIfPresent(node); } public boolean isCachedNodeId(NodeId id) { return getNodeForNodeIdCache(id) != null; } public boolean isCachedNode(Node node) { return getNodeIdForNodeCache(node) != null; } // ---- Cache access @Override public final NodeTable wrapped() { return baseTable; } /** Get the Node for this NodeId, or null if none */ @Override public Node getNodeForNodeId(NodeId id) { return _retrieveNodeByNodeId(id); } /** Find the NodeId for a node, or return NodeId.NodeDoesNotExist */ @Override public NodeId getNodeIdForNode(Node node) { return _idForNode(node, false); } /** * Find the NodeId for a node, allocating a new NodeId if the Node does not * yet have a NodeId */ @Override public NodeId getAllocateNodeId(Node node) { return _idForNode(node, true); } @Override public boolean containsNode(Node node) { NodeId x = getNodeIdForNode(node); return NodeId.isDoesNotExist(x); } @Override public boolean containsNodeId(NodeId nodeId) { Node x = getNodeForNodeId(nodeId); return x == null; } @Override public List<NodeId> bulkNodeToNodeId(List<Node> required, boolean withAllocation) { synchronized(lock) { List<Node> nodes = new ArrayList<>(); for ( Node n : required ) { // if ( getNodeIdForNodeCache(n) == null ) nodes.add(n); } // Check bulk access. List<NodeId> x = baseTable.bulkNodeToNodeId(nodes, true); for ( int i = 0; i < nodes.size() ; i++ ) { Node n = nodes.get(i); NodeId nid = x.get(i); cacheUpdate(n ,nid); } return x; } } @Override public List<Node> bulkNodeIdToNode(List<NodeId> nodeIds) { return NodeTableOps.bulkNodeIdToNodeImpl(this, nodeIds); } // ---- The worker functions // NodeId ==> Node private Node _retrieveNodeByNodeId(NodeId id) { if ( NodeId.isDoesNotExist(id) ) return null; if ( NodeId.isAny(id) ) return null; // Try once outside the synchronized // (Cache access is thread-safe) Node n = cacheLookup(id); if ( n != null ) return n; synchronized (lock) { // Lock to update two caches consistently. // Verify cache miss n = cacheLookup(id); if ( n != null ) return n; n = baseTable.getNodeForNodeId(id); cacheUpdate(n, id); return n; } } // Node ==> NodeId private NodeId _idForNode(Node node, boolean allocate) { if ( node == Node.ANY ) return NodeId.NodeIdAny; // Try once outside the synchronized // (Cache access is thread-safe.) NodeId nodeId = cacheLookup(node); if ( nodeId != null ) return nodeId; synchronized (lock) { // Update two caches inside synchronized. // Check still valid. nodeId = cacheLookup(node); if ( nodeId != null ) return nodeId; if ( allocate ) nodeId = baseTable.getAllocateNodeId(node); else { if ( notPresent(node) ) // Known not be in the baseTable. return NodeId.NodeDoesNotExist; else nodeId = baseTable.getNodeIdForNode(node); } // Ensure caches have it. Includes recording "no such node" cacheUpdate(node, nodeId); return nodeId; } } // ---------------- // ---- Only places that the caches are touched /** * Test whether in the "not present" cache. * True means "known to be absent from the baseTable". */ private boolean notPresent(Node node) { if ( notPresent == null ) return false; return notPresent.containsKey(node); } /** * Check caches to see if we can map a NodeId to a Node. Returns null on no * cache entry. */ private Node cacheLookup(NodeId id) { if ( id2node_Cache == null ) return null; return id2node_Cache.getIfPresent(id); } /** * Check caches to see if we can map a Node to a NodeId. Returns null on no * cache entry. */ private NodeId cacheLookup(Node node) { // Remember things known (currently) not to exist. // Does not matter if notPresent is being updated elsewhere. return node2id_Cache.getIfPresent(node); } /** Update the Node<->NodeId caches */ private void cacheUpdate(Node node, NodeId id) { if ( node == null ) return; // synchronized is further out. // The "notPresent" cache is used to note whether a node // is known not to exist in the baseTable.. // This must be specially handled later if the node is added. // Only top-level transactions can add nodes to the "notPresent" cache. if ( NodeId.isDoesNotExist(id) ) { if ( notPresent != null && inTopLevelTxn()) notPresent.put(node, Boolean.TRUE); return; } if ( id == NodeId.NodeIdAny ) { Log.warn(this, "Attempt to cache NodeIdAny - ignored"); return; } if ( node2id_Cache != null ) node2id_Cache.put(node, id); if ( id2node_Cache != null ) id2node_Cache.put(id, node); // Remove if previously marked "not present" if ( notPresent != null ) notPresent.remove(node); } // A top-level transaction is either // - a write transaction or // - a read transaction with most recent data version given that there's no active write transaction. private boolean inTopLevelTxn() { Thread writer = writingThread; return (writer == null) || (writer == Thread.currentThread()); } @Override public void notifyTxnStart(Transaction transaction) { if (transaction.isWriteTxn()) updateStart(); } @Override public void notifyPromoteFinish(Transaction transaction) { if(transaction.isWriteTxn()) updateStart(); } @Override public void notifyCompleteFinish(Transaction transaction) { if(transaction.isWriteTxn()) { updateCommit(); } } @Override public void notifyAbortStart(Transaction transaction) { if(transaction.isWriteTxn()) updateAbort(); } // ---- // The cache is "optimistic" - nodes are added during the transaction. // It does not matter if they get added (and visible earlier) // because this is nothing more than "preallocation". Triples (Tuple of NodeIds) don't match. // Underlying file has them "transactionally". // On abort, it does need to be undone because the underlying NodeTable // being cached will not have them. private void updateStart() { node2id_Cache.enableBuffering(); id2node_Cache.enableBuffering(); writingThread = Thread.currentThread(); } private void updateAbort() { writingThread = null; node2id_Cache.dropBuffer(); id2node_Cache.dropBuffer(); } private void updateCommit() { writingThread = null; // Write to main caches. node2id_Cache.flushBuffer(); id2node_Cache.flushBuffer(); } @Override public boolean isEmpty() { synchronized (lock) { if ( node2id_Cache != null ) return node2id_Cache.isEmpty(); if ( id2node_Cache != null ) id2node_Cache.isEmpty(); // Write through. return baseTable.isEmpty(); } } @Override public synchronized void close() { if ( baseTable == null ) // Already closed return; baseTable.close(); node2id_Cache = null; id2node_Cache = null; notPresent = null; baseTable = null; } @Override public void sync() { baseTable.sync(); } @Override public Iterator<Pair<NodeId, Node>> all() { if ( false ) testForConsistency(); return baseTable.all(); } private void testForConsistency() { Iterator<Node> iter1 = Iter.toList(node2id_Cache.keys()).iterator(); for (; iter1.hasNext() ; ) { Node n = iter1.next(); NodeId nId = node2id_Cache.getIfPresent(n); if ( !id2node_Cache.containsKey(nId) ) throw new TDBException("Inconsistent: " + n + " => " + nId); if ( notPresent.containsKey(n) ) throw new TDBException("Inconsistent: " + n + " in notPresent cache (1)"); } Iterator<NodeId> iter2 = Iter.toList(id2node_Cache.keys()).iterator(); for (; iter2.hasNext() ; ) { NodeId nId = iter2.next(); Node n = id2node_Cache.getIfPresent(nId); if ( !node2id_Cache.containsKey(n) ) throw new TDBException("Inconsistent: " + nId + " => " + n); if ( notPresent.containsKey(n) ) throw new TDBException("Inconsistent: " + n + " in notPresent cache (2)"); } } @Override public String toString() { return "Cache(" + baseTable.toString() + ")"; } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services