So, one year ago I was tasked with fixing some deadlocks in Webslinger (the website container Brainfood has developed). While investigating solutions to the problems I was seeing, I stumbled across the book "Java Concurrency in Practice".

OH
MY
GOD
!

It's not normal for me to get all ecstatic about a programming book, but this one has to be at least an 11 (on a scale of 1-10). Anyone doing *any* multi-threaded programming in Java absolutely *must* read this book.

Anyway, after reading said book, I completely rewrote the internals of Webslinger, following the guidelines I had read about. It's now been a year, and what I figured out seems to be running great; no normal code paths through Webslinger take out *any* locks now.

I've decided to start applying this knowledge against OfBiz. My first attempt is attached.

While SequenceUtil is not a highly-contended resource, it is simple enough, and used often enough, so that others can understand what is going on, and it's possible to see if the change actually breaks anything.

Summarizing, here is how it functions (at a high level).

1: Any fields that have to be modified together have to be moved to a separate class.
2: Said class is made final, and *all* fields are also made final; equals() and hashCode() are implemented.
3: The parent class is modified to use an AtomicReference.
4: Any time the parent class needs to change one of the original fields, it makes a copy of the current reference with the new values, then does an atomic compareAndSet (retrying if another thread changed the reference in the meantime). This can cause multiple allocations when contended; however, Java 1.6 is smart enough to do some allocations on the stack, so in general this is not a problem.

These changes invariably increase the size of the code; however, they do reduce overhead in the long run, so I consider them beneficial.

Attached you will find SequenceUtil.java; I've modified it to be non-blocking. I've done "ant run-install", with no problems, but have *not* yet run any test cases.

I'm sending it here (to this list) first, before checking it in, as this is a radical departure from simple programming practices, and I would like others to discuss whether I should do this in other places.
/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.ofbiz.entity.util;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import javax.transaction.Transaction;

import org.ofbiz.base.util.Debug;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.jdbc.ConnectionFactory;
import org.ofbiz.entity.model.ModelEntity;
import org.ofbiz.entity.model.ModelField;
import org.ofbiz.entity.transaction.GenericTransactionException;
import org.ofbiz.entity.transaction.TransactionUtil;

/**
 * Sequence Utility to get unique sequences from named sequence banks
 * Uses a collision detection approach to safely get unique sequenced ids in banks from the database
 */
public class SequenceUtil {

    /** Class name, passed as the module argument to the Debug logging calls in this file. */
    public static final String module = SequenceUtil.class.getName();

    /** Banks keyed by sequence name; entries are created lazily by getBank using putIfAbsent. */
    ConcurrentHashMap<String, SequenceBank> sequences = new ConcurrentHashMap<String, SequenceBank>();
    /** Helper name handed to ModelEntity.getTableName and ConnectionFactory.getConnection. */
    String helperName;
    /** Model entity describing the sequence table. */
    ModelEntity seqEntity;
    /** Table name resolved from seqEntity for helperName. */
    String tableName;
    /** Column that holds the sequence name. */
    String nameColName;
    /** Column that holds the sequence id value. */
    String idColName;

    /** Private no-argument constructor; it leaves every field at its default value. */
    private SequenceUtil() {}

    /**
     * Creates a sequence utility bound to one helper and one sequence entity.
     *
     * @param helperName    helper name used to resolve the table name (and later to obtain connections)
     * @param seqEntity     model entity describing the sequence table; must not be null
     * @param nameFieldName name of the entity field that stores the sequence name
     * @param idFieldName   name of the entity field that stores the sequence id
     * @throws IllegalArgumentException if seqEntity is null or either field is not defined on it
     */
    public SequenceUtil(String helperName, ModelEntity seqEntity, String nameFieldName, String idFieldName) {
        if (seqEntity == null) {
            throw new IllegalArgumentException("The sequence model entity was null but is required.");
        }
        final String resolvedTableName = seqEntity.getTableName(helperName);

        final ModelField nameField = seqEntity.getField(nameFieldName);
        if (nameField == null) {
            throw new IllegalArgumentException("Could not find the field definition for the sequence name field " + nameFieldName);
        }
        final String resolvedNameCol = nameField.getColName();

        final ModelField idField = seqEntity.getField(idFieldName);
        if (idField == null) {
            throw new IllegalArgumentException("Could not find the field definition for the sequence id field " + idFieldName);
        }
        final String resolvedIdCol = idField.getColName();

        // everything validated: publish into the instance fields
        this.helperName = helperName;
        this.seqEntity = seqEntity;
        this.tableName = resolvedTableName;
        this.nameColName = resolvedNameCol;
        this.idColName = resolvedIdCol;
    }

