http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java index 332b024..88c3085 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java @@ -20,6 +20,8 @@ package org.apache.cassandra.transport.messages; import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.audit.AuditLogEntry; +import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.AuthenticatedUser; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.exceptions.AuthenticationException; @@ -79,6 +81,14 @@ public class AuthResponse extends Message.Request AuthenticatedUser user = negotiator.getAuthenticatedUser(); queryState.getClientState().login(user); AuthMetrics.instance.markSuccess(); + if (auditLogEnabled) + { + AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState()) + .setOperation("LOGIN SUCCESSFUL") + .setType(AuditLogEntryType.LOGIN_SUCCESS) + .build(); + auditLogManager.log(auditEntry); + } // authentication is complete, send a ready message to the client return new AuthSuccess(challenge); } @@ -90,6 +100,14 @@ public class AuthResponse extends Message.Request catch (AuthenticationException e) { AuthMetrics.instance.markFailure(); + if (auditLogEnabled) + { + AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState()) + .setOperation("LOGIN FAILURE") + .setType(AuditLogEntryType.LOGIN_ERROR) + .build(); + auditLogManager.log(auditEntry, e); + } return ErrorMessage.fromException(e); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index dcaa8da..5ffadac 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -19,24 +19,31 @@ package org.apache.cassandra.transport.messages; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.UUID; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import org.apache.cassandra.cql3.*; +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.audit.AuditLogEntry; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.cql3.Attributes; +import org.apache.cassandra.cql3.BatchQueryOptions; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; -import org.apache.cassandra.db.fullquerylog.FullQueryLogger; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolException; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.UUIDGen; @@ -176,30 +183,20 @@ public class BatchMessage extends Message.Request QueryHandler handler = ClientState.getCQLQueryHandler(); List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size()); - boolean fullQueryLogEnabled = FullQueryLogger.instance.enabled(); - List<String> queryStrings = fullQueryLogEnabled ? new ArrayList<>(queryOrIdList.size()) : Collections.EMPTY_LIST; for (int i = 0; i < queryOrIdList.size(); i++) { Object query = queryOrIdList.get(i); - String queryString; ParsedStatement.Prepared p; if (query instanceof String) { p = QueryProcessor.parseStatement((String)query, state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace())); - queryString = (String)query; } else { p = handler.getPrepared((MD5Digest)query); if (p == null) throw new PreparedQueryNotFoundException((MD5Digest)query); - queryString = p.rawCQLStatement; - } - - if (fullQueryLogEnabled) - { - queryStrings.add(queryString); } List<ByteBuffer> queryValues = values.get(i); @@ -227,17 +224,16 @@ public class BatchMessage extends Message.Request // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor // (and no value would be really correct, so we prefer passing a clearly wrong one). BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none()); - long fqlTime = 0; - if (fullQueryLogEnabled) - { - fqlTime = System.currentTimeMillis(); - } + + long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0; Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime); - if (fullQueryLogEnabled) + + if (isLoggingEnabled) { - FullQueryLogger.instance.logBatch(batchType.name(), queryStrings, values, options, fqlTime); + auditLogManager.logBatch(batchType.name(), queryOrIdList, values, prepared, options, state, fqlTime); } + if (tracingId != null) response.setTracingId(tracingId); @@ -245,6 +241,16 @@ public class BatchMessage extends Message.Request } catch (Exception e) { + if (auditLogEnabled) + { + AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(getAuditString()) + .setOptions(options) + .setType(AuditLogEntryType.BATCH) + .build(); + auditLogManager.log(entry, e); + } + JVMStabilityInspector.inspectThrowable(e); return ErrorMessage.fromException(e); } @@ -267,4 +273,13 @@ public class BatchMessage extends Message.Request sb.append("] at consistency ").append(options.getConsistency()); return sb.toString(); } + + private String getAuditString() + { + StringBuilder sb = new StringBuilder(); + sb.append("BATCH of ["); + sb.append(queryOrIdList.size()); + sb.append("] statements at consistency ").append(options.getConsistency()); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index e969134..cd7f300 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -20,23 +20,24 @@ package org.apache.cassandra.transport.messages; import java.util.UUID; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.audit.AuditLogEntry; +import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.ResultSet; -import org.apache.cassandra.cql3.statements.BatchStatement; -import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; -import org.apache.cassandra.cql3.statements.UpdateStatement; -import org.apache.cassandra.db.fullquerylog.FullQueryLogger; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolException; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.UUIDGen; @@ -163,16 +164,21 @@ public class ExecuteMessage extends Message.Request // Some custom QueryHandlers are interested by the bound names. We provide them this information // by wrapping the QueryOptions. QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames); - boolean fqlEnabled = FullQueryLogger.instance.enabled(); - long fqlTime = 0; - if (fqlEnabled) - { - fqlTime = System.currentTimeMillis(); - } + + long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0; Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime); - if (fqlEnabled) + + if (isLoggingEnabled) { - FullQueryLogger.instance.logQuery(prepared.rawCQLStatement, options, fqlTime); + AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState()) + .setType(statement.getAuditLogContext().auditLogEntryType) + .setOperation(prepared.rawCQLStatement) + .setTimestamp(fqlTime) + .setScope(statement) + .setKeyspace(state, statement) + .setOptions(options) + .build(); + AuditLogManager.getInstance().log(auditEntry); } if (response instanceof ResultMessage.Rows) @@ -212,6 +218,33 @@ public class ExecuteMessage extends Message.Request } catch (Exception e) { + if (auditLogEnabled) + { + if (e instanceof PreparedQueryNotFoundException) + { + AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(toString()) + .setOptions(options) + .build(); + auditLogManager.log(auditLogEntry, e); + } + else + { + ParsedStatement.Prepared prepared = ClientState.getCQLQueryHandler().getPrepared(statementId); + if (prepared != null) + { + AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(toString()) + .setType(prepared.statement.getAuditLogContext().auditLogEntryType) + .setScope(prepared.statement) + .setKeyspace(state, prepared.statement) + .setOptions(options) + .build(); + auditLogManager.log(auditLogEntry, e); + } + } + } + JVMStabilityInspector.inspectThrowable(e); return ErrorMessage.fromException(e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index b6bf055..e5e5248 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -20,12 +20,18 @@ package org.apache.cassandra.transport.messages; import java.util.UUID; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.audit.AuditLogEntry; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.UUIDGen; @@ -111,6 +117,17 @@ public class PrepareMessage extends Message.Request Message.Response response = ClientState.getCQLQueryHandler().prepare(query, state.getClientState().cloneWithKeyspaceIfSet(keyspace), getCustomPayload()); + if (auditLogEnabled) + { + ParsedStatement.Prepared parsedStmt = QueryProcessor.parseStatement(query, state.getClientState()); + AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(query) + .setType(AuditLogEntryType.PREPARE_STATEMENT) + .setScope(parsedStmt.statement) + .setKeyspace(parsedStmt.statement) + .build(); + auditLogManager.log(auditLogEntry); + } if (tracingId != null) response.setTracingId(tracingId); @@ -119,6 +136,15 @@ public class PrepareMessage extends Message.Request } catch (Exception e) { + if (auditLogEnabled) + { + AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(query) + .setKeyspace(keyspace) + .setType(AuditLogEntryType.PREPARE_STATEMENT) + .build(); + auditLogManager.log(auditLogEntry, e); + } JVMStabilityInspector.inspectThrowable(e); return ErrorMessage.fromException(e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 8f64033..9df9205 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -22,8 +22,11 @@ import java.util.UUID; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.audit.AuditLogEntry; +import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.db.fullquerylog.FullQueryLogger; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; @@ -114,17 +117,24 @@ public class QueryMessage extends Message.Request Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build()); } - boolean fqlEnabled = FullQueryLogger.instance.enabled(); - long fqlTime = 0; - if (fqlEnabled) - { - fqlTime = System.currentTimeMillis(); - } + long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0; Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime); - if (fqlEnabled) + + if (isLoggingEnabled) { - FullQueryLogger.instance.logQuery(query, options, fqlTime); + ParsedStatement.Prepared parsedStatement = QueryProcessor.parseStatement(query, state.getClientState()); + AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState()) + .setType(parsedStatement.statement.getAuditLogContext().auditLogEntryType) + .setOperation(query) + .setTimestamp(fqlTime) + .setScope(parsedStatement.statement) + .setKeyspace(state, parsedStatement.statement) + .setOptions(options) + .build(); + AuditLogManager.getInstance().log(auditEntry); + } + if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); @@ -135,6 +145,14 @@ public class QueryMessage extends Message.Request } catch (Exception e) { + if (auditLogEnabled) + { + AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState()) + .setOperation(query) + .setOptions(options) + .build(); + auditLogManager.log(auditLogEntry, e); + } JVMStabilityInspector.inspectThrowable(e); if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException))) logger.error("Unexpected error during query", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 078b414..b621071 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.AllowAllNetworkAuthorizer; +import org.apache.cassandra.audit.IAuditLogger; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.IAuthorizer; import org.apache.cassandra.auth.INetworkAuthorizer; @@ -526,6 +527,29 @@ public class FBUtilities } return FBUtilities.construct(className, "network authorizer"); } + + public static IAuditLogger newAuditLogger(String className) throws ConfigurationException + { + if (!className.contains(".")) + className = "org.apache.cassandra.audit." + className; + return FBUtilities.construct(className, "Audit logger"); + } + + public static boolean isAuditLoggerClassExists(String className) + { + if (!className.contains(".")) + className = "org.apache.cassandra.audit." + className; + + try + { + FBUtilities.classForName(className, "Audit logger"); + } + catch (ConfigurationException e) + { + return false; + } + return true; + } /** * @return The Class for the given name. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/binlog/BinLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java index 070a151..0c8659e 100644 --- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java +++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java @@ -272,6 +272,6 @@ public class BinLog implements Runnable, StoreFileListener public abstract static class ReleaseableWriteMarshallable implements WriteMarshallable { - protected abstract void release(); + public abstract void release(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java new file mode 100644 index 0000000..8054f90 --- /dev/null +++ b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.audit; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.cassandra.audit.AuditLogFilter.isFiltered; + +public class AuditLogFilterTest +{ + @Test + public void isFiltered_IncludeSetOnly() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + + Set<String> excludeSet = new HashSet<>(); + + Assert.assertFalse(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("c", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("d", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_ExcludeSetOnly() + { + Set<String> includeSet = new HashSet<>(); + + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("a"); + excludeSet.add("b"); + excludeSet.add("c"); + + Assert.assertTrue(isFiltered("a", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("b", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("c", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("d", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_MutualExclusive() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("a"); + + Assert.assertTrue(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("c", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_MutualInclusive() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("c"); + excludeSet.add("d"); + + Assert.assertFalse(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("c", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("d", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("e", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("f", includeSet, excludeSet)); + } + + @Test + public void isFiltered_UnSpecifiedInput() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("a"); + + Assert.assertTrue(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("c", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("d", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_SpecifiedInput() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("a"); + + Assert.assertTrue(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("c", includeSet, excludeSet)); + } + + @Test + public void isFiltered_FilteredInput_EmptyInclude() + { + Set<String> includeSet = new HashSet<>(); + Set<String> excludeSet = new HashSet<>(); + excludeSet.add("a"); + + Assert.assertTrue(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + } + + @Test + public void isFiltered_FilteredInput_EmptyExclude() + { + Set<String> includeSet = new HashSet<>(); + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + + Set<String> excludeSet = new HashSet<>(); + + Assert.assertFalse(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("b", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("c", includeSet, excludeSet)); + Assert.assertTrue(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_EmptyInputs() + { + Set<String> includeSet = new HashSet<>(); + Set<String> excludeSet = new HashSet<>(); + + Assert.assertFalse(isFiltered("a", includeSet, excludeSet)); + Assert.assertFalse(isFiltered("e", includeSet, excludeSet)); + } + + @Test + public void isFiltered_NullInputs() + { + Set<String> includeSet = new HashSet<>(); + Set<String> excludeSet = new HashSet<>(); + Assert.assertFalse(isFiltered(null, includeSet, excludeSet)); + + includeSet.add("a"); + includeSet.add("b"); + includeSet.add("c"); + Assert.assertTrue(isFiltered(null, includeSet, excludeSet)); + + includeSet = new HashSet<>(); + excludeSet.add("a"); + excludeSet.add("b"); + Assert.assertFalse(isFiltered(null, includeSet, excludeSet)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java new file mode 100644 index 0000000..40eadf8 --- /dev/null +++ b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.audit; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.SyntaxError; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.service.StorageService; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +public class AuditLoggerTest extends CQLTester +{ + @BeforeClass + public static void setUp() + { + AuditLogOptions options = new AuditLogOptions(); + options.enabled = true; + options.logger = "InMemoryAuditLogger"; + DatabaseDescriptor.setAuditLoggingOptions(options); + requireNetwork(); + } + + @Before + public void beforeTestMethod() + { + AuditLogOptions options = new AuditLogOptions(); + enableAuditLogOptions(options); + } + + private void enableAuditLogOptions(AuditLogOptions options) + { + String loggerName = "InMemoryAuditLogger"; + String includedKeyspaces = options.included_keyspaces; + String excludedKeyspaces = options.excluded_keyspaces; + String includedCategories = options.included_categories; + String excludedCategories = options.excluded_categories; + String includedUsers = options.included_users; + String excludedUsers = options.excluded_users; + + StorageService.instance.enableAuditLog(loggerName, includedKeyspaces, excludedKeyspaces, includedCategories, excludedCategories, includedUsers, excludedUsers); + } + + private void disableAuditLogOptions() + { + StorageService.instance.disableAuditLog(); + } + + @Test + public void testAuditLogFilters() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + AuditLogOptions options = new AuditLogOptions(); + options.excluded_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + ResultSet rs = executeAndAssertNoAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + + options = new AuditLogOptions(); + options.included_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1); + assertEquals(1, rs.all().size()); + + options = new AuditLogOptions(); + options.included_keyspaces = KEYSPACE; + options.excluded_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertNoAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + + options = new AuditLogOptions(); + enableAuditLogOptions(options); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1); + assertEquals(1, rs.all().size()); + } + + @Test + public void testAuditLogFiltersTransitions() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + AuditLogOptions options = new AuditLogOptions(); + options.excluded_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + ResultSet rs = executeAndAssertNoAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + + disableAuditLogOptions(); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertDisableAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + + options = new AuditLogOptions(); + options.included_keyspaces = KEYSPACE; + options.excluded_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertNoAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + + disableAuditLogOptions(); + + cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + rs = executeAndAssertDisableAuditLog(cql, 1); + assertEquals(1, rs.all().size()); + } + + @Test + public void testAuditLogExceptions() + { + AuditLogOptions options = new AuditLogOptions(); + options.excluded_keyspaces = KEYSPACE; + enableAuditLogOptions(options); + Assert.assertTrue(AuditLogManager.getInstance().isAuditingEnabled()); + + disableAuditLogOptions(); + } + + @Test + public void testAuditLogFilterIncludeExclude() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String tbl1 = currentTable(); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + AuditLogOptions options = new AuditLogOptions(); + options.excluded_categories = "QUERY"; + options.included_categories = "QUERY,DML,PREPARE"; + enableAuditLogOptions(options); + + //QUERY - Should be filtered, part of excluded categories, + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = 1"; + Session session = sessionNet(); + ResultSet rs = session.execute(cql); + + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + assertEquals(1, rs.all().size()); + + //DML - Should not be filtered, part of included categories + cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)"; + executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test"); + + //DDL - Should be filtered, not part of included categories + cql = "ALTER TABLE " + KEYSPACE + '.' + currentTable() + " ADD v3 text"; + session = sessionNet(); + rs = session.execute(cql); + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + } + + @Test + public void testCqlSelectAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + ResultSet rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1); + + assertEquals(1, rs.all().size()); + } + + @Test + public void testCqlInsertAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)"; + executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test"); + } + + @Test + public void testCqlUpdateAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String cql = "UPDATE " + KEYSPACE + '.' + currentTable() + " SET v1 = 'ApacheCassandra' WHERE id = 1"; + executeAndAssert(cql, AuditLogEntryType.UPDATE); + + cql = "UPDATE " + KEYSPACE + '.' + currentTable() + " SET v1 = ? WHERE id = ?"; + executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, "AuditingTest", 2); + } + + @Test + public void testCqlDeleteAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String cql = "DELETE FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + executeAndAssertWithPrepare(cql, AuditLogEntryType.DELETE, 1); + } + + @Test + public void testCqlTruncateAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String cql = "TRUNCATE TABLE " + KEYSPACE + '.' + currentTable(); + executeAndAssertWithPrepare(cql, AuditLogEntryType.TRUNCATE); + } + + @Test + public void testCqlBatchAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + Session session = sessionNet(); + + BatchStatement batchStatement = new BatchStatement(); + + String cqlInsert = "INSERT INTO " + KEYSPACE + "." + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)"; + PreparedStatement prep = session.prepare(cqlInsert); + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + batchStatement.add(prep.bind(1, "Apapche", "Cassandra")); + batchStatement.add(prep.bind(2, "Apapche1", "Cassandra1")); + + String cqlUpdate = "UPDATE " + KEYSPACE + "." + currentTable() + " SET v1 = ? WHERE id = ?"; + prep = session.prepare(cqlUpdate); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlUpdate, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + batchStatement.add(prep.bind("Apache Cassandra", 1)); + + String cqlDelete = "DELETE FROM " + KEYSPACE + "." + currentTable() + " WHERE id = ?"; + prep = session.prepare(cqlDelete); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlDelete, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + batchStatement.add(prep.bind(1)); + + ResultSet rs = session.execute(batchStatement); + + assertEquals(5, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, logEntry, false); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, logEntry, false); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlUpdate, AuditLogEntryType.UPDATE, logEntry, false); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlDelete, AuditLogEntryType.DELETE, logEntry, false); + + int size = rs.all().size(); + + assertEquals(0, size); + } + + @Test + public void testCqlBatch_MultipleTablesAuditing() + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String table1 = currentTable(); + + Session session = sessionNet(); + + BatchStatement batchStatement = new BatchStatement(); + + String cqlInsert1 = "INSERT INTO " + KEYSPACE + "." + table1 + " (id, v1, v2) VALUES (?, ?, ?)"; + PreparedStatement prep = session.prepare(cqlInsert1); + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert1, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + batchStatement.add(prep.bind(1, "Apapche", "Cassandra")); + + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String table2 = currentTable(); + + String cqlInsert2 = "INSERT INTO " + KEYSPACE + "." + table2 + " (id, v1, v2) VALUES (?, ?, ?)"; + prep = session.prepare(cqlInsert2); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert2, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + batchStatement.add(prep.bind(1, "Apapche", "Cassandra")); + + createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String ks2 = currentKeyspace(); + + createTable(ks2, "CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String table3 = currentTable(); + + String cqlInsert3 = "INSERT INTO " + ks2 + "." + table3 + " (id, v1, v2) VALUES (?, ?, ?)"; + prep = session.prepare(cqlInsert3); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert3, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false, ks2); + + batchStatement.add(prep.bind(1, "Apapche", "Cassandra")); + + ResultSet rs = session.execute(batchStatement); + + assertEquals(4, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert1, table1, AuditLogEntryType.UPDATE, logEntry, false, KEYSPACE); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert2, table2, AuditLogEntryType.UPDATE, logEntry, false, KEYSPACE); + + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cqlInsert3, table3, AuditLogEntryType.UPDATE, logEntry, false, ks2); + + int size = rs.all().size(); + + assertEquals(0, size); + } + + @Test + public void testCqlKeyspaceAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + String cql = "CREATE KEYSPACE " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} "; + executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace()); + + cql = "CREATE KEYSPACE IF NOT EXISTS " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} "; + executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace()); + + cql = "ALTER KEYSPACE " + currentKeyspace() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} "; + executeAndAssert(cql, AuditLogEntryType.ALTER_KEYSPACE, true, currentKeyspace()); + + cql = "DROP KEYSPACE " + currentKeyspace(); + executeAndAssert(cql, AuditLogEntryType.DROP_KEYSPACE, true, currentKeyspace()); + } + + @Test + public void testCqlTableAuditing() throws Throwable + { + String cql = "CREATE TABLE " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)"; + executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE); + + cql = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)"; + executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE); + + cql = "ALTER TABLE " + KEYSPACE + "." + currentTable() + " ADD v3 text"; + executeAndAssert(cql, AuditLogEntryType.ALTER_TABLE); + + cql = "DROP TABLE " + KEYSPACE + "." + currentTable(); + executeAndAssert(cql, AuditLogEntryType.DROP_TABLE); + } + + @Test + public void testCqlMVAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String tblName = currentTable(); + String cql = "CREATE MATERIALIZED VIEW " + KEYSPACE + "." + createTableName() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) "; + executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW); + + cql = "CREATE MATERIALIZED VIEW IF NOT EXISTS " + KEYSPACE + "." + currentTable() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) "; + executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW); + + cql = "ALTER MATERIALIZED VIEW " + KEYSPACE + "." + currentTable() + " WITH caching = { 'keys' : 'NONE' };"; + executeAndAssert(cql, AuditLogEntryType.ALTER_VIEW); + + cql = "DROP MATERIALIZED VIEW " + KEYSPACE + "." + currentTable(); + executeAndAssert(cql, AuditLogEntryType.DROP_VIEW); + } + + @Test + public void testCqlTypeAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + String tblName = createTableName(); + + String cql = "CREATE TYPE " + KEYSPACE + "." + tblName + " (id int, v1 text, v2 text)"; + executeAndAssert(cql, AuditLogEntryType.CREATE_TYPE); + + cql = "CREATE TYPE IF NOT EXISTS " + KEYSPACE + "." + tblName + " (id int, v1 text, v2 text)"; + executeAndAssert(cql, AuditLogEntryType.CREATE_TYPE); + + cql = "ALTER TYPE " + KEYSPACE + "." + tblName + " ADD v3 int"; + executeAndAssert(cql, AuditLogEntryType.ALTER_TYPE); + + cql = "ALTER TYPE " + KEYSPACE + "." + tblName + " RENAME v3 TO v4"; + executeAndAssert(cql, AuditLogEntryType.ALTER_TYPE); + + cql = "DROP TYPE " + KEYSPACE + "." + tblName; + executeAndAssert(cql, AuditLogEntryType.DROP_TYPE); + + cql = "DROP TYPE IF EXISTS " + KEYSPACE + "." + tblName; + executeAndAssert(cql, AuditLogEntryType.DROP_TYPE); + } + + @Test + public void testCqlIndexAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + String tblName = currentTable(); + + String indexName = createTableName(); + + String cql = "CREATE INDEX " + indexName + " ON " + KEYSPACE + "." + tblName + " (v1)"; + executeAndAssert(cql, AuditLogEntryType.CREATE_INDEX); + + cql = "DROP INDEX " + KEYSPACE + "." + indexName; + executeAndAssert(cql, AuditLogEntryType.DROP_INDEX); + } + + @Test + public void testCqlFunctionAuditing() throws Throwable + { + String tblName = createTableName(); + + String cql = "CREATE FUNCTION IF NOT EXISTS " + KEYSPACE + "." + tblName + " (column TEXT,num int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$ column.substring(0,num) $$"; + executeAndAssert(cql, AuditLogEntryType.CREATE_FUNCTION); + + cql = "DROP FUNCTION " + KEYSPACE + "." + tblName; + executeAndAssert(cql, AuditLogEntryType.DROP_FUNCTION); + } + + @Test + public void testCqlTriggerAuditing() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + + String tblName = currentTable(); + String triggerName = createTableName(); + + String cql = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + KEYSPACE + "." + tblName; + executeAndAssert(cql, AuditLogEntryType.DROP_TRIGGER); + } + + @Test + public void testCqlAggregateAuditing() throws Throwable + { + String aggName = createTableName(); + String cql = "DROP AGGREGATE IF EXISTS " + KEYSPACE + "." + aggName; + executeAndAssert(cql, AuditLogEntryType.DROP_AGGREGATE); + } + + @Test + public void testCqlQuerySyntaxError() + { + String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "1 (id, v1, v2) VALUES (1, 'insert_audit, 'test')"; + try + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + Session session = sessionNet(); + ResultSet rs = session.execute(cql); + Assert.fail("should not succeed"); + } + catch (SyntaxError e) + { + // nop + } + + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(logEntry, cql); + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + } + + @Test + public void testCqlSelectQuerySyntaxError() + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String cql = "SELECT * FROM " + KEYSPACE + '.' + currentTable() + " LIMIT 2w"; + + try + { + Session session = sessionNet(); + ResultSet rs = session.execute(cql); + Assert.fail("should not succeed"); + } + catch (SyntaxError e) + { + // nop + } + + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(logEntry, cql); + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + } + + @Test + public void testCqlPrepareQueryError() + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?,?,?)"; + try + { + Session session = sessionNet(); + + PreparedStatement pstmt = session.prepare(cql); + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false); + + dropTable("DROP TABLE %s"); + ResultSet rs = session.execute(pstmt.bind(1, "insert_audit", "test")); + Assert.fail("should not succeed"); + } + catch (NoHostAvailableException e) + { + // nop + } + + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(logEntry, null); + logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(logEntry, cql); + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + } + + @Test + public void testCqlPrepareQuerySyntaxError() + { + String cql = "INSERT INTO " + KEYSPACE + '.' + "foo" + "(id, v1, v2) VALES (?,?,?)"; + try + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + Session session = sessionNet(); + PreparedStatement pstmt = session.prepare(cql); + ResultSet rs = session.execute(pstmt.bind(1, "insert_audit", "test")); + Assert.fail("should not succeed"); + } + catch (SyntaxError e) + { + // nop + } + AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(logEntry, cql); + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + } + + /** + * Helper methods for Audit Log CQL Testing + */ + + private ResultSet executeAndAssert(String cql, AuditLogEntryType type) throws Throwable + { + return executeAndAssert(cql, type, false, KEYSPACE); + } + + private ResultSet executeAndAssert(String cql, AuditLogEntryType type, boolean isTableNull, String keyspace) throws Throwable + { + Session session = sessionNet(); + + ResultSet rs = session.execute(cql); + + AuditLogEntry logEntry1 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cql, type, logEntry1, isTableNull, keyspace); + + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + return rs; + } + + private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType exceuteType, Object... bindValues) throws Throwable + { + return executeAndAssertWithPrepare(cql, exceuteType, false, bindValues); + } + + private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, boolean isTableNull, Object... bindValues) throws Throwable + { + Session session = sessionNet(); + + PreparedStatement pstmt = session.prepare(cql); + ResultSet rs = session.execute(pstmt.bind(bindValues)); + + AuditLogEntry logEntry1 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, logEntry1, isTableNull); + + AuditLogEntry logEntry2 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll(); + assertLogEntry(cql, executeType, logEntry2, isTableNull); + + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + return rs; + } + + private ResultSet executeAndAssertNoAuditLog(String cql, Object... bindValues) + { + Session session = sessionNet(); + + PreparedStatement pstmt = session.prepare(cql); + ResultSet rs = session.execute(pstmt.bind(bindValues)); + + assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size()); + return rs; + } + + private ResultSet executeAndAssertDisableAuditLog(String cql, Object... bindValues) + { + Session session = sessionNet(); + + PreparedStatement pstmt = session.prepare(cql); + ResultSet rs = session.execute(pstmt.bind(bindValues)); + + assertThat(AuditLogManager.getInstance().getLogger(),instanceOf(NoOpAuditLogger.class)); + return rs; + } + + private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull) + { + assertLogEntry(cql, type, actual, isTableNull, KEYSPACE); + } + + private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace) + { + assertLogEntry(cql, currentTable(), type, actual, isTableNull, keyspace); + } + + private void assertLogEntry(String cql, String table, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace) + { + assertEquals(keyspace, actual.getKeyspace()); + if (!isTableNull) + { + assertEquals(table, actual.getScope()); + } + assertEquals(type, actual.getType()); + assertEquals(cql, actual.getOperation()); + assertNotEquals(0,actual.getTimestamp()); + } + + private void assertLogEntry(AuditLogEntry logEntry, String cql) + { + assertNull(logEntry.getKeyspace()); + assertNull(logEntry.getScope()); + assertNotEquals(0,logEntry.getTimestamp()); + assertEquals(AuditLogEntryType.REQUEST_FAILURE, logEntry.getType()); + if (null != cql && !cql.isEmpty()) + { + assertThat(logEntry.getOperation(), containsString(cql)); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java new file mode 100644 index 0000000..f9d2930 --- /dev/null +++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.audit; + +import java.nio.file.Path; + +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.utils.binlog.BinLogTest; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class BinAuditLoggerTest extends CQLTester +{ + private static Path tempDir; + + @BeforeClass + public static void setUp() throws Exception + { + tempDir = BinLogTest.tempDir(); + + AuditLogOptions options = new AuditLogOptions(); + options.enabled = true; + options.logger = "BinAuditLogger"; + options.roll_cycle = "TEST_SECONDLY"; + options.audit_logs_dir = tempDir.toString(); + DatabaseDescriptor.setAuditLoggingOptions(options); + requireNetwork(); + } + + @Test + public void testSelectRoundTripQuery() throws Throwable + { + createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra"); + execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test"); + + String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?"; + + Session session = sessionNet(); + + PreparedStatement pstmt = session.prepare(cql); + ResultSet rs = session.execute(pstmt.bind(1)); + + assertEquals(1, rs.all().size()); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + assertTrue(tailer.readDocument(wire -> { + assertEquals("AuditLog", wire.read("type").text()); + assertThat(wire.read("message").text(), containsString(AuditLogEntryType.PREPARE_STATEMENT.toString())); + })); + + assertTrue(tailer.readDocument(wire -> { + assertEquals("AuditLog", wire.read("type").text()); + assertThat(wire.read("message").text(), containsString(AuditLogEntryType.SELECT.toString())); + })); + assertFalse(tailer.readDocument(wire -> { + })); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java new file mode 100644 index 0000000..14f7f23 --- /dev/null +++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.audit; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireOut; +import org.apache.cassandra.Util; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery; +import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.binlog.BinLogTest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FullQueryLoggerTest extends CQLTester +{ + private static Path tempDir; + + @BeforeClass + public static void beforeClass() throws Exception + { + tempDir = BinLogTest.tempDir(); + } + + private FullQueryLogger instance; + + @Before + public void setUp() + { + instance = AuditLogManager.getInstance().getFullQueryLogger(); + } + + @After + public void tearDown() + { + instance.reset(tempDir.toString()); + } + + @Test(expected = NullPointerException.class) + public void testConfigureNullPath() throws Exception + { + instance.configure(null, "", true, 1, 1); + } + + @Test(expected = NullPointerException.class) + public void testConfigureNullRollCycle() throws Exception + { + instance.configure(BinLogTest.tempDir(), null, true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidRollCycle() throws Exception + { + instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidMaxQueueWeight() throws Exception + { + instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidMaxQueueLogSize() throws Exception + { + instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureOverExistingFile() throws Exception + { + File f = File.createTempFile("foo", "bar"); + f.deleteOnExit(); + instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testCanRead() throws Exception + { + tempDir.toFile().setReadable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setReadable(true); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testCanWrite() throws Exception + { + tempDir.toFile().setWritable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setWritable(true); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testCanExecute() throws Exception + { + tempDir.toFile().setExecutable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setExecutable(true); + } + } + + @Test + public void testResetWithoutConfigure() throws Exception + { + instance.reset(tempDir.toString()); + instance.reset(tempDir.toString()); + } + + @Test + public void stopWithoutConfigure() throws Exception + { + instance.stop(); + instance.stop(); + } + + /** + * Both the last used and supplied directory should get cleaned + */ + @Test + public void testResetCleansPaths() throws Exception + { + configureFQL(); + File tempA = File.createTempFile("foo", "bar", tempDir.toFile()); + assertTrue(tempA.exists()); + File tempB = File.createTempFile("foo", "bar", BinLogTest.tempDir().toFile()); + instance.reset(tempB.getParent()); + assertFalse(tempA.exists()); + assertFalse(tempB.exists()); + } + + /** + * The last used and configured directory are the same and it shouldn't be an issue + */ + @Test + public void testResetSamePath() throws Exception + { + configureFQL(); + File tempA = File.createTempFile("foo", "bar", tempDir.toFile()); + assertTrue(tempA.exists()); + instance.reset(tempA.getParent()); + assertFalse(tempA.exists()); + } + + @Test(expected = IllegalStateException.class) + public void testDoubleConfigure() throws Exception + { + configureFQL(); + configureFQL(); + } + + @Test + public void testCleansDirectory() throws Exception + { + assertTrue(new File(tempDir.toFile(), "foobar").createNewFile()); + configureFQL(); + assertEquals(tempDir.toFile().listFiles().length, 1); + assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName()); + } + + @Test + public void testEnabledReset() throws Exception + { + assertFalse(instance.enabled()); + configureFQL(); + assertTrue(instance.enabled()); + instance.reset(tempDir.toString()); + assertFalse(instance.enabled()); + } + + @Test + public void testEnabledStop() throws Exception + { + assertFalse(instance.enabled()); + configureFQL(); + assertTrue(instance.enabled()); + instance.stop(); + assertFalse(instance.enabled()); + } + + /** + * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared + */ + @Test + public void testBlocking() throws Exception + { + configureFQL(); + //Prevent the bin log thread from making progress, causing the task queue to block + Semaphore blockBinLog = new Semaphore(0); + try + { + //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior + Semaphore binLogBlocked = new Semaphore(0); + instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + { + + public void writeMarshallable(WireOut wire) + { + //Notify that the bin log is blocking now + binLogBlocked.release(); + try + { + //Block the bin log thread so the task queue can be filled + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + super.writeMarshallable(wire); + } + + public void release() + { + super.release(); + } + }); + + //Wait for the bin log thread to block so it can't batch drain tasks + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the task queue + logQuery("foo2"); + + //Start a thread to block waiting on the bin log queue + Thread t = new Thread(() -> + { + logQuery("foo3"); + //Should be able to log another query without an issue + logQuery("foo4"); + }); + t.start(); + Thread.sleep(500); + //If thread state is terminated then the thread started, finished, and didn't block on the full task queue + assertTrue(t.getState() != Thread.State.TERMINATED); + } + finally + { + //Unblock the binlog thread + blockBinLog.release(); + } + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60); + } + + private boolean checkForQueries(List<String> queries) + { + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + List<String> expectedQueries = new LinkedList<>(queries); + while (!expectedQueries.isEmpty()) + { + if (!tailer.readDocument(wire -> { + assertEquals(expectedQueries.get(0), wire.read("query").text()); + expectedQueries.remove(0); + })) + { + return false; + } + } + assertFalse(tailer.readDocument(wire -> {})); + return true; + } + } + + @Test + public void testNonBlocking() throws Exception + { + instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256); + //Prevent the bin log thread from making progress, causing the task queue to refuse tasks + Semaphore blockBinLog = new Semaphore(0); + try + { + //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior + Semaphore binLogBlocked = new Semaphore(0); + instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + { + + public void writeMarshallable(WireOut wire) + { + //Notify that the bin log is blocking now + binLogBlocked.release(); + try + { + //Block the bin log thread so the task queue can be filled + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + super.writeMarshallable(wire); + } + + public void release() + { + super.release(); + } + }); + + //Wait for the bin log thread to block so it can't batch drain tasks + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the task queue + logQuery("foo2"); + + //This sample should get dropped AKA released without being written + AtomicInteger releasedCount = new AtomicInteger(0); + AtomicInteger writtenCount = new AtomicInteger(0); + instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) { + public void writeMarshallable(WireOut wire) + { + writtenCount.incrementAndGet(); + super.writeMarshallable(wire); + } + + public void release() + { + releasedCount.incrementAndGet(); + super.release(); + } + }, instance.binLog); + + Util.spinAssertEquals(1, releasedCount::get, 60); + assertEquals(0, writtenCount.get()); + } + finally + { + blockBinLog.release(); + } + //Wait for tasks to drain so there should be space in the queue + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60); + //Should be able to log again + logQuery("foo4"); + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60); + } + + @Test + public void testRoundTripQuery() throws Exception + { + configureFQL(); + logQuery("foo"); + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + assertTrue(tailer.readDocument(wire -> { + assertEquals("single", wire.read("type").text()); + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertEquals(ProtocolVersion.CURRENT, protocolVersion); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); + compareQueryOptions(QueryOptions.DEFAULT, queryOptions); + assertEquals(1L, wire.read("query-time").int64()); + assertEquals("foo", wire.read("query").text()); + })); + } + } + + @Test + public void testRoundTripBatch() throws Exception + { + configureFQL(); + instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1); + Util.spinAssertEquals(true, () -> + { + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + return queue.createTailer().readingDocument().isPresent(); + } + }, 60); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + assertTrue(tailer.readDocument(wire -> { + assertEquals("batch", wire.read("type").text()); + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertEquals(ProtocolVersion.CURRENT, protocolVersion); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); + assertEquals(1L, wire.read("query-time").int64()); + compareQueryOptions(QueryOptions.DEFAULT, queryOptions); + assertEquals("UNLOGGED", wire.read("batch-type").text()); + ValueIn in = wire.read("queries"); + assertEquals(2, in.int32()); + assertEquals("foo1", in.text()); + assertEquals("foo2", in.text()); + in = wire.read("values"); + assertEquals(2, in.int32()); + assertEquals(2, in.int32()); + assertTrue(Arrays.equals(new byte[1], in.bytes())); + assertTrue(Arrays.equals(new byte[2], in.bytes())); + assertEquals(0, in.int32()); + })); + } + } + + @Test + public void testQueryWeight() + { + //Empty query should have some weight + WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1); + assertTrue(query.weight() >= 95); + + StringBuilder sb = new StringBuilder(); + for (int ii = 0; ii < 1024 * 1024; ii++) + { + sb.append('a'); + } + query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1); + + //A large query should be reflected in the size, * 2 since characters are still two bytes + assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString())); + + //Large query options should be reflected + QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); + query = new WeighableMarshallableQuery("", largeOptions, 1); + assertTrue(query.weight() > 1024 * 1024); + System.out.printf("weight %d%n", query.weight()); + } + + @Test + public void testBatchWeight() + { + //An empty batch should have weight + WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() >= 183); + + StringBuilder sb = new StringBuilder(); + for (int ii = 0; ii < 1024 * 1024; ii++) + { + sb.append('a'); + } + + //The weight of the type string should be reflected + batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString())); + + //The weight of the list containing queries should be reflected + List<String> bigList = new ArrayList(100000); + for (int ii = 0; ii < 100000; ii++) + { + bigList.add(""); + } + batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); + + //The size of the query should be reflected + bigList = new ArrayList(1); + bigList.add(sb.toString()); + batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); + + bigList = null; + //The size of the list of values should be reflected + List<List<ByteBuffer>> bigValues = new ArrayList<>(100000); + for (int ii = 0; ii < 100000; ii++) + { + bigValues.add(new ArrayList<>(0)); + } + bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5)); + batch = new WeighableMarshallableBatch("", new ArrayList<>(), bigValues, QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues)); + + //As should the size of the values + QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); + batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1); + assertTrue(batch.weight() > 1024 * 1024); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullType() throws Exception + { + instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueries() throws Exception + { + instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueriesQuery() throws Exception + { + configureFQL(); + instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullValues() throws Exception + { + instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullValuesValue() throws Exception + { + instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueryOptions() throws Exception + { + instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testLogBatchNegativeTime() throws Exception + { + instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1); + } + + @Test(expected = NullPointerException.class) + public void testLogQueryNullQuery() throws Exception + { + instance.logQuery(null, QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogQueryNullQueryOptions() throws Exception + { + instance.logQuery("", null, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testLogQueryNegativeTime() throws Exception + { + instance.logQuery("", QueryOptions.DEFAULT, -1); + } + + private static void compareQueryOptions(QueryOptions a, QueryOptions b) + { + assertEquals(a.getClass(), b.getClass()); + assertEquals(a.getProtocolVersion(), b.getProtocolVersion()); + assertEquals(a.getPageSize(), b.getPageSize()); + assertEquals(a.getConsistency(), b.getConsistency()); + assertEquals(a.getPagingState(), b.getPagingState()); + assertEquals(a.getValues(), b.getValues()); + assertEquals(a.getSerialConsistency(), b.getSerialConsistency()); + } + + private void configureFQL() throws Exception + { + instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256); + } + + private void logQuery(String query) + { + instance.logQuery(query, QueryOptions.DEFAULT, 1); + } + +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org