http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java new file mode 100644 index 0000000..4b89655 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java @@ -0,0 +1,1056 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao.impl; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Component; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.component.details.ComponentDetails; +import org.apache.nifi.action.component.details.ProcessorDetails; +import org.apache.nifi.action.component.details.RemoteProcessGroupDetails; +import org.apache.nifi.action.details.ActionDetails; +import org.apache.nifi.action.details.ConfigureDetails; +import org.apache.nifi.action.details.ConnectDetails; +import org.apache.nifi.action.details.MoveDetails; +import org.apache.nifi.action.details.PurgeDetails; +import org.apache.nifi.admin.RepositoryUtils; +import org.apache.nifi.admin.dao.ActionDAO; +import org.apache.nifi.admin.dao.DataAccessException; +import org.apache.nifi.history.History; +import org.apache.nifi.history.HistoryQuery; +import org.apache.nifi.history.PreviousValue; +import org.apache.commons.lang3.StringUtils; + +/** + * + */ +public class StandardActionDAO implements ActionDAO { + + // ------------ + // action table + // ------------ + private static final String INSERT_ACTION = "INSERT INTO ACTION (" + + "USER_DN, USER_NAME, SOURCE_ID, SOURCE_NAME, SOURCE_TYPE, OPERATION, ACTION_TIMESTAMP" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + ")"; + + // ----------------- + // component details + // ----------------- + private static final String INSERT_PROCESSOR_DETAILS = "INSERT INTO PROCESSOR_DETAILS (" + + "ACTION_ID, TYPE" + + ") VALUES (" + + "?, " + + "?" + + ")"; + + private static final String INSERT_REMOTE_PROCESS_GROUP_DETAILS = "INSERT INTO REMOTE_PROCESS_GROUP_DETAILS (" + + "ACTION_ID, URI" + + ") VALUES (" + + "?, " + + "?" + + ")"; + + // -------------- + // action details + // -------------- + private static final String INSERT_CONFIGURE_DETAILS = "INSERT INTO CONFIGURE_DETAILS (" + + "ACTION_ID, NAME, VALUE, PREVIOUS_VALUE" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_CONNECT_DETAILS = "INSERT INTO CONNECT_DETAILS (" + + "ACTION_ID, SOURCE_ID, SOURCE_NAME, SOURCE_TYPE, RELATIONSHIP, DESTINATION_ID, DESTINATION_NAME, DESTINATION_TYPE" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_MOVE_DETAILS = "INSERT INTO MOVE_DETAILS (" + + "ACTION_ID, GROUP_ID, GROUP_NAME, PREVIOUS_GROUP_ID, PREVIOUS_GROUP_NAME" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_PURGE_DETAILS = "INSERT INTO PURGE_DETAILS (" + + "ACTION_ID, END_DATE" + + ") VALUES (" + + "?, " + + "?" + + ")"; + + // ------------ + // action table + // ------------ + private static final String SELECT_ACTIONS = "SELECT * FROM ACTION"; + + private static final String SELECT_ACTION_COUNT = "SELECT COUNT(*) AS ACTION_COUNT FROM ACTION"; + + private static final String SELECT_ACTION_BY_ID = "SELECT * " + + "FROM ACTION " + + "WHERE " + + "ID = ?"; + + private static final String DELETE_ACTIONS = "DELETE FROM ACTION WHERE ACTION_TIMESTAMP < ?"; + + private static final String DELETE_SPECIFIC_ACTIONS = "DELETE FROM %s WHERE %s IN (SELECT ID FROM ACTION WHERE ACTION_TIMESTAMP < ?)"; + + // ----------------- + // component details + // ----------------- + private static final String SELECT_PROCESSOR_DETAILS_FOR_ACTION = "SELECT * FROM PROCESSOR_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_REMOTE_PROCESS_GROUP_DETAILS_FOR_ACTION = "SELECT * FROM REMOTE_PROCESS_GROUP_DETAILS WHERE ACTION_ID = ?"; + + // -------------- + // action details + // -------------- + private static final String SELECT_MOVE_DETAILS_FOR_ACTION = "SELECT * FROM MOVE_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_CONFIGURE_DETAILS_FOR_ACTION = "SELECT * FROM CONFIGURE_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_CONNECT_DETAILS_FOR_ACTION = "SELECT * FROM CONNECT_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_PURGE_DETAILS_FOR_ACTION = "SELECT * FROM PURGE_DETAILS WHERE ACTION_ID = ?"; + + // --------------- + // previous values + // --------------- + private static final String SELECT_PREVIOUSLY_CONFIGURED_FIELDS = "SELECT DISTINCT CD.NAME " + + "FROM CONFIGURE_DETAILS CD " + + "INNER JOIN ACTION A " + + "ON CD.ACTION_ID = A.ID " + + "WHERE A.SOURCE_ID = ?"; + + private static final String SELECT_PREVIOUS_VALUES = "SELECT CD.VALUE, " + + "A.ACTION_TIMESTAMP, " + + "A.USER_NAME " + + "FROM CONFIGURE_DETAILS CD " + + "INNER JOIN ACTION A " + + "ON CD.ACTION_ID = A.ID " + + "WHERE A.SOURCE_ID = ? AND CD.NAME = ? " + + "ORDER BY A.ACTION_TIMESTAMP DESC " + + "LIMIT 4"; + + private Connection connection; + private Map<String, String> columnMap; + + public StandardActionDAO(Connection connection) { + this.connection = connection; + + // initialize the column mappings + this.columnMap = new HashMap<>(); + this.columnMap.put("timestamp", "ACTION_TIMESTAMP"); + this.columnMap.put("sourceName", "SOURCE_NAME"); + this.columnMap.put("sourceType", "SOURCE_TYPE"); + this.columnMap.put("operation", "OPERATION"); + this.columnMap.put("userName", "USER_NAME"); + } + + @Override + public void createAction(Action action) throws DataAccessException { + if (action.getUserDn() == null) { + throw new IllegalArgumentException("User cannot be null."); + } + + if (action.getTimestamp() == null) { + throw new IllegalArgumentException("Action timestamp cannot be null."); + } + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // obtain a statement to insert to the action table + statement = connection.prepareStatement(INSERT_ACTION, Statement.RETURN_GENERATED_KEYS); + statement.setString(1, StringUtils.left(action.getUserDn(), 255)); + statement.setString(2, StringUtils.left(action.getUserName(), 100)); + statement.setString(3, action.getSourceId()); + statement.setString(4, StringUtils.left(action.getSourceName(), 1000)); + statement.setString(5, action.getSourceType().toString()); + statement.setString(6, action.getOperation().toString()); + statement.setTimestamp(7, new java.sql.Timestamp(action.getTimestamp().getTime())); + + // insert the action + int updateCount = statement.executeUpdate(); + + // get the action id + rs = statement.getGeneratedKeys(); + if (updateCount == 1 && rs.next()) { + action.setId(rs.getInt(1)); + } else { + throw new DataAccessException("Unable to insert action."); + } + + // close the previous statement + statement.close(); + + // determine the type of component + ComponentDetails componentDetails = action.getComponentDetails(); + if (componentDetails instanceof ProcessorDetails) { + createProcessorDetails(action.getId(), (ProcessorDetails) componentDetails); + } else if (componentDetails instanceof RemoteProcessGroupDetails) { + createRemoteProcessGroupDetails(action.getId(), (RemoteProcessGroupDetails) componentDetails); + } + + // determine the type of action + ActionDetails details = action.getActionDetails(); + if (details instanceof ConnectDetails) { + createConnectDetails(action.getId(), (ConnectDetails) details); + } else if (details instanceof MoveDetails) { + createMoveDetails(action.getId(), (MoveDetails) details); + } else if (details instanceof ConfigureDetails) { + createConfigureDetails(action.getId(), (ConfigureDetails) details); + } else if (details instanceof PurgeDetails) { + createPurgeDetails(action.getId(), (PurgeDetails) details); + } + + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the processor details. + * + * @param actionId + * @param processorDetails + * @throws DataAccessException + */ + private void createProcessorDetails(int actionId, ProcessorDetails processorDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_PROCESSOR_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(processorDetails.getType(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert processor details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the remote process group details. + * + * @param actionId + * @param remoteProcessGroupDetails + * @throws DataAccessException + */ + private void createRemoteProcessGroupDetails(int actionId, RemoteProcessGroupDetails remoteProcessGroupDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_REMOTE_PROCESS_GROUP_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(remoteProcessGroupDetails.getUri(), 2500)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert remote prcoess group details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the connection details. + * + * @param actionId + * @param connectionDetails + * @throws DataAccessException + */ + private void createConnectDetails(int actionId, ConnectDetails connectionDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_CONNECT_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, connectionDetails.getSourceId()); + statement.setString(3, StringUtils.left(connectionDetails.getSourceName(), 1000)); + statement.setString(4, StringUtils.left(connectionDetails.getSourceType().toString(), 1000)); + statement.setString(5, StringUtils.left(connectionDetails.getRelationship(), 1000)); + statement.setString(6, connectionDetails.getDestinationId()); + statement.setString(7, StringUtils.left(connectionDetails.getDestinationName(), 1000)); + statement.setString(8, StringUtils.left(connectionDetails.getDestinationType().toString(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert connection details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the move details. + * + * @param actionId + * @param moveDetails + * @throws DataAccessException + */ + private void createMoveDetails(int actionId, MoveDetails moveDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_MOVE_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, moveDetails.getGroupId()); + statement.setString(3, StringUtils.left(moveDetails.getGroup(), 1000)); + statement.setString(4, moveDetails.getPreviousGroupId()); + statement.setString(5, StringUtils.left(moveDetails.getPreviousGroup(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert move details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the configuration details. + * + * @param actionId + * @param configurationDetails + * @throws DataAccessException + */ + private void createConfigureDetails(int actionId, ConfigureDetails configurationDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_CONFIGURE_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(configurationDetails.getName(), 1000)); + statement.setString(3, StringUtils.left(configurationDetails.getValue(), 5000)); + statement.setString(4, StringUtils.left(configurationDetails.getPreviousValue(), 5000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert configure details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Persists the purge details. + * + * @param actionId + * @param purgeDetails + * @throws DataAccessException + */ + private void createPurgeDetails(int actionId, PurgeDetails purgeDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_PURGE_DETAILS); + statement.setInt(1, actionId); + statement.setTimestamp(2, new java.sql.Timestamp(purgeDetails.getEndDate().getTime())); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert connection details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Finds actions that meet the criteria in the specified query. + * + * @param historyQuery + * @return + * @throws DataAccessException + */ + @Override + public History findActions(HistoryQuery historyQuery) throws DataAccessException { + + // get the sort column + String sortColumn = "ACTION_TIMESTAMP"; + if (StringUtils.isNotBlank(historyQuery.getSortColumn())) { + String rawColumnName = historyQuery.getSortColumn(); + if (!columnMap.containsKey(rawColumnName)) { + throw new IllegalArgumentException(String.format("Unrecognized column name '%s'.", rawColumnName)); + } + sortColumn = columnMap.get(rawColumnName); + } + + // get the sort order + String sortOrder = "desc"; + if (StringUtils.isNotBlank(historyQuery.getSortOrder())) { + sortOrder = historyQuery.getSortOrder(); + } + + History actionResult = new History(); + Collection<Action> actions = new ArrayList<>(); + PreparedStatement statement = null; + ResultSet rs = null; + try { + List<String> where = new ArrayList<>(); + + // append the start time + if (historyQuery.getStartDate() != null) { + where.add("ACTION_TIMESTAMP >= ?"); + } + + // append the end time + if (historyQuery.getEndDate() != null) { + where.add("ACTION_TIMESTAMP <= ?"); + } + + // append the user id as necessary + if (historyQuery.getUserName() != null) { + where.add("UPPER(USER_NAME) LIKE ?"); + } + + // append the source id as necessary + if (historyQuery.getSourceId() != null) { + where.add("SOURCE_ID = ?"); + } + + String sql = SELECT_ACTION_COUNT; + if (!where.isEmpty()) { + sql += " WHERE " + StringUtils.join(where, " AND "); + } + + // get the total number of actions + statement = connection.prepareStatement(sql); + int paramIndex = 1; + + // set the start date as necessary + if (historyQuery.getStartDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getStartDate().getTime())); + } + + // set the end date as necessary + if (historyQuery.getEndDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getEndDate().getTime())); + } + + // set the user id as necessary + if (historyQuery.getUserName() != null) { + statement.setString(paramIndex++, "%" + historyQuery.getUserName().toUpperCase() + "%"); + } + + // set the source id as necessary + if (historyQuery.getSourceId() != null) { + statement.setString(paramIndex, historyQuery.getSourceId()); + } + + // execute the statement + rs = statement.executeQuery(); + + // ensure there are results + if (rs.next()) { + actionResult.setTotal(rs.getInt("ACTION_COUNT")); + } else { + throw new DataAccessException("Unable to determine total action count."); + } + + sql = SELECT_ACTIONS; + if (!where.isEmpty()) { + sql += " WHERE " + StringUtils.join(where, " AND "); + } + + // append the sort criteria + sql += (" ORDER BY " + sortColumn + " " + sortOrder); + + // append the offset and limit + sql += " LIMIT ? OFFSET ?"; + + // close the previous statement + statement.close(); + + // create the statement + statement = connection.prepareStatement(sql); + paramIndex = 1; + + // set the start date as necessary + if (historyQuery.getStartDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getStartDate().getTime())); + } + + // set the end date as necessary + if (historyQuery.getEndDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getEndDate().getTime())); + } + + // set the user id as necessary + if (historyQuery.getUserName() != null) { + statement.setString(paramIndex++, "%" + historyQuery.getUserName().toUpperCase() + "%"); + } + + // set the source id as necessary + if (historyQuery.getSourceId() != null) { + statement.setString(paramIndex++, historyQuery.getSourceId()); + } + + // set the limit + statement.setInt(paramIndex++, historyQuery.getCount()); + + // set the offset according to the currented page calculated above + statement.setInt(paramIndex, historyQuery.getOffset()); + + // execute the query + rs = statement.executeQuery(); + + // create each corresponding action + while (rs.next()) { + final Integer actionId = rs.getInt("ID"); + final Operation operation = Operation.valueOf(rs.getString("OPERATION")); + final Component component = Component.valueOf(rs.getString("SOURCE_TYPE")); + + Action action = new Action(); + action.setId(actionId); + action.setUserDn(rs.getString("USER_DN")); + action.setUserName(rs.getString("USER_NAME")); + action.setOperation(Operation.valueOf(rs.getString("OPERATION"))); + action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + action.setSourceId(rs.getString("SOURCE_ID")); + action.setSourceName(rs.getString("SOURCE_NAME")); + action.setSourceType(Component.valueOf(rs.getString("SOURCE_TYPE"))); + + // get the component details if appropriate + ComponentDetails componentDetails = null; + if (Component.Processor.equals(component)) { + componentDetails = getProcessorDetails(actionId); + } else if (Component.RemoteProcessGroup.equals(component)) { + componentDetails = getRemoteProcessGroupDetails(actionId); + } + + if (componentDetails != null) { + action.setComponentDetails(componentDetails); + } + + // get the action details if appropriate + ActionDetails actionDetails = null; + if (Operation.Move.equals(operation)) { + actionDetails = getMoveDetails(actionId); + } else if (Operation.Configure.equals(operation)) { + actionDetails = getConfigureDetails(actionId); + } else if (Operation.Connect.equals(operation) || Operation.Disconnect.equals(operation)) { + actionDetails = getConnectDetails(actionId); + } else if (Operation.Purge.equals(operation)) { + actionDetails = getPurgeDetails(actionId); + } + + // set the action details + if (actionDetails != null) { + action.setActionDetails(actionDetails); + } + + // add the action + actions.add(action); + } + + // populate the action result + actionResult.setActions(actions); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return actionResult; + } + + @Override + public Action getAction(Integer actionId) throws DataAccessException { + Action action = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_ACTION_BY_ID); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + Operation operation = Operation.valueOf(rs.getString("OPERATION")); + Component component = Component.valueOf(rs.getString("SOURCE_TYPE")); + + // populate the action + action = new Action(); + action.setId(rs.getInt("ID")); + action.setUserDn(rs.getString("USER_DN")); + action.setUserName(rs.getString("USER_NAME")); + action.setOperation(operation); + action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + action.setSourceId(rs.getString("SOURCE_ID")); + action.setSourceName(rs.getString("SOURCE_NAME")); + action.setSourceType(component); + + // get the component details if appropriate + ComponentDetails componentDetails = null; + if (Component.Processor.equals(component)) { + componentDetails = getProcessorDetails(actionId); + } else if (Component.RemoteProcessGroup.equals(component)) { + componentDetails = getRemoteProcessGroupDetails(actionId); + } + + if (componentDetails != null) { + action.setComponentDetails(componentDetails); + } + + // get the action details if appropriate + ActionDetails actionDetails = null; + if (Operation.Move.equals(operation)) { + actionDetails = getMoveDetails(actionId); + } else if (Operation.Configure.equals(operation)) { + actionDetails = getConfigureDetails(actionId); + } else if (Operation.Connect.equals(operation) || Operation.Disconnect.equals(operation)) { + actionDetails = getConnectDetails(actionId); + } else if (Operation.Purge.equals(operation)) { + actionDetails = getPurgeDetails(actionId); + } + + // set the action details + if (actionDetails != null) { + action.setActionDetails(actionDetails); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return action; + } + + /** + * Loads the specified processor details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private ProcessorDetails getProcessorDetails(Integer actionId) throws DataAccessException { + ProcessorDetails processorDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PROCESSOR_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + processorDetails = new ProcessorDetails(); + processorDetails.setType(rs.getString("TYPE")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return processorDetails; + } + + /** + * Loads the specified remote process group details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private RemoteProcessGroupDetails getRemoteProcessGroupDetails(Integer actionId) throws DataAccessException { + RemoteProcessGroupDetails remoteProcessGroupDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_REMOTE_PROCESS_GROUP_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + remoteProcessGroupDetails = new RemoteProcessGroupDetails(); + remoteProcessGroupDetails.setUri(rs.getString("URI")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return remoteProcessGroupDetails; + } + + /** + * Loads the specified move details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private MoveDetails getMoveDetails(Integer actionId) throws DataAccessException { + MoveDetails moveDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_MOVE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + moveDetails = new MoveDetails(); + moveDetails.setGroupId(rs.getString("GROUP_ID")); + moveDetails.setGroup(rs.getString("GROUP_NAME")); + moveDetails.setPreviousGroupId(rs.getString("PREVIOUS_GROUP_ID")); + moveDetails.setPreviousGroup(rs.getString("PREVIOUS_GROUP_NAME")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return moveDetails; + } + + /** + * Loads the specified relationship details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private ConnectDetails getConnectDetails(Integer actionId) throws DataAccessException { + ConnectDetails connectionDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_CONNECT_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + final Component sourceComponent = Component.valueOf(rs.getString("SOURCE_TYPE")); + final Component destinationComponent = Component.valueOf(rs.getString("DESTINATION_TYPE")); + + connectionDetails = new ConnectDetails(); + connectionDetails.setSourceId(rs.getString("SOURCE_ID")); + connectionDetails.setSourceName(rs.getString("SOURCE_NAME")); + connectionDetails.setSourceType(sourceComponent); + connectionDetails.setRelationship(rs.getString("RELATIONSHIP")); + connectionDetails.setDestinationId(rs.getString("DESTINATION_ID")); + connectionDetails.setDestinationName(rs.getString("DESTINATION_NAME")); + connectionDetails.setDestinationType(destinationComponent); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return connectionDetails; + } + + /** + * Loads the specified configuration details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private ConfigureDetails getConfigureDetails(Integer actionId) throws DataAccessException { + ConfigureDetails configurationDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_CONFIGURE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + configurationDetails = new ConfigureDetails(); + configurationDetails.setName(rs.getString("NAME")); + configurationDetails.setValue(rs.getString("VALUE")); + configurationDetails.setPreviousValue(rs.getString("PREVIOUS_VALUE")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return configurationDetails; + } + + /** + * Loads the specified purge details. + * + * @param actionId + * @return + * @throws DataAccessException + */ + private PurgeDetails getPurgeDetails(Integer actionId) throws DataAccessException { + PurgeDetails purgeDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PURGE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + purgeDetails = new PurgeDetails(); + purgeDetails.setEndDate(new Date(rs.getTimestamp("END_DATE").getTime())); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return purgeDetails; + } + + @Override + public Map<String, List<PreviousValue>> getPreviousValues(String processorId) { + Map<String, List<PreviousValue>> previousValues = new LinkedHashMap<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PREVIOUSLY_CONFIGURED_FIELDS); + statement.setString(1, processorId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + while (rs.next()) { + final String property = rs.getString("NAME"); + previousValues.put(property, getPreviousValuesForProperty(processorId, property)); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return previousValues; + } + + private List<PreviousValue> getPreviousValuesForProperty(final String processorId, final String property) { + List<PreviousValue> previousValues = new ArrayList<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PREVIOUS_VALUES); + statement.setString(1, processorId); + statement.setString(2, property); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + while (rs.next()) { + // get the previous value + final PreviousValue previousValue = new PreviousValue(); + previousValue.setPreviousValue(rs.getString("VALUE")); + previousValue.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + previousValue.setUserName(rs.getString("USER_NAME")); + previousValues.add(previousValue); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return previousValues; + } + + @Override + public void deleteActions(Date endDate) throws DataAccessException { + PreparedStatement statement = null; + try { + // ----------------- + // component details + // ----------------- + + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PROCESSOR_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "REMOTE_PROCESS_GROUP_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // -------------- + // action details + // -------------- + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "MOVE_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the configure delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONFIGURE_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the connect delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONNECT_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the relationship delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PURGE_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // ------- + // actions + // ------- + // create the action delete statement + statement = connection.prepareStatement(DELETE_ACTIONS); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java new file mode 100644 index 0000000..4e2cc26 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao.impl; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.EnumSet; +import java.util.Set; +import org.apache.nifi.admin.RepositoryUtils; +import org.apache.nifi.admin.dao.AuthorityDAO; +import org.apache.nifi.admin.dao.DataAccessException; +import org.apache.nifi.authorization.Authority; + +/** + * + */ +public class StandardAuthorityDAO implements AuthorityDAO { + + private static final String SELECT_AUTHORITIES_FOR_USER = "SELECT ID, ROLE " + + "FROM AUTHORITY " + + "WHERE USER_ID = ?"; + + private static final String INSERT_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "?, ?" + + ")"; + + private static final String DELETE_AUTHORITY = "DELETE FROM AUTHORITY " + + "WHERE USER_ID = ? AND ROLE = ?"; + + private static final String DELETE_AUTHORITIES_FOR_USER = "DELETE FROM AUTHORITY " + + "WHERE USER_ID = ?"; + + private final Connection connection; + + public StandardAuthorityDAO(Connection connection) { + this.connection = connection; + } + + @Override + public void createAuthorities(Set<Authority> authorities, String userId) throws DataAccessException { + if (authorities == null) { + throw new IllegalArgumentException("Specified authorities cannot be null."); + } + + // ensure there are some authorities to create + if (!authorities.isEmpty()) { + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(INSERT_AUTHORITY); + statement.setString(1, userId); + for (Authority authority : authorities) { + statement.setString(2, authority.toString()); + statement.addBatch(); + } + + // insert the authorities + int[] updateCounts = statement.executeBatch(); + for (int updateCount : updateCounts) { + if (updateCount != 1) { + throw new DataAccessException("Unable to insert user authorities."); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + } + + @Override + public void deleteAuthorities(String userId) throws DataAccessException { + // ensure there are some authorities to create + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(DELETE_AUTHORITIES_FOR_USER); + statement.setString(1, userId); + + // insert the authorities + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void deleteAuthorities(Set<Authority> authorities, String userId) throws DataAccessException { + if (authorities == null) { + throw new IllegalArgumentException("Specified authorities cannot be null."); + } + + // ensure there are some authorities to create + if (!authorities.isEmpty()) { + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(DELETE_AUTHORITY); + statement.setString(1, userId); + for (Authority authority : authorities) { + statement.setString(2, authority.toString()); + statement.addBatch(); + } + + // insert the authorities + int[] updateCounts = statement.executeBatch(); + for (int updateCount : updateCounts) { + if (updateCount != 1) { + throw new DataAccessException("Unable to remove user authorities."); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + } + + @Override + public Set<Authority> findAuthoritiesByUserId(String userId) throws DataAccessException { + Set<Authority> authorities = EnumSet.noneOf(Authority.class); + PreparedStatement statement = null; + ResultSet rs = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(SELECT_AUTHORITIES_FOR_USER); + statement.setString(1, userId); + + // execute the query + rs = statement.executeQuery(); + + // create each corresponding authority + while (rs.next()) { + authorities.add(Authority.valueOfAuthority(rs.getString("ROLE"))); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return authorities; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardUserDAO.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardUserDAO.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardUserDAO.java new file mode 100644 index 0000000..ea7c1a1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardUserDAO.java @@ -0,0 +1,634 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao.impl; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import org.apache.nifi.admin.dao.UserDAO; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import org.apache.nifi.admin.RepositoryUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.admin.dao.DataAccessException; +import org.apache.nifi.authorization.Authority; +import org.apache.nifi.user.AccountStatus; +import org.apache.nifi.user.NiFiUser; + +/** + * Responsible for loading and persisting NiFiUsers. + */ +public class StandardUserDAO implements UserDAO { + + private static final String SELECT_PENDING_ACCOUNTS_COUNT = "SELECT " + + "COUNT(*) as PENDING_ACCOUNTS " + + "FROM USER U " + + "WHERE U.STATUS = 'PENDING'"; + + private static final String SELECT_USER_BY_DN = "SELECT " + + "U.ID, " + + "U.DN, " + + "U.USER_NAME, " + + "U.USER_GROUP, " + + "U.CREATION, " + + "U.LAST_ACCESSED, " + + "U.LAST_VERIFIED, " + + "U.JUSTIFICATION, " + + "U.STATUS, " + + "A.ROLE " + + "FROM USER U " + + "LEFT JOIN AUTHORITY A " // ensures that users without authorities are still matched + + "ON U.ID = A.USER_ID " + + "WHERE U.DN = ?"; + + private static final String SELECT_USER_BY_ID = "SELECT " + + "U.ID, " + + "U.DN, " + + "U.USER_NAME, " + + "U.USER_GROUP, " + + "U.CREATION, " + + "U.LAST_ACCESSED, " + + "U.LAST_VERIFIED, " + + "U.JUSTIFICATION, " + + "U.STATUS, " + + "A.ROLE " + + "FROM USER U " + + "LEFT JOIN AUTHORITY A " // ensures that users without authorities are still matched + + "ON U.ID = A.USER_ID " + + "WHERE U.ID = ?"; + + private static final String SELECT_USERS = "SELECT " + + "U.ID, " + + "U.DN, " + + "U.USER_NAME, " + + "U.USER_GROUP, " + + "U.CREATION, " + + "U.LAST_ACCESSED, " + + "U.LAST_VERIFIED, " + + "U.JUSTIFICATION, " + + "U.STATUS, " + + "A.ROLE " + + "FROM USER U " + + "LEFT JOIN AUTHORITY A " // ensures that users without authorities are still matched + + "ON U.ID = A.USER_ID " + + "WHERE U.DN <> ?"; + + private static final String SELECT_USER_GROUPS = "SELECT DISTINCT " + + "U.USER_GROUP " + + "FROM USER U"; + + private static final String SELECT_USER_GROUP = "SELECT " + + "U.ID, " + + "U.DN, " + + "U.USER_NAME, " + + "U.USER_GROUP, " + + "U.CREATION, " + + "U.LAST_ACCESSED, " + + "U.LAST_VERIFIED, " + + "U.JUSTIFICATION, " + + "U.STATUS, " + + "A.ROLE " + + "FROM USER U " + + "LEFT JOIN AUTHORITY A " // ensures that users without authorities are still matched + + "ON U.ID = A.USER_ID " + + "WHERE U.DN <> ? AND U.USER_GROUP = ?"; + + private static final String INSERT_USER = "INSERT INTO USER (" + + "ID, DN, USER_NAME, USER_GROUP, CREATION, LAST_VERIFIED, JUSTIFICATION, STATUS" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "NOW(), " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String UPDATE_USER = "UPDATE USER SET " + + "DN = ?, " + + "USER_NAME = ?, " + + "USER_GROUP = ?, " + + "LAST_ACCESSED = ?, " + + "LAST_VERIFIED = ?, " + + "JUSTIFICATION = ?, " + + "STATUS = ? " + + "WHERE ID = ?"; + + private static final String UPDATE_USER_GROUP_STATUS = "UPDATE USER SET " + + "STATUS = ?," + + "USER_GROUP = NULL " + + "WHERE USER_GROUP = ?"; + + private static final String UPDATE_USER_GROUP_VERIFICATION = "UPDATE USER SET " + + "LAST_VERIFIED = ? " + + "WHERE USER_GROUP = ?"; + + private static final String UNGROUP_GROUP = "UPDATE USER SET " + + "USER_GROUP = NULL " + + "WHERE USER_GROUP = ?"; + + private static final String DELETE_USER = "DELETE FROM USER " + + "WHERE ID = ?"; + + private final Connection connection; + + public StandardUserDAO(Connection connection) { + this.connection = connection; + } + + @Override + public Boolean hasPendingUserAccounts() throws DataAccessException { + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_PENDING_ACCOUNTS_COUNT); + + // execute the query + rs = statement.executeQuery(); + + // get the first row which will contain the number of pending accounts + if (rs.next()) { + int pendingAccounts = rs.getInt("PENDING_ACCOUNTS"); + return pendingAccounts > 0; + } + + // query returned no results? + return false; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public Set<NiFiUser> findUsers() throws DataAccessException { + Set<NiFiUser> users = new HashSet<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_USERS); + statement.setString(1, NiFiUser.ANONYMOUS_USER_DN); + + // execute the query + rs = statement.executeQuery(); + + // create the user + NiFiUser user = null; + + // go through the user and its roles + while (rs.next()) { + // get the user id for the current record + String userId = rs.getString("ID"); + + // create the user during the first iteration + if (user == null || !userId.equals(user.getId())) { + user = new NiFiUser(); + user.setId(userId); + user.setDn(rs.getString("DN")); + user.setUserName(rs.getString("USER_NAME")); + user.setUserGroup(rs.getString("USER_GROUP")); + user.setJustification(rs.getString("JUSTIFICATION")); + user.setStatus(AccountStatus.valueOfStatus(rs.getString("STATUS"))); + + // set the creation date + user.setCreation(new Date(rs.getTimestamp("CREATION").getTime())); + + // get the last accessed date + if (rs.getTimestamp("LAST_ACCESSED") != null) { + user.setLastAccessed(new Date(rs.getTimestamp("LAST_ACCESSED").getTime())); + } + + // get the last verified date + if (rs.getTimestamp("LAST_VERIFIED") != null) { + user.setLastVerified(new Date(rs.getTimestamp("LAST_VERIFIED").getTime())); + } + + // add the user + users.add(user); + } + + // the select statement performs a left join since the desired + // user may not have any authorities + String authority = rs.getString("ROLE"); + if (StringUtils.isNotBlank(authority)) { + user.getAuthorities().add(Authority.valueOfAuthority(authority)); + } + } + + return users; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public Set<String> findUserGroups() throws DataAccessException { + Set<String> userGroups = new HashSet<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_USER_GROUPS); + + // execute the query + rs = statement.executeQuery(); + + // get each user group + while (rs.next()) { + userGroups.add(rs.getString("USER_GROUP")); + } + + return userGroups; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public Set<NiFiUser> findUsersForGroup(String group) throws DataAccessException { + Set<NiFiUser> users = new HashSet<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_USER_GROUP); + statement.setString(1, NiFiUser.ANONYMOUS_USER_DN); + statement.setString(2, group); + + // execute the query + rs = statement.executeQuery(); + + // create the user + NiFiUser user = null; + + // go through the user and its roles + while (rs.next()) { + // get the user id for the current record + String userId = rs.getString("ID"); + + // create the user during the first iteration + if (user == null || !userId.equals(user.getId())) { + user = new NiFiUser(); + user.setId(userId); + user.setDn(rs.getString("DN")); + user.setUserName(rs.getString("USER_NAME")); + user.setUserGroup(rs.getString("USER_GROUP")); + user.setJustification(rs.getString("JUSTIFICATION")); + user.setStatus(AccountStatus.valueOfStatus(rs.getString("STATUS"))); + + // set the creation date + user.setCreation(new Date(rs.getTimestamp("CREATION").getTime())); + + // get the last accessed date + if (rs.getTimestamp("LAST_ACCESSED") != null) { + user.setLastAccessed(new Date(rs.getTimestamp("LAST_ACCESSED").getTime())); + } + + // get the last verified date + if (rs.getTimestamp("LAST_VERIFIED") != null) { + user.setLastVerified(new Date(rs.getTimestamp("LAST_VERIFIED").getTime())); + } + + // add the user + users.add(user); + } + + // the select statement performs a left join since the desired + // user may not have any authorities + String authority = rs.getString("ROLE"); + if (StringUtils.isNotBlank(authority)) { + user.getAuthorities().add(Authority.valueOfAuthority(authority)); + } + } + + return users; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public NiFiUser findUserById(String id) throws DataAccessException { + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_USER_BY_ID); + statement.setString(1, id); + + // execute the query + rs = statement.executeQuery(); + + // create the user + NiFiUser user = null; + + // go through the user and its roles + while (rs.next()) { + // create the user during the first iteration + if (user == null) { + user = new NiFiUser(); + user.setId(rs.getString("ID")); + user.setDn(rs.getString("DN")); + user.setUserName(rs.getString("USER_NAME")); + user.setUserGroup(rs.getString("USER_GROUP")); + user.setJustification(rs.getString("JUSTIFICATION")); + user.setStatus(AccountStatus.valueOfStatus(rs.getString("STATUS"))); + + // set the creation date + user.setCreation(new Date(rs.getTimestamp("CREATION").getTime())); + + // get the last accessed date + if (rs.getTimestamp("LAST_ACCESSED") != null) { + user.setLastAccessed(new Date(rs.getTimestamp("LAST_ACCESSED").getTime())); + } + + // get the last verified date + if (rs.getTimestamp("LAST_VERIFIED") != null) { + user.setLastVerified(new Date(rs.getTimestamp("LAST_VERIFIED").getTime())); + } + } + + // the select statement performs a left join since the desired + // user may not have any authorities + String authority = rs.getString("ROLE"); + if (StringUtils.isNotBlank(authority)) { + user.getAuthorities().add(Authority.valueOfAuthority(authority)); + } + } + + return user; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public NiFiUser findUserByDn(String dn) throws DataAccessException { + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the connection and obtain a statement + statement = connection.prepareStatement(SELECT_USER_BY_DN); + statement.setString(1, dn); + + // execute the query + rs = statement.executeQuery(); + + // create the user + NiFiUser user = null; + + // go through the user and its roles + while (rs.next()) { + // create the user during the first iteration + if (user == null) { + user = new NiFiUser(); + user.setId(rs.getString("ID")); + user.setDn(rs.getString("DN")); + user.setUserName(rs.getString("USER_NAME")); + user.setUserGroup(rs.getString("USER_GROUP")); + user.setJustification(rs.getString("JUSTIFICATION")); + user.setStatus(AccountStatus.valueOfStatus(rs.getString("STATUS"))); + + // set the creation date + user.setCreation(new Date(rs.getTimestamp("CREATION").getTime())); + + // get the last accessed date + if (rs.getTimestamp("LAST_ACCESSED") != null) { + user.setLastAccessed(new Date(rs.getTimestamp("LAST_ACCESSED").getTime())); + } + + // get the last verified date + if (rs.getTimestamp("LAST_VERIFIED") != null) { + user.setLastVerified(new Date(rs.getTimestamp("LAST_VERIFIED").getTime())); + } + } + + // the select statement performs a left join since the desired + // user may not have any authorities + String authority = rs.getString("ROLE"); + if (StringUtils.isNotBlank(authority)) { + user.getAuthorities().add(Authority.valueOfAuthority(authority)); + } + } + + return user; + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void createUser(NiFiUser user) throws DataAccessException { + if (user.getDn() == null) { + throw new IllegalArgumentException("User dn must be specified."); + } + + PreparedStatement statement = null; + ResultSet rs = null; + try { + final String id = UUID.nameUUIDFromBytes(user.getDn().getBytes(StandardCharsets.UTF_8)).toString(); + + // create a statement + statement = connection.prepareStatement(INSERT_USER, Statement.RETURN_GENERATED_KEYS); + statement.setString(1, id); + statement.setString(2, StringUtils.left(user.getDn(), 255)); + statement.setString(3, StringUtils.left(user.getUserName(), 100)); + statement.setString(4, StringUtils.left(user.getUserGroup(), 100)); + if (user.getLastVerified() != null) { + statement.setTimestamp(5, new java.sql.Timestamp(user.getLastVerified().getTime())); + } else { + statement.setTimestamp(5, null); + } + statement.setString(6, StringUtils.left(user.getJustification(), 500)); + statement.setString(7, user.getStatus().toString()); + + // insert the user + int updateCount = statement.executeUpdate(); + if (updateCount == 1) { + user.setId(id); + } else { + throw new DataAccessException("Unable to insert user."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void deleteUser(String id) throws DataAccessException { + // ensure there are some authorities to create + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(DELETE_USER); + statement.setString(1, id); + + // insert the authorities + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void updateUser(NiFiUser user) throws DataAccessException { + PreparedStatement statement = null; + try { + // create a statement + statement = connection.prepareStatement(UPDATE_USER); + statement.setString(1, StringUtils.left(user.getDn(), 255)); + statement.setString(2, StringUtils.left(user.getUserName(), 100)); + statement.setString(3, StringUtils.left(user.getUserGroup(), 100)); + statement.setString(6, StringUtils.left(user.getJustification(), 500)); + statement.setString(7, user.getStatus().toString()); + statement.setString(8, user.getId()); + + // set the last accessed time accordingly + if (user.getLastAccessed() == null) { + statement.setNull(4, Types.TIMESTAMP); + } else { + statement.setTimestamp(4, new java.sql.Timestamp(user.getLastAccessed().getTime())); + } + + // set the last verified time accordingly + if (user.getLastVerified() == null) { + statement.setNull(5, Types.TIMESTAMP); + } else { + statement.setTimestamp(5, new java.sql.Timestamp(user.getLastVerified().getTime())); + } + + // perform the update + int updateCount = statement.executeUpdate(); + if (updateCount != 1) { + throw new DataAccessException("Unable to update user."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void updateGroupStatus(String group, AccountStatus status) throws DataAccessException { + PreparedStatement statement = null; + try { + // create a statement + statement = connection.prepareStatement(UPDATE_USER_GROUP_STATUS); + statement.setString(1, status.toString()); + statement.setString(2, group); + + // perform the update + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void updateGroupVerification(String group, Date lastVerified) throws DataAccessException { + PreparedStatement statement = null; + try { + // create a statement + statement = connection.prepareStatement(UPDATE_USER_GROUP_VERIFICATION); + + // set the last verified time accordingly + if (lastVerified == null) { + statement.setNull(1, Types.TIMESTAMP); + } else { + statement.setTimestamp(1, new java.sql.Timestamp(lastVerified.getTime())); + } + + // set the group + statement.setString(2, group); + + // perform the update + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void ungroup(String group) throws DataAccessException { + PreparedStatement statement = null; + try { + // create a statement + statement = connection.prepareStatement(UNGROUP_GROUP); + statement.setString(1, group); + + // perform the update + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountDisabledException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountDisabledException.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountDisabledException.java new file mode 100644 index 0000000..e8b3d10 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountDisabledException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.service; + +/** + * Exception to indicate that the user account is disabled. + */ +public class AccountDisabledException extends RuntimeException { + + public AccountDisabledException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public AccountDisabledException(Throwable cause) { + super(cause); + } + + public AccountDisabledException(String message, Throwable cause) { + super(message, cause); + } + + public AccountDisabledException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountNotFoundException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountNotFoundException.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountNotFoundException.java new file mode 100644 index 0000000..88287ce --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountNotFoundException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.service; + +/** + * Exception to indicate that the user account is disabled. + */ +public class AccountNotFoundException extends RuntimeException { + + public AccountNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public AccountNotFoundException(Throwable cause) { + super(cause); + } + + public AccountNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public AccountNotFoundException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountPendingException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountPendingException.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountPendingException.java new file mode 100644 index 0000000..dacc483 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AccountPendingException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.service; + +/** + * Exception to indicate that the user has already submitting an account request + * and that request is still pending. + */ +public class AccountPendingException extends RuntimeException { + + public AccountPendingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public AccountPendingException(Throwable cause) { + super(cause); + } + + public AccountPendingException(String message, Throwable cause) { + super(message, cause); + } + + public AccountPendingException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AdministrationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AdministrationException.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AdministrationException.java new file mode 100644 index 0000000..c0e8ac1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AdministrationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.service; + +/** + * + */ +public class AdministrationException extends RuntimeException { + + public AdministrationException(Throwable cause) { + super(cause); + } + + public AdministrationException(String message, Throwable cause) { + super(message, cause); + } + + public AdministrationException(String message) { + super(message); + } + + public AdministrationException() { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AuditService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AuditService.java b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AuditService.java new file mode 100644 index 0000000..0843bd8 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/administration/src/main/java/org/apache/nifi/admin/service/AuditService.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.service; + +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.apache.nifi.action.Action; +import org.apache.nifi.history.HistoryQuery; +import org.apache.nifi.history.History; +import org.apache.nifi.history.PreviousValue; + +/** + * Allows NiFi actions to be audited. + */ +public interface AuditService { + + /** + * Adds the specified actions. + * + * @param actions + * @throws AdministrationException + */ + void addActions(Collection<Action> actions); + + /** + * Finds the previous values for the specified property in the specified + * processor. Returns null if there are none. + * + * @param processorId + * @return + */ + Map<String, List<PreviousValue>> getPreviousValues(String processorId); + + /** + * Get the actions within the given date range. + * + * @param actionQuery + * @return + * @throws AdministrationException + */ + History getActions(HistoryQuery actionQuery); + + /** + * Get the details for the specified action id. If the action cannot be + * found, null is returned. + * + * @param actionId + * @return + */ + Action getAction(Integer actionId); + + /** + * Purges all action's that occurred before the specified end date. + * + * @param end + * @param purgeAction + * @throws AdministrationException + */ + void purgeActions(Date end, Action purgeAction); +}