    /**
     * Returns the next id for the named sequence, creating its bank on first use.
     *
     * @param seqName        name of the sequence
     * @param staggerMax     upper bound for the random stagger applied by the bank
     * @param seqModelEntity model entity handed to a newly created bank
     * @return whatever the bank's getNextSeqId returns for staggerMax
     */
    public Long getNextSeqId(String seqName, long staggerMax, ModelEntity seqModelEntity) {
        return this.getBank(seqName, seqModelEntity).getNextSeqId(staggerMax);
    }
    
    /**
     * Asks the bank of the named sequence to refresh itself; does nothing when no bank exists yet.
     *
     * @param seqName    name of the sequence
     * @param staggerMax value forwarded to the bank's refresh
     */
    public void forceBankRefresh(String seqName, long staggerMax) {
        // look the bank up directly instead of going through getBank: a missing bank must not be created here
        SequenceBank existing = sequences.get(seqName);
        if (existing != null) {
            existing.refresh(staggerMax);
        }
    }
    
    /**
     * Returns the bank registered for seqName, creating and registering one if none exists.
     * When two threads race to create a bank, putIfAbsent decides the winner and both
     * callers end up with the same registered instance.
     */
    private SequenceBank getBank(String seqName, ModelEntity seqModelEntity) {
        SequenceBank existing = sequences.get(seqName);
        if (existing != null) {
            return existing;
        }

        SequenceBank created = new SequenceBank(seqName, seqModelEntity, this);
        SequenceBank winner = sequences.putIfAbsent(seqName, created);
        return winner != null ? winner : created;
    }

    class SequenceBank {
        /** Bank size used when the model entity does not specify one; also the multiplier applied to a stagger above 1. */
        public static final long defaultBankSize = 10;
        /** Upper limit applied to the computed bank size. */
        public static final long maxBankSize = 5000;
        /** Id value inserted when a sequence row has to be created. */
        public static final long startSeqId = 10000;
        /** Lower bound, in milliseconds, of the random wait after a collision. */
        public static final int minWaitMillis = 5;
        /** Upper bound, in milliseconds, of the random wait after a collision. */
        public static final int maxWaitMillis = 50;
        /** Number of collision retries after which filling the bank gives up. */
        public static final int maxTries = 5;

        /** Current immutable (curSeqId, maxSeqId) pair; replaced as a whole via compareAndSet. */
        final AtomicReference<SequenceValue> ref;
        /** Name of the sequence this bank serves. */
        String seqName;
        /** Owning utility; supplies helper, table and column names. */
        SequenceUtil parentUtil;
        /** Model entity consulted for an optional bank size; may be null. */
        ModelEntity seqModelEntity;

        /**
         * Creates the bank and immediately tries to fill it.
         *
         * @param seqName        name of the sequence
         * @param seqModelEntity model entity consulted for the bank size; may be null
         * @param parentUtil     owning utility
         */
        public SequenceBank(String seqName, ModelEntity seqModelEntity, SequenceUtil parentUtil) {
            this.seqModelEntity = seqModelEntity;
            this.parentUtil = parentUtil;
            this.seqName = seqName;
            // must stay last: fillBank reads the three fields assigned above
            ref = new AtomicReference<SequenceValue>(new SequenceValue(0, 0).fillBank(1));
        }

