/* ====================================================================
 * The Apache Software License, Version 1.1
 *
 * Copyright (c) 2001 The Apache Software Foundation.  All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *        Apache Software Foundation (http://www.apache.org/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Apache" and "Apache Software Foundation" and
 *    "Apache ObjectRelationalBridge" must not be used to endorse or promote products
 *    derived from this software without prior written permission. For
 *    written permission, please contact apache@apache.org.
 *
 * 5. Products derived from this software may not be called "Apache",
 *    "Apache ObjectRelationalBridge", nor may "Apache" appear in their name, without
 *    prior written permission of the Apache Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 */
package org.apache.ojb.broker.accesslayer;

import org.apache.ojb.broker.*;
import org.apache.ojb.broker.Identity;
import org.apache.ojb.broker.PersistenceBrokerException;
import org.apache.ojb.broker.PersistenceBrokerSQLException;
import org.apache.ojb.broker.accesslayer.sql.*;
import org.apache.ojb.broker.metadata.ClassDescriptor;
import org.apache.ojb.broker.metadata.FieldDescriptor;
import org.apache.ojb.broker.query.Query;
import org.apache.ojb.broker.util.logging.Logger;
import org.apache.ojb.broker.util.logging.LoggerFactory;

import java.sql.*;
import java.util.Map;
import java.util.HashMap;

/**
 * JdbcAccess is responsible for establishing performing
 * SQL Queries against remote Databases.
 * It hides all knowledge about JDBC from the BrokerImpl
 * @author <a href="mailto:thma@apache.org">Thomas Mahler</a>
 * @version $Id: JdbcAccess.java,v 1.17 2002/10/17 21:31:13 prophecy Exp $
 */
public class JdbcAccess
{
	private static final String SQL_STATE_KEY_VIOLATED = "23000";
    /**
     * The logger used.
     */
    protected Logger logger;

    /**
     * The broker in use.
     */
    protected PersistenceBroker broker;

    /**
     * constructor is private, use getInstance to get
     * the singleton instance of this class
     */
    public JdbcAccess(PersistenceBroker broker)
    {
        this.broker = broker;
        logger = LoggerFactory.getLogger(this.getClass());
    }

