This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c089818881 Make Cassandra logs able to be viewed in the virtual table system_views.system_logs c089818881 is described below commit c08981888181392017761606f4490cca7f977db9 Author: Stefan Miklosovic <smikloso...@apache.org> AuthorDate: Wed Oct 5 17:02:34 2022 +0200 Make Cassandra logs able to be viewed in the virtual table system_views.system_logs patch by Stefan Miklosovic; reviewed by Brandon Williams for CASSANDRA-17948 --- CHANGES.txt | 1 + NEWS.txt | 2 + conf/logback.xml | 11 ++ .../pages/configuration/cass_logback_xml_file.adoc | 33 +++- .../config/CassandraRelevantProperties.java | 7 +- .../cassandra/db/virtual/LogMessagesTable.java | 192 +++++++++++++++++++ .../apache/cassandra/db/virtual/SimpleDataSet.java | 10 +- .../cassandra/db/virtual/SystemViewsKeyspace.java | 1 + .../apache/cassandra/service/CassandraDaemon.java | 8 + .../utils/logging/LogbackLoggingSupport.java | 45 +++++ .../cassandra/utils/logging/LoggingSupport.java | 8 + .../utils/logging/VirtualTableAppender.java | 128 +++++++++++++ test/conf/logback-dtest_with_vtable_appender.xml | 66 +++++++ .../logback-dtest_with_vtable_appender_invalid.xml | 73 +++++++ .../cassandra/distributed/impl/Instance.java | 2 + .../distributed/test/VirtualTableLogsTest.java | 129 +++++++++++++ .../cassandra/db/virtual/LogMessagesTableTest.java | 210 +++++++++++++++++++++ 17 files changed, 921 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c3f0bcf534..1bae643c3e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946) * IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908) * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931) * Mixed mode support for internode authentication during TLS upgrades 
(CASSANDRA-17923) diff --git a/NEWS.txt b/NEWS.txt index 727e711ec4..2442572d7e 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -92,6 +92,8 @@ New features clear snapshots which are older than some period for example, "--older-than 5h" to remove snapshots older than 5 hours and it is possible to clear all snapshots older than some timestamp, for example --older-than-timestamp 2022-12-03T10:15:30Z. + - Cassandra logs can be viewed in the virtual table system_views.system_logs. + Please uncomment the respective appender in logback.xml file to make logs flow into this table. This feature is turned off by default. Upgrading --------- diff --git a/conf/logback.xml b/conf/logback.xml index e98fea480f..102cf06352 100644 --- a/conf/logback.xml +++ b/conf/logback.xml @@ -111,6 +111,14 @@ appender reference in the root level section below. <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" /> --> + <!-- Uncomment below configuration and corresponding appender-ref to activate + logging into system_views.system_logs virtual table. --> + <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> --> + <root level="INFO"> <appender-ref ref="SYSTEMLOG" /> <appender-ref ref="STDOUT" /> @@ -118,6 +126,9 @@ appender reference in the root level section below. <!-- <appender-ref ref="LogbackMetrics" /> --> + <!-- + <appender-ref ref="CQLLOG"/> + --> </root> <logger name="org.apache.cassandra" level="DEBUG"/> diff --git a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc index e673622099..7e64aea048 100644 --- a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc +++ b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc @@ -81,6 +81,27 @@ Specify the format of the message. 
Part of the rolling policy. <pattern>%-5level [%thread] %date\{ISO8601} %F:%L - %msg%n</pattern> </encoder> +=== Logging to Cassandra virtual table + +It is possible to configure logback.xml in such a way that logs would appear in `system_views.system_logs` table. +This is achieved by appender implemented in class `VirtualTableAppender` which is called `CQLLOG` in the +default configuration. When the appender is commented out, no system logs are written to the virtual table. + +CQLLOG appender is special as the underlying structure it saves log messages into cannot grow without any bound +as a node would run out of memory. For this reason, `system_logs` table is limited in its size. +By default, it can hold at most 50 000 log messages; it can never hold more than 100 000 log messages. + +To specify how many rows you want that virtual table to hold at most, there is +a system property called `cassandra.virtual.logs.max.rows` which takes an integer as value. + +You can execute CQL `truncate` query for `system_views.system_logs` if you want to wipe out all the logs in virtual table +to e.g. save some memory. + +It is recommended to set filter to at least `WARN` level so this table holds only important logging messages as +each message will occupy memory. + +The appender to virtual table is commented out by default so logging to virtual table is not active. + +=== Contents of default `logback.xml` [source,XML] @@ -151,6 +172,14 @@ Specify the format of the message. Part of the rolling policy. <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" /> --> + <!-- Uncomment below configuration and corresponding appender-ref to activate + logging into system_views.system_logs virtual table. 
--> + <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> --> + <root level="INFO"> <appender-ref ref="SYSTEMLOG" /> <appender-ref ref="STDOUT" /> @@ -158,9 +187,11 @@ Specify the format of the message. Part of the rolling policy. <!-- <appender-ref ref="LogbackMetrics" /> --> + <!-- + <appender-ref ref="CQLLOG"/> + --> </root> <logger name="org.apache.cassandra" level="DEBUG"/> - <logger name="com.thinkaurelius.thrift" level="ERROR"/> </configuration> ---- diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 49df7404e7..d7c543da67 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -20,6 +20,7 @@ package org.apache.cassandra.config; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.db.virtual.LogMessagesTable; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.FileSystemOwnershipCheck; @@ -299,8 +300,10 @@ public enum CassandraRelevantProperties ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"), // Loosen the definition of "empty" for gossip state, for use during host replacements if things go awry - LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled"); - ; + LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled"), + + // Maximum number of rows in system_views.system_logs table + LOGS_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.logs.max.rows", Integer.toString(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)); CassandraRelevantProperties(String key, String defaultVal) diff --git 
a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java new file mode 100644 index 0000000000..cd7999b308 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.spi.LoggingEvent; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.TimestampType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.TableMetadata; + +/** + * Virtual table for holding Cassandra logs. Entries to this table are added via log appender. + * <p> + * The virtual table is bounded in its size. 
If a new log message is appended to virtual table, + * the oldest one is removed. + * <p> + * This virtual table can be truncated. + * + * @see org.apache.cassandra.utils.logging.VirtualTableAppender + */ +public final class LogMessagesTable extends AbstractMutableVirtualTable +{ + private static final Logger logger = LoggerFactory.getLogger(LogMessagesTable.class); + + public static final int LOGS_VIRTUAL_TABLE_MIN_ROWS = 1000; + public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 50_000; + public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000; + + public static final String TABLE_NAME = "system_logs"; + private static final String TABLE_COMMENT = "Cassandra logs"; + + public static final String TIMESTAMP_COLUMN_NAME = "timestamp"; + public static final String LOGGER_COLUMN_NAME = "logger"; + public static final String ORDER_IN_MILLISECOND_COLUMN_NAME = "order_in_millisecond"; + public static final String LEVEL_COLUMN_NAME = "level"; + public static final String MESSAGE_COLUMN_NAME = "message"; + + private final List<LogMessage> buffer; + + LogMessagesTable(String keyspace) + { + this(keyspace, resolveBufferSize()); + } + + @VisibleForTesting + LogMessagesTable(String keyspace, int size) + { + super(TableMetadata.builder(keyspace, TABLE_NAME) + .comment(TABLE_COMMENT) + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(TimestampType.instance)) + .addPartitionKeyColumn(TIMESTAMP_COLUMN_NAME, TimestampType.instance) + .addClusteringColumn(ORDER_IN_MILLISECOND_COLUMN_NAME, Int32Type.instance) + .addRegularColumn(LOGGER_COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(LEVEL_COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(MESSAGE_COLUMN_NAME, UTF8Type.instance).build()); + + logger.debug("capacity of virtual table {} is set to be at most {} rows", metadata().toString(), size); + buffer = BoundedLinkedList.create(size); + } + + @Override + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata(), 
DecoratedKey.comparator.reversed()); + + synchronized (buffer) + { + long milliSecondsOfPreviousLog = 0; + long milliSecondsOfCurrentLog; + + int index = 0; + + Iterator<LogMessage> iterator = buffer.listIterator(); + while (iterator.hasNext()) + { + LogMessage log = iterator.next(); + + milliSecondsOfCurrentLog = log.timestamp; + if (milliSecondsOfPreviousLog == milliSecondsOfCurrentLog) + ++index; + else + index = 0; + + milliSecondsOfPreviousLog = milliSecondsOfCurrentLog; + + result.row(new Date(log.timestamp), index) + .column(LOGGER_COLUMN_NAME, log.logger) + .column(LEVEL_COLUMN_NAME, log.level) + .column(MESSAGE_COLUMN_NAME, log.message); + } + } + + return result; + } + + public void add(LoggingEvent event) + { + buffer.add(new LogMessage(event)); + } + + @Override + public void truncate() + { + buffer.clear(); + } + + @VisibleForTesting + static int resolveBufferSize() + { + int size = CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(); + return (size < LOGS_VIRTUAL_TABLE_MIN_ROWS || size > LOGS_VIRTUAL_TABLE_MAX_ROWS) + ? 
LOGS_VIRTUAL_TABLE_DEFAULT_ROWS : size; + } + + @VisibleForTesting + public static class LogMessage + { + public final long timestamp; + public final String logger; + public final String level; + public final String message; + + public LogMessage(LoggingEvent event) + { + this(event.getTimeStamp(), event.getLoggerName(), event.getLevel().toString(), event.getFormattedMessage()); + } + + public LogMessage(long timestamp, String logger, String level, String message) + { + this.timestamp = timestamp; + this.logger = logger; + this.level = level; + this.message = message; + } + } + + private static final class BoundedLinkedList<T> extends LinkedList<T> + { + private final int maxSize; + + public static <T> List<T> create(int size) + { + return Collections.synchronizedList(new BoundedLinkedList<>(size)); + } + + private BoundedLinkedList(int maxSize) + { + this.maxSize = maxSize; + } + + @Override + public boolean add(T t) + { + if (size() == maxSize) + removeLast(); + + addFirst(t); + + return true; + } + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java index 715f4f89d7..6f3052d8be 100644 --- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java +++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.virtual; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -54,12 +55,17 @@ public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet private Row currentRow; - public SimpleDataSet(TableMetadata metadata) + public SimpleDataSet(TableMetadata metadata, Comparator<DecoratedKey> comparator) { - super(new TreeMap<>(DecoratedKey.comparator)); + super(new TreeMap<>(comparator)); this.metadata = metadata; } + public SimpleDataSet(TableMetadata metadata) + { + this(metadata, DecoratedKey.comparator); + } + public SimpleDataSet 
row(Object... primaryKeyValues) { if (Iterables.size(metadata.primaryKeyColumns()) != primaryKeyValues.length) diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 59a0aba809..d2aac53764 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -48,6 +48,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace .add(new StreamingVirtualTable(VIRTUAL_VIEWS)) .add(new GossipInfoTable(VIRTUAL_VIEWS)) .add(new QueriesTable(VIRTUAL_VIEWS)) + .add(new LogMessagesTable(VIRTUAL_VIEWS)) .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS)) .build()); } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 90b9496138..3ca42513a6 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -88,6 +88,8 @@ import org.apache.cassandra.utils.Mx4jTool; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.logging.LoggingSupportFactory; +import org.apache.cassandra.utils.logging.VirtualTableAppender; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND; @@ -566,6 +568,12 @@ public class CassandraDaemon { VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance); VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance); + + // flush log messages to system_views.system_logs virtual table as there were messages already logged + // before that virtual table was instantiated + LoggingSupportFactory.getLoggingSupport() + .getAppender(VirtualTableAppender.class, 
VirtualTableAppender.APPENDER_NAME) + .ifPresent(appender -> ((VirtualTableAppender) appender).flushBuffer()); } public void scrubDataDirectories() throws StartupException diff --git a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java index eda9153e95..e710d44dd1 100644 --- a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java +++ b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java @@ -20,8 +20,11 @@ package org.apache.cassandra.utils.logging; import java.lang.management.ManagementFactory; import java.security.AccessControlException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Optional; import javax.management.JMX; import javax.management.ObjectName; @@ -57,6 +60,8 @@ public class LogbackLoggingSupport implements LoggingSupport @Override public void onStartup() { + checkOnlyOneVirtualTableAppender(); + // The default logback configuration in conf/logback.xml allows reloading the // configuration when the configuration file has changed (every 60 seconds by default). // This requires logback to use file I/O APIs. But file I/O is not allowed from UDFs. 
@@ -132,6 +137,46 @@ public class LogbackLoggingSupport implements LoggingSupport return logLevelMaps; } + @Override + public Optional<Appender<?>> getAppender(Class<?> appenderClass, String name) + { + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + for (Logger logBackLogger : lc.getLoggerList()) + { + for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();) + { + Appender<ILoggingEvent> appender = iterator.next(); + if (appender.getClass() == appenderClass && appender.getName().equals(name)) + return Optional.of(appender); + } + } + + return Optional.empty(); + } + + private void checkOnlyOneVirtualTableAppender() + { + int count = 0; + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + List<String> virtualAppenderNames = new ArrayList<>(); + for (Logger logBackLogger : lc.getLoggerList()) + { + for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();) + { + Appender<?> appender = iterator.next(); + if (appender instanceof VirtualTableAppender) + { + virtualAppenderNames.add(appender.getName()); + count += 1; + } + } + } + + if (count > 1) + throw new IllegalStateException(String.format("There are multiple appenders of class %s of names %s. 
There is only one appender of such class allowed.", + VirtualTableAppender.class.getName(), String.join(",", virtualAppenderNames))); + } + private boolean hasAppenders(Logger logBackLogger) { Iterator<Appender<ILoggingEvent>> it = logBackLogger.iteratorForAppenders(); diff --git a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java index 8ea83be004..35e11975f9 100644 --- a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java +++ b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java @@ -19,6 +19,9 @@ package org.apache.cassandra.utils.logging; import java.util.Map; +import java.util.Optional; + +import ch.qos.logback.core.Appender; /** * Common abstraction of functionality which can be implemented for different logging backend implementations (slf4j bindings). @@ -49,4 +52,9 @@ public interface LoggingSupport * @return a map of logger names and their associated log level as string representations. */ Map<String, String> getLoggingLevels(); + + default Optional<Appender<?>> getAppender(Class<?> appenderClass, String appenderName) + { + return Optional.empty(); + } } diff --git a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java new file mode 100644 index 0000000000..2820b2936f --- /dev/null +++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.logging; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import ch.qos.logback.classic.spi.LoggingEvent; +import ch.qos.logback.core.AppenderBase; +import org.apache.cassandra.audit.FileAuditLogger; +import org.apache.cassandra.db.virtual.LogMessagesTable; +import org.apache.cassandra.db.virtual.VirtualKeyspace; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.db.virtual.VirtualTable; + +import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS; +import static org.apache.cassandra.db.virtual.LogMessagesTable.TABLE_NAME; +import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS; + +/** + * Appends Cassandra logs to virtual table system_views.system_logs + */ +public final class VirtualTableAppender extends AppenderBase<LoggingEvent> +{ + public static final String APPENDER_NAME = "CQLLOG"; + + private static final Set<String> forbiddenLoggers = ImmutableSet.of(FileAuditLogger.class.getName()); + + private LogMessagesTable logs; + + // for holding messages until virtual registry contains logs virtual table + // as it takes some time during startup of a node to initialise virtual tables but messages are + // logged already + private final List<LoggingEvent> messageBuffer = new LinkedList<>(); + + @Override + protected void append(LoggingEvent eventObject) + { + if 
(!forbiddenLoggers.contains(eventObject.getLoggerName())) + { + if (logs == null) + { + logs = getVirtualTable(); + if (logs == null) + addToBuffer(eventObject); + else + logs.add(eventObject); + } + else + logs.add(eventObject); + } + } + + @Override + public void stop() + { + messageBuffer.clear(); + super.stop(); + } + + /** + * Flushes all logs which were appended before virtual table was registered. + * + * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces + */ + public void flushBuffer() + { + Optional.ofNullable(getVirtualTable()).ifPresent(vtable -> { + messageBuffer.forEach(vtable::add); + messageBuffer.clear(); + }); + } + + private LogMessagesTable getVirtualTable() + { + VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS); + + if (keyspace == null) + return null; + + Optional<VirtualTable> logsTable = keyspace.tables() + .stream() + .filter(vt -> vt.name().equals(TABLE_NAME)) + .findFirst(); + + if (!logsTable.isPresent()) + return null; + + VirtualTable vt = logsTable.get(); + + if (!(vt instanceof LogMessagesTable)) + throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s", + VIRTUAL_VIEWS, + TABLE_NAME, + LogMessagesTable.class.getName(), + vt.getClass().getName())); + + return (LogMessagesTable) vt; + } + + private void addToBuffer(LoggingEvent eventObject) + { + // we restrict how many logging events we can put into buffer, + // so we are not growing without any bound when things go south + if (messageBuffer.size() < LOGS_VIRTUAL_TABLE_DEFAULT_ROWS) + messageBuffer.add(eventObject); + } +} diff --git a/test/conf/logback-dtest_with_vtable_appender.xml b/test/conf/logback-dtest_with_vtable_appender.xml new file mode 100644 index 0000000000..c9fd108c77 --- /dev/null +++ b/test/conf/logback-dtest_with_vtable_appender.xml @@ -0,0 +1,66 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor 
license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" /> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + </encoder> + <immediateFlush>true</immediateFlush> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> + + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter 
class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <logger name="org.apache.hadoop" level="WARN"/> + + <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching --> + <appender-ref ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> + <appender-ref ref="CQLLOG" /> + </root> +</configuration> diff --git a/test/conf/logback-dtest_with_vtable_appender_invalid.xml b/test/conf/logback-dtest_with_vtable_appender_invalid.xml new file mode 100644 index 0000000000..1b30c141c2 --- /dev/null +++ b/test/conf/logback-dtest_with_vtable_appender_invalid.xml @@ -0,0 +1,73 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" /> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + </encoder> + <immediateFlush>true</immediateFlush> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> + + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <logger name="org.apache.hadoop" level="WARN"/> + + <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <appender name="CQLLOG2" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching --> + <appender-ref 
ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> + <appender-ref ref="CQLLOG" /> + <appender-ref ref="CQLLOG2" /> <!-- invalid, we can not have multiple appenders of VirtualTableAppender class --> + </root> +</configuration> diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 2c3123546f..cba3807237 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -114,6 +114,7 @@ import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.MigrationCoordinator; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.security.ThreadAwareSecurityManager; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.ClientState; @@ -590,6 +591,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance DistributedTestSnitch.assign(config.networkTopology()); DatabaseDescriptor.daemonInitialization(); + ThreadAwareSecurityManager.install(); FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); DatabaseDescriptor.createAllDirectories(); CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java new file mode 100644 index 0000000000..17cd994256 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import ch.qos.logback.classic.Level; +import org.apache.cassandra.db.virtual.LogMessagesTable; +import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.utils.logging.VirtualTableAppender; + +import static java.lang.String.format; +import static org.apache.cassandra.db.virtual.LogMessagesTable.LEVEL_COLUMN_NAME; +import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGGER_COLUMN_NAME; +import static org.apache.cassandra.db.virtual.LogMessagesTable.MESSAGE_COLUMN_NAME; +import static org.apache.cassandra.db.virtual.LogMessagesTable.ORDER_IN_MILLISECOND_COLUMN_NAME; +import static org.apache.cassandra.db.virtual.LogMessagesTable.TIMESTAMP_COLUMN_NAME; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class VirtualTableLogsTest extends TestBaseImpl +{ + @Test + public void 
testVTableOutput() throws Throwable + { + System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender.xml"); + + try (Cluster cluster = Cluster.build(1) + .withConfig(c -> c.with(Feature.values())) + .start()) + { + List<TestingLogMessage> rows = getRows(cluster); + assertFalse(rows.isEmpty()); + + rows.forEach(message -> assertTrue(Level.toLevel(message.level).isGreaterOrEqual(Level.INFO))); + } + finally + { + System.clearProperty("logback.configurationFile"); + } + } + + @Test + public void testMultipleAppendersFailToStartNode() throws Throwable + { + System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender_invalid.xml"); + + try (Cluster ignored = Cluster.build(1) + .withConfig(c -> c.with(Feature.values())) + .start()) + { + fail("Node should not start as there is supposed to be invalid logback configuration file."); + } + catch (IllegalStateException ex) + { + assertEquals(format("There are multiple appenders of class %s " + + "of names CQLLOG,CQLLOG2. 
There is only one appender of such class allowed.", + VirtualTableAppender.class.getName()), + ex.getMessage()); + } + finally + { + System.clearProperty("logback.configurationFile"); + } + } + + private List<TestingLogMessage> getRows(Cluster cluster) + { + SimpleQueryResult simpleQueryResult = cluster.coordinator(1).executeWithResult(query("select * from %s"), ONE); + List<TestingLogMessage> rows = new ArrayList<>(); + simpleQueryResult.forEachRemaining(row -> { + long timestamp = row.getTimestamp(TIMESTAMP_COLUMN_NAME).getTime(); + String logger = row.getString(LOGGER_COLUMN_NAME); + String level = row.getString(LEVEL_COLUMN_NAME); + String message = row.getString(MESSAGE_COLUMN_NAME); + int order = row.getInteger(ORDER_IN_MILLISECOND_COLUMN_NAME); + TestingLogMessage logMessage = new TestingLogMessage(timestamp, logger, level, message, order); + rows.add(logMessage); + }); + return rows; + } + + private String query(String template) + { + return format(template, getTableName()); + } + + private String getTableName() + { + return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, LogMessagesTable.TABLE_NAME); + } + + private static class TestingLogMessage extends LogMessage + { + private int order; + + public TestingLogMessage(long timestamp, String logger, String level, String message, int order) + { + super(timestamp, logger, level, message); + this.order = order; + } + } +} diff --git a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java new file mode 100644 index 0000000000..2225c4a1dc --- /dev/null +++ b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.time.Instant; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import org.junit.BeforeClass; +import org.junit.Test; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.LoggingEvent; +import com.datastax.driver.core.Row; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.marshal.TimestampType; +import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet; +import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition; +import org.apache.cassandra.dht.LocalPartitioner; + +import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class LogMessagesTableTest extends CQLTester +{ + private String keyspace = createKeyspaceName(); + private LogMessagesTable table; + + @BeforeClass + public static void setup() + { + CQLTester.setUpClass(); + } + + @Test + public void testTruncate() throws Throwable + { + registerVirtualTable(); + + int numberOfRows = 100; + List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + loggingEvents.forEach(table::add); + + 
execute(query("truncate %s")); + + assertTrue(executeNet(query("select timestamp from %s")).all().isEmpty()); + } + + @Test + public void empty() throws Throwable + { + registerVirtualTable(); + assertEmpty(execute(query("select * from %s"))); + } + + @Test + public void testInsert() + { + registerVirtualTable(); + + int numberOfRows = 1000; + List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + loggingEvents.forEach(table::add); + + assertEquals(numberOfRows, numberOfPartitions()); + } + + @Test + public void testLimitedCapacity() throws Throwable + { + registerVirtualTable(100); + + int numberOfRows = 1000; + List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + loggingEvents.forEach(table::add); + + // even though we inserted 1000 rows, only 100 are present as the capacity is bounded + assertEquals(100, numberOfPartitions()); + + // the first record in the table will be the last one which we inserted + LoggingEvent firstEvent = loggingEvents.get(999); + assertRowsNet(executeNet(query("select timestamp from %s limit 1")), + new Object[] { new Date(firstEvent.getTimeStamp()) }); + + // the last record in the table will be the one at index 900 of the inserted events (the oldest of the 100 expected rows) + List<Row> all = executeNet(query("select timestamp from %s")).all(); + assertEquals(100, all.size()); + Row row = all.get(all.size() - 1); + Date timestamp = row.getTimestamp(0); + assertEquals(loggingEvents.get(900).getTimeStamp(), timestamp.getTime()); + } + + @Test + public void testMultipleLogsInSameMillisecond() + { + registerVirtualTable(10); + List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5); + loggingEvents.forEach(table::add); + + // 2 partitions, 5 rows in each + assertEquals(2, numberOfPartitions()); + } + + @Test + public void testResolvingBufferSize() + { + // remember the current value and restore it in the finally block so that the property set here does not leak into other tests running in the same JVM + String key = LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(); + String previousValue = System.getProperty(key); + + try + { + System.setProperty(key, "-1"); + assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + + 
System.setProperty(key, "0"); + assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + + System.setProperty(key, "1000001"); + assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + + System.setProperty(key, "999"); + assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + + System.setProperty(key, "50001"); + assertEquals(50001, LogMessagesTable.resolveBufferSize()); + } + finally + { + if (previousValue == null) + System.clearProperty(key); + else + System.setProperty(key, previousValue); + } + } + + private void registerVirtualTable() + { + registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS); + } + + private void registerVirtualTable(int size) + { + table = new LogMessagesTable(keyspace, size); + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(keyspace, ImmutableList.of(table))); + } + + private int numberOfPartitions() + { + DataSet data = table.data(); + + Iterator<Partition> partitions = data.getPartitions(DataRange.allData(new LocalPartitioner(TimestampType.instance))); + + int numberOfPartitions = 0; + + while (partitions.hasNext()) + { + partitions.next(); + numberOfPartitions += 1; + } + + return numberOfPartitions; + } + + private String query(String query) + { + return String.format(query, table.toString()); + } + + private List<LoggingEvent> getLoggingEvents(int size) + { + return getLoggingEvents(size, Instant.now(), 1); + } + + private List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond) + { + List<LoggingEvent> logs = new LinkedList<>(); + int partitions = size / logsInMillisecond; + + for (int i = 0; i < partitions; i++) + { + long timestamp = firstTimestamp.toEpochMilli(); + firstTimestamp = firstTimestamp.plusSeconds(1); + + for (int j = 0; j < logsInMillisecond; j++) + logs.add(getLoggingEvent(timestamp)); + } + + return logs; + } 
+ + private LoggingEvent getLoggingEvent(long timestamp) + { + LoggingEvent event = new LoggingEvent(); + event.setLevel(Level.INFO); + event.setMessage("message " + timestamp); + event.setLoggerName("logger " + timestamp); + event.setThreadName(Thread.currentThread().getName()); + event.setTimeStamp(timestamp); + + return event; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org