        /**
         * Hands out the next id of this bank without taking a lock.
         *
         * <p>The current value is read from {@code ref}; if it cannot serve the request a new
         * bank is fetched via {@code fillBank}. The successor value is then installed with
         * {@code compareAndSet} against the snapshot that was actually read from {@code ref}.
         * {@code AtomicReference} compares by reference, so the expected argument must be that
         * snapshot and not the freshly filled value, which was never stored in {@code ref}.
         * If another thread changed {@code ref} in the meantime the whole step is retried;
         * a bank fetched during a lost attempt is simply discarded.
         *
         * @param staggerMax when greater than 1, the id advances by a random amount
         *                   derived from this bound instead of by 1
         * @return the id handed out, or {@code null} if the bank could not be filled
         */
        public Long getNextSeqId(long staggerMax) {
            long stagger = 1;
            if (staggerMax > 1) {
                stagger = Math.round(Math.random() * staggerMax);
                if (stagger == 0) stagger = 1;
            }
            while (true) {
                // snapshot that compareAndSet has to match
                final SequenceValue expected = ref.get();
                SequenceValue value = expected;
                if ((value.curSeqId + stagger) > value.maxSeqId) {
                    value = value.fillBank(stagger);
                    if ((value.curSeqId + stagger) > value.maxSeqId) {
                        Debug.logError("[SequenceUtil.SequenceBank.getNextSeqId] Fill bank failed, returning null", module);
                        return null;
                    }
                }
                SequenceValue next = new SequenceValue(value.curSeqId + stagger, value.maxSeqId);
                if (ref.compareAndSet(expected, next)) {
                    return Long.valueOf(value.curSeqId);
                }
            }
        }
        
        /**
         * Replaces the current value with its refreshed counterpart, retrying until the
         * compareAndSet succeeds. Each retry calls the value's refresh again.
         *
         * @param staggerMax value forwarded to SequenceValue.refresh
         */
        public void refresh(long staggerMax) {
            while (true) {
                SequenceValue current = ref.get();
                SequenceValue refreshed = current.refresh(staggerMax);
                if (ref.compareAndSet(current, refreshed)) {
                    return;
                }
            }
        }



    protected final class SequenceValue {
        /** Next id this value would hand out. */
        protected final long curSeqId;
        /** Upper limit of the bank; an id plus its stagger must not exceed it. */
        protected final long maxSeqId;

        /**
         * Creates an immutable (current, maximum) pair.
         *
         * @param curSeqId next id to hand out
         * @param maxSeqId upper limit of the bank
         */
        protected SequenceValue(long curSeqId, long maxSeqId) {
            this.maxSeqId = maxSeqId;
            this.curSeqId = curSeqId;
        }

        /**
         * Hash consistent with {@code equals}: both ids are combined and the 64-bit result is
         * folded into 32 bits using all of its bits (high word XOR low word).
         */
        @Override
        public int hashCode() {
            long r = curSeqId ^ maxSeqId;
            return (int) (r ^ (r >>> 32));
        }

        /** Two values are equal when both their current and maximum ids match. */
        public boolean equals(Object o) {
            if (o instanceof SequenceValue) {
                SequenceValue that = (SequenceValue) o;
                return this.curSeqId == that.curSeqId && this.maxSeqId == that.maxSeqId;
            }
            return false;
        }

        /**
         * Forces a new bank to be fetched even if ids are still available in this one.
         *
         * <p>The bank is treated as exhausted by passing {@code maxSeqId} as the current id, so
         * the "ids still available" shortcut in {@code fillBank} does not apply for any
         * stagger of at least 1.
         *
         * @param staggerMax stagger forwarded to {@code fillBank}
         * @return the value produced by {@code fillBank}; {@code this} if filling gave up
         */
        protected SequenceValue refresh(long staggerMax) {
            return fillBank(staggerMax, this.maxSeqId);
        }

        /**
         * Fills the bank starting from this value's own current id.
         *
         * @param stagger step the caller intends to advance by
         * @return the result of the two-argument fillBank for this value's curSeqId
         */
        protected SequenceValue fillBank(long stagger) {
            final long startingFrom = this.curSeqId;
            return this.fillBank(stagger, startingFrom);
        }