    /**
     * performs a DELETE operation against RDBMS.
     * @param cld ClassDescriptor providing mapping information.
     * @param obj The object to be deleted.
     */
    public void executeDelete(ClassDescriptor cld, Object obj) throws PersistenceBrokerException
    {
        logger.safeDebug("executeDelete",obj);

        PreparedStatement stmt = null;
        try
        {
            stmt = broker.getStatementManager().getDeleteStatement(cld);
            if (stmt == null)
            {
            	logger.error("getDeleteStatement returned a null statement");
            	throw new PersistenceBrokerException("getDeleteStatement returned a null statement");
            }
            synchronized (stmt)
            {
                broker.getStatementManager().bindDelete(stmt, cld, obj);
                if (stmt.executeUpdate() == 0 && cld.isLocking()) //BRJ
                {
                    throw new OptimisticLockException("Object has been modified by someone else", obj);
                }
            }
        }
        catch (OptimisticLockException e)
        {
        	// Don't log as error
            if (logger.isDebugEnabled()) logger.debug("OptimisticLockException during the execution of delete: " + e.getMessage(), e);
            throw e;
        }
        catch (PersistenceBrokerException e)
        {
            logger.error("PersistenceBrokerException during the execution of delete: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the delete (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
            throw new PersistenceBrokerSQLException(e);
        }
       finally
        {
			broker.getStatementManager().closeResources(stmt, null);
        }
    }

    /**
     * performs a SELECT operation against RDBMS.
     * @param query the query string.
     * @param cld ClassDescriptor providing JDBC information.
     */
	public void executeDelete(Query query, ClassDescriptor cld) throws PersistenceBrokerException
	{
		logger.safeDebug("executeDelete (by Query)", query);

        PreparedStatement stmt = null;

		try
		{
			// if query has criteria, use them in where clause
			if (query.getCriteria() != null)
			{
				String sql = SqlGeneratorFactory.getInstance().createSqlGenerator().getPreparedDeleteStatement(query, cld);
				stmt = broker.getStatementManager().getPreparedStatement(cld, sql, false);
				broker.getStatementManager().bindStatement(stmt, query.getCriteria(), cld, 1);
				stmt.executeUpdate();
			}
			// if query has no criteria, issue a warning
			else
			{
				logger.warn("deleteByQuery without criteria !");
			}
		}
		catch (SQLException e)
		{
			logger.error("SQLException during the execution of delete by query (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
            throw new PersistenceBrokerSQLException(e);
		}
        finally
        {
			broker.getStatementManager().closeResources(stmt, null);
        }
	}

    /**
     * performs an INSERT operation against RDBMS.
     * @param obj The Object to be inserted as a row of the underlying table.
     * @param cld ClassDescriptor providing mapping information.
     */
    public void executeInsert(ClassDescriptor cld, Object obj) throws PersistenceBrokerException
    {
        logger.safeDebug("executeInsert",obj);
        
        PreparedStatement stmt = null;
        try
        {
            stmt = broker.getStatementManager().getInsertStatement(cld);
            if (stmt == null)
            {
            	logger.error("getInsertStatement returned a null statement");
            	throw new PersistenceBrokerException("getInsertStatement returned a null statement");
            }
            synchronized (stmt)
            {
                broker.getStatementManager().bindInsert(stmt, cld, obj);
                stmt.executeUpdate();
            }
        }
        catch (PersistenceBrokerException e)
        {
            logger.error("PersistenceBrokerException during the execution of the insert: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the insert (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
			/**
			 * throw a specific type of runtime exception for a key constraint.
			 */
			if (SQL_STATE_KEY_VIOLATED.equals(e.getSQLState()))
			{
				throw new KeyConstraintViolatedException(e);
			}
			else
			{
            	throw new PersistenceBrokerSQLException(e);
			}
        }
        finally
        {
			broker.getStatementManager().closeResources(stmt, null);
        }
    }

    /**
     * performs a SELECT operation against RDBMS.
     * @param query the query string.
     * @param cld ClassDescriptor providing JDBC information.
     */
    ResultSetAndStatement executeQuery(Query query, ClassDescriptor cld) throws PersistenceBrokerException
    {
        logger.safeDebug("executeQuery", query);
        
        ResultSetAndStatement retval = new ResultSetAndStatement();
		boolean scrollable = ((query.getStartAtIndex() > Query.NO_START_AT_INDEX) || (query.getEndAtIndex() > Query.NO_END_AT_INDEX));
        try
        {
            // if query has criteria, use them in where clause
            if (query.getCriteria() != null)
            {
                String sql = SqlGeneratorFactory.getInstance().createSqlGenerator().getPreparedSelectStatement(query, cld);
                PreparedStatement stmt = broker.getStatementManager().getPreparedStatement(cld, sql, scrollable);
                broker.getStatementManager().bindStatement(stmt, query.getCriteria(), cld, 1);
                ResultSet rs = stmt.executeQuery();
                // as we return the resultset for further operations, we cannot release the statement yet.
                // that has to be done by the JdbcAccess-clients (i.e. RsIterator, ProxyRsIterator and PkEnumeration.)
                retval.m_rs = rs;
                retval.m_stmt = stmt;
                return retval;

            }
            // if query has no criteria, perform select * from table
            else
            {
                Statement stmt = broker.getStatementManager().getGenericStatement(cld, scrollable);
                String sql = SqlGeneratorFactory.getInstance().createSqlGenerator().getSelectStatementDep(query, cld);
                ResultSet rs = stmt.executeQuery(sql);
                retval.m_rs = rs;
                retval.m_stmt = stmt;
                return retval;
            }
        }
        catch (PersistenceBrokerException e)
        {
			logger.error("PersistenceBrokerException during the execution of the query: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the query (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
            throw new PersistenceBrokerSQLException(e);
        }

    }

    /**
     * performs a SQL SELECT statement against RDBMS.
     * @param sqlStatement the query string.
     * @param cld ClassDescriptor providing meta-information.
     */
    public ResultSetAndStatement executeSQL(String sqlStatement, ClassDescriptor cld) throws PersistenceBrokerException
    {
        if (logger.isDebugEnabled()) logger.debug("executeSQL: " + sqlStatement);
        
        ResultSetAndStatement retval = new ResultSetAndStatement();
        try
        {
            Statement stmt = broker.getStatementManager().getGenericStatement(cld, Query.SCROLLABLE);
            ResultSet rs = stmt.executeQuery(sqlStatement);
            // as we return the resultset for further operations, we cannot release the statement yet.
            // that has to be done by the JdbcAccess-clients (i.e. RsIterator, ProxyRsIterator and PkEnumeration.)
            retval.m_rs = rs;
            retval.m_stmt = stmt;
            return retval;
        }
        catch (PersistenceBrokerException e)
        {
			logger.error("PersistenceBrokerException during the execution of the SQL query: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the SQL query: " + e.getMessage(), e);
            throw new PersistenceBrokerSQLException(e);
        }
    }

    /**
     * performs a SQL UPDTE, INSERT or DELETE statement against RDBMS.
     * @param sqlStatement the query string.
     * @param cld ClassDescriptor providing meta-information.
     * @return int returncode
     */
    public int executeUpdateSQL(String sqlStatement, ClassDescriptor cld)
        throws PersistenceBrokerException
    {
        if (logger.isDebugEnabled()) logger.debug("executeUpdateSQL: " + sqlStatement);
        
        int result;
        Statement stmt = null;
        try
        {
             stmt = broker.getStatementManager().getGenericStatement(cld, Query.SCROLLABLE);
             result = stmt.executeUpdate(sqlStatement);
        }
        catch (PersistenceBrokerException e)
        {
			logger.error("PersistenceBrokerException during the execution of the Update SQL query: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the Update SQL query (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
			if (SQL_STATE_KEY_VIOLATED.equals(e.getSQLState()))
			{
				throw new KeyConstraintViolatedException(e);
			}
            throw new PersistenceBrokerSQLException(e);
        }
        finally
        {
			broker.getStatementManager().closeResources(stmt, null);
        }
        return result;
    }

    /**
     * performs an UPDATE operation against RDBMS.
     * @param obj The Object to be updated in the underlying table.
     * @param cld ClassDescriptor providing mapping information.
     */
    public void executeUpdate(ClassDescriptor cld, Object obj) throws PersistenceBrokerException
    {
        logger.safeDebug("executeUpdate",obj);
        
        PreparedStatement stmt = null;
        
        // obj with nothing but key fields is not updated
        if (cld.getNonPkFields().length == 0)
        {
        	return;	
        }	
        try
        {
            stmt = broker.getStatementManager().getUpdateStatement(cld);
            if (stmt == null)
            {
            	logger.error("getUpdateStatement returned a null statement");
            	throw new PersistenceBrokerException("getUpdateStatement returned a null statement");
            }
            synchronized (stmt)
            {
                broker.getStatementManager().bindUpdate(stmt, cld, obj);
                if (stmt.executeUpdate() == 0 && cld.isLocking()) //BRJ
                {
                    throw new OptimisticLockException("Object has been modified by someone else", obj);
                }

            }
        }
        catch (OptimisticLockException e)
        {
        	// Don't log as error
            if (logger.isDebugEnabled()) logger.debug("OptimisticLockException during the execution of update: " + e.getMessage(), e);
            throw e;
        }
        catch (PersistenceBrokerException e)
        {
			logger.error("PersistenceBrokerException during the execution of the update: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of the update (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
			if (SQL_STATE_KEY_VIOLATED.equals(e.getSQLState()))
			{
				throw new KeyConstraintViolatedException(e);
			}
            throw new PersistenceBrokerSQLException(e);
        }
        finally
        {
			broker.getStatementManager().closeResources(stmt, null);
        }
    }

    /**
     * performs a primary key lookup operation against RDBMS and materializes
     * an object from the resulting row. Only skalar attributes are filled from
     * the row, references are not resolved.
     * @param oid contains the primary key info.
     * @param cld ClassDescriptor providing mapping information.
     * @return the materialized object, null if no matching row was found or if
     * any error occured.
     */
    public Object materializeObject(ClassDescriptor cld, Identity oid)
        throws PersistenceBrokerException
    {
        Object obj = null;
        ResultSet rs = null;
        PreparedStatement stmt = null;
        try
        {
            stmt = broker.getStatementManager().getSelectByPKStatement(cld);
            if (stmt == null)
            {
            	logger.error("getSelectByPKStatement returned a null statement");
            	throw new PersistenceBrokerException("getSelectByPKStatement returned a null statement");
            }
            synchronized (stmt)
            {
                broker.getStatementManager().bindSelect(stmt, oid);
                rs = stmt.executeQuery();
                // data available read object, else return null
                if (rs.next())
                {
                    RowReader reader = cld.getRowReader();
                    Map row = new HashMap();
                    reader.readObjectArrayFrom(rs, cld, row);
                    obj = cld.getRowReader().readObjectFrom(row, cld);
                }
                else
                {
                    obj = null;
                }
            }
        }
        catch (PersistenceBrokerException e)
        {
			logger.error("PersistenceBrokerException during the execution of materializeObject: " + e.getMessage(), e);
            throw e;
        }
        catch (SQLException e)
        {
			logger.error("SQLException during the execution of materializeObject (for a " + cld.getClassOfObject().getName() + "): " + e.getMessage(), e);
            throw new PersistenceBrokerSQLException(e);
        }
        finally
        {
			broker.getStatementManager().closeResources(stmt, rs);
        }
        return obj;
    }

    /**
     * retrieves an Object from a ResultSet column.
     * @param rs the ResultSet to be read from.
     * @param fld The FIELDDESCRIPTOR containing metainfo on the column:
     * it contins info on the expected JDBC Type and the name of the column.
     * @return Object the read in object.
     */
    static Object getObjectFromColumn(ResultSet rs, FieldDescriptor fld) throws SQLException
    {
        int jdbcType = fld.getColumnJdbcType();
        // BRJ: caused by QueryBySQL
        // as long as we can not guarantee that the sequence of selected
        // columns matches the sequence of defined fields in ClassDescriptor
        // the position of the column has to be defined by it's name
        int columnId = rs.findColumn(fld.getColumnName());
        return getObjectFromColumn(rs, jdbcType, columnId);
    }

    static Object getObjectFromColumn(ResultSet rs, int jdbcType, int columnId) throws SQLException
    {
        Object result = null;
        switch (jdbcType)
        {
            case Types.BIT :
                {
                    result = new Boolean(rs.getBoolean(columnId));
                    break;
                }
            case Types.TINYINT :
                {
                    byte val = rs.getByte(columnId);
                    if (!rs.wasNull())
                        result = new Byte(val);
                    break;
                }
            case Types.SMALLINT :
                {
                    short val = rs.getShort(columnId);
                    if (!rs.wasNull())
                        result = new Short(val);
                    break;
                }
            case Types.INTEGER :
                {
                    int val = rs.getInt(columnId);
                    if (!rs.wasNull())
                        result = new Integer(val);
                    break;
                }
            case Types.BIGINT :
                {
                    long val = rs.getLong(columnId);
                    if (!rs.wasNull())
                        result = new Long(val);
                    break;
                }
            case Types.FLOAT :
                {
                    double val = rs.getDouble(columnId);
                    if (!rs.wasNull())
                        result = new Double(val);
                    break;
                }
            case Types.REAL :
                {
                    float val = rs.getFloat(columnId);
                    if (!rs.wasNull())
                        result = new Float(val);
                    break;
                }
            case Types.DOUBLE :
                {
                    double val = rs.getDouble(columnId);
                    if (!rs.wasNull())
                        result = new Double(val);
                    break;
                }
            case Types.NUMERIC :
                {
                    result = rs.getBigDecimal(columnId);
                    break;
                }
            case Types.DECIMAL :
                {
                    result = rs.getBigDecimal(columnId);
                    break;
                }

            case Types.CHAR :
                {
                    result = rs.getString(columnId);
                    break;
                }
            case Types.VARCHAR :
                {
                    result = rs.getString(columnId);
                    break;
                }
            case Types.LONGVARCHAR :
                {
                    result = rs.getString(columnId);
                    break;
                }

            case Types.DATE :
                {
                    result = rs.getDate(columnId);
                    break;
                }
            case Types.TIME :
                {
                    result = rs.getTime(columnId);
                    break;
                }
            case Types.TIMESTAMP :
                {
                    result = rs.getTimestamp(columnId);
                    break;
                }

            case Types.BINARY :
                {
                    result = rs.getBytes(columnId);
                    break;
                }
            case Types.VARBINARY :
                {
                    result = rs.getBytes(columnId);
                    break;
                }
            case Types.LONGVARBINARY :
                {
                    result = rs.getBytes(columnId);
                    break;
                }
            case Types.CLOB :
                {
                    java.sql.Clob aClob = rs.getClob(columnId);
                    result = aClob.getSubString(1L, (int) aClob.length()).toCharArray();
                    break;
                }
            case Types.BLOB :
                {
                    java.sql.Blob aBlob = rs.getBlob(columnId);
                    result = aBlob.getBytes(1L, (int) aBlob.length());
                    break;
                }
            default :
                {
                    throw new RuntimeException(
                        "The type "
                            + jdbcType
                            + " for attribute "
                            + columnId
                            + " can not be handled by OJB. Please specify only types as defined by java.sql.Types.");
                }
        }
        return result;
    }

    /**
     * determines the JDBC type (represented as an int value as specified
     * by java.sql.Types) of a FIELDDESCRIPTOR idetified by index.
     * @param cld The ClassDescriptor containing the FIELDDESCRIPTOR.
     * @param index index identifies the FIELDDESCRIPTOR within cld.
     * @return the int value representing the Type according to
     * java.sql.Types
     */
    static int getSqlTypeAll(ClassDescriptor cld, int index)
    {
        FieldDescriptor fld = cld.getFieldDescriptions()[index];
        return fld.getColumnJdbcType();
    }

    /**
     * determines the JDBC type (represented as an int value as specified
     * by java.sql.Types) of a FIELDDESCRIPTOR idetified by index.
     * @param cld The ClassDescriptor containing the FIELDDESCRIPTOR.
     * @param index index identifies the FIELDDESCRIPTOR within the
     * Non-PrimaryKey-fields of cld.
     * @return the int value representing the Type according to
     * java.sql.Types
     */
    static int getSqlTypeNonPk(ClassDescriptor cld, int index)
    {
        FieldDescriptor fld = cld.getNonPkFields()[index];
        return fld.getColumnJdbcType();
    }

    /**
     * determines the JDBC type (represented as an int value as specified
     * by java.sql.Types) of a FIELDDESCRIPTOR idetified by index.
     * @param cld The ClassDescriptor containing the FIELDDESCRIPTOR.
     * @param index index identifies the FIELDDESCRIPTOR within the
     * PrimaryKey-fields of cld.
     * @return the int value representing the Type according to
     * java.sql.Types
     */
    static int getSqlTypePk(ClassDescriptor cld, int index)
    {
        FieldDescriptor fld = cld.getPkFields()[index];
        return fld.getColumnJdbcType();
    }

    /**
     * determines the JDBC type (represented as an int value as specified
     * by java.sql.Types) of a FIELDDESCRIPTOR idetified by index.
     * @param cld The ClassDescriptor containing the FIELDDESCRIPTOR.
     * @param index index identifies the FIELDDESCRIPTOR within the
     * PrimaryKey-fields of cld.
     * @return the int value representing the Type according to
     * java.sql.Types
     */
    static int getSqlTypeLocking(ClassDescriptor cld, int index)
    {
        FieldDescriptor fld = cld.getLockingFields()[index];
        return fld.getColumnJdbcType();
    }
}