        /**
         * Reserves a new bank of ids in the database and returns it as a new value.
         *
         * <p>Each attempt suspends any current transaction, begins its own, reads the stored id
         * (val1), adds the bank size with an UPDATE and reads the id again (val2). The attempt
         * succeeded when {@code val1 + bankSize == val2}; otherwise the transaction is marked
         * rollback-only and, after a random wait between {@code minWaitMillis} and
         * {@code maxWaitMillis}, the attempt is repeated. If the sequence row does not exist it
         * is inserted with {@code startSeqId} and the attempt is repeated immediately.
         *
         * <p>The bank size is the model entity's sequence bank size if set, otherwise
         * {@code defaultBankSize}; a stagger above 1 replaces it with
         * {@code stagger * defaultBankSize}; the result is capped at {@code maxBankSize}.
         *
         * @param stagger  step the caller intends to advance by
         * @param curSeqId id to treat as the current one when checking whether ids remain
         * @return {@code this} if ids are still available, if {@code maxTries} collisions were
         *         hit, if the wait was interrupted, or if an exception aborted the attempt;
         *         otherwise a new value spanning {@code val1} to {@code val2}
         */
        private SequenceValue fillBank(long stagger, long curSeqId) {
            // no need to get a new bank, SeqIds available
            if ((curSeqId + stagger) <= maxSeqId) return this;

            long bankSize = defaultBankSize;
            if (seqModelEntity != null && seqModelEntity.getSequenceBankSize() != null) {
                bankSize = seqModelEntity.getSequenceBankSize().longValue();
            }
            if (stagger > 1) {
                // NOTE: could use staggerMax for this, but if that is done it would be easier to guess a valid next id without a brute force attack
                bankSize = stagger * defaultBankSize;
            }

            if (bankSize > maxBankSize) bankSize = maxBankSize;

            long val1 = 0;
            long val2 = 0;

            // set when an exception forced a rollback; val1/val2 must not be used in that case
            boolean aborted = false;

            // NOTE: the fancy ethernet type stuff is for the case where transactions not available, or something funny happens with really sensitive timing (between first select and update, for example)
            int numTries = 0;

            while (val1 + bankSize != val2) {
                if (Debug.verboseOn()) Debug.logVerbose("[SequenceUtil.SequenceBank.fillBank] Trying to get a bank of sequenced ids for " +
                        seqName + "; start of loop val1=" + val1 + ", val2=" + val2 + ", bankSize=" + bankSize, module);

                Transaction suspendedTransaction = null;
                try {
                    //if we can suspend the transaction, we'll try to do this in a local manual transaction
                    suspendedTransaction = TransactionUtil.suspend();

                    boolean beganTransaction = false;
                    try {
                        beganTransaction = TransactionUtil.begin();

                        Connection connection = null;
                        Statement stmt = null;
                        ResultSet rs = null;

                        try {
                            connection = ConnectionFactory.getConnection(parentUtil.helperName);
                        } catch (SQLException sqle) {
                            Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was:" + sqle.toString(), module);
                            throw sqle;
                        } catch (GenericEntityException e) {
                            Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database... Error was: " + e.toString(), module);
                            throw e;
                        }

                        if (connection == null) {
                            throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank]: Unable to esablish a connection with the database, connection was null...");
                        }

                        String sql = null;

                        try {
                            // we shouldn't need this, and some TX managers complain about it, so not including it: connection.setAutoCommit(false);

                            stmt = connection.createStatement();

                            // NOTE(review): seqName is concatenated into the SQL text; confirm callers never pass untrusted names
                            sql = "SELECT " + parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                            rs = stmt.executeQuery(sql);
                            boolean gotVal1 = false;
                            if (rs.next()) {
                                val1 = rs.getLong(parentUtil.idColName);
                                gotVal1 = true;
                            }
                            rs.close();

                            if (!gotVal1) {
                                Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] first select failed: will try to add new row, result set was empty for sequence [" + seqName + "] \nUsed SQL: " + sql + " \n Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
                                sql = "INSERT INTO " + parentUtil.tableName + " (" + parentUtil.nameColName + ", " + parentUtil.idColName + ") VALUES ('" + seqName + "', " + startSeqId + ")";
                                if (stmt.executeUpdate(sql) <= 0) {
                                    throw new GenericEntityException("No rows changed when trying insert new sequence row with this SQL: " + sql);
                                }
                                // row created: run the finally blocks (commit/resume) and start over without counting a try
                                continue;
                            }

                            sql = "UPDATE " + parentUtil.tableName + " SET " + parentUtil.idColName + "=" + parentUtil.idColName + "+" + bankSize + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                            if (stmt.executeUpdate(sql) <= 0) {
                                throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] update failed, no rows changes for seqName: " + seqName);
                            }

                            sql = "SELECT " + parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " + parentUtil.nameColName + "='" + seqName + "'";
                            rs = stmt.executeQuery(sql);
                            boolean gotVal2 = false;
                            if (rs.next()) {
                                val2 = rs.getLong(parentUtil.idColName);
                                gotVal2 = true;
                            }

                            rs.close();

                            if (!gotVal2) {
                                throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] second select failed: aborting, result set was empty for sequence: " + seqName);
                            }

                            // got val1 and val2 at this point, if we don't have the right difference between them, force a rollback (with
                            //setRollbackOnly and NOT with an exception because we don't want to break from the loop, just err out and
                            //continue), then flow out to allow the wait and loop thing to happen
                            if (val1 + bankSize != val2) {
                                TransactionUtil.setRollbackOnly("Forcing transaction rollback in sequence increment because we didn't get a clean update, ie a conflict was found, so not saving the results", null);
                            }
                        } catch (SQLException sqle) {
                            Debug.logWarning(sqle, "[SequenceUtil.SequenceBank.fillBank] SQL Exception while executing the following:\n" + sql + "\nError was:" + sqle.getMessage(), module);
                            throw sqle;
                        } finally {
                            try {
                                if (stmt != null) stmt.close();
                            } catch (SQLException sqle) {
                                Debug.logWarning(sqle, "Error closing statement in sequence util", module);
                            }
                            try {
                                if (connection != null) connection.close();
                            } catch (SQLException sqle) {
                                Debug.logWarning(sqle, "Error closing connection in sequence util", module);
                            }
                        }
                    } catch (Exception e) {
                        String errMsg = "General error in getting a sequenced ID";
                        Debug.logError(e, errMsg, module);
                        try {
                            TransactionUtil.rollback(beganTransaction, errMsg, e);
                        } catch (GenericTransactionException gte2) {
                            Debug.logError(gte2, "Unable to rollback transaction", module);
                        }

                        // error, break out of the loop to not try to continue forever;
                        // remember that whatever val1/val2 hold now was rolled back or is incomplete
                        aborted = true;
                        break;
                    } finally {
                        try {
                            TransactionUtil.commit(beganTransaction);
                        } catch (GenericTransactionException gte) {
                            Debug.logError(gte, "Unable to commit sequence increment transaction, continuing anyway though", module);
                        }
                    }
                } catch (GenericTransactionException e) {
                    Debug.logError(e, "System Error suspending transaction in sequence util", module);
                } finally {
                    if (suspendedTransaction != null) {
                        try {
                            TransactionUtil.resume(suspendedTransaction);
                        } catch (GenericTransactionException e) {
                            Debug.logError(e, "Error resuming suspended transaction in sequence util", module);
                        }
                    }
                }

                if (val1 + bankSize != val2) {
                    if (numTries >= maxTries) {
                        String errMsg = "[SequenceUtil.SequenceBank.fillBank] maxTries (" + maxTries + ") reached for seqName [" + seqName + "], giving up.";
                        Debug.logError(errMsg, module);
                        return this;
                    }

                    // collision happened, wait a bounded random amount of time then continue
                    int waitTime = (int) (Math.random() * (maxWaitMillis - minWaitMillis)) + minWaitMillis;

                    Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Collision found for seqName [" + seqName + "], val1=" + val1 + ", val2=" + val2 + ", val1+bankSize=" + (val1 + bankSize) + ", bankSize=" + bankSize + ", waitTime=" + waitTime, module);

                    try {
                        java.lang.Thread.sleep(waitTime);
                    } catch (InterruptedException e) {
                        // keep the interrupt visible to the caller instead of swallowing it
                        Thread.currentThread().interrupt();
                        Debug.logWarning(e, "Error waiting in sequence util", module);
                        return this;
                    }
                }

                numTries++;
            }

            if (aborted) {
                // same outcome as the maxTries path: no new bank, keep the current value
                return this;
            }

            if (Debug.infoOn()) Debug.logInfo("Got bank of sequenced IDs for [" + seqName + "]; curSeqId=" + val1 + ", maxSeqId=" + val2 + ", bankSize=" + bankSize, module);
            return new SequenceValue(val1, val2);
        }

    }
    }
}

Reply via email to