Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/config/CFMetaData.java
	src/java/org/apache/cassandra/tracing/TraceState.java
	src/java/org/apache/cassandra/tracing/Tracing.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb1c2831 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb1c2831 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb1c2831 Branch: refs/heads/trunk Commit: eb1c2831cdcd1b96710bdab4e4dad3e9ea48b5ab Parents: cf94e3c 26ea0f6 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Nov 13 01:11:09 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Nov 13 01:11:09 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/tracing/TraceKeyspace.java | 57 ++++++++++++++++-- .../apache/cassandra/tracing/TraceState.java | 18 +----- .../org/apache/cassandra/tracing/Tracing.java | 63 ++------------------ 4 files changed, 62 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java index a20fadd,0000000..4d234bd mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@@ -1,73 -1,0 +1,122 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tracing; + ++import java.nio.ByteBuffer; +import java.util.Arrays; ++import java.util.Date; +import java.util.List; ++import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.KSMetaData; ++import org.apache.cassandra.db.CFRowAdder; ++import org.apache.cassandra.db.ColumnFamily; ++import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.locator.SimpleStrategy; ++import org.apache.cassandra.utils.FBUtilities; ++import org.apache.cassandra.utils.UUIDGen; + +public final class TraceKeyspace +{ + public static final String NAME = "system_traces"; + - static final String SESSIONS_TABLE = "sessions"; - static final String EVENTS_TABLE = "events"; ++ private static final String SESSIONS_TABLE = "sessions"; ++ private static final String EVENTS_TABLE = "events"; + + private static final int DAY = (int) TimeUnit.DAYS.toSeconds(1); + - static final CFMetaData SessionsTable = ++ private static final CFMetaData SessionsTable = + compile(SESSIONS_TABLE, "tracing sessions", + "CREATE TABLE %s (" + + "session_id uuid," + + "coordinator inet," + + "duration int," + + "parameters map<text, text>," + + "request text," + + "started_at timestamp," + + "PRIMARY KEY ((session_id)))") + .defaultTimeToLive(DAY); + - static final CFMetaData EventsTable = ++ private static final CFMetaData EventsTable = + compile(EVENTS_TABLE, "tracing events", + "CREATE TABLE %s (" + + "session_id uuid," + + 
"event_id timeuuid," + + "activity text," + + "source inet," + + "source_elapsed int," + + "thread text," + + "PRIMARY KEY ((session_id), event_id))") + .defaultTimeToLive(DAY); + + private static CFMetaData compile(String table, String comment, String cql) + { + return CFMetaData.compile(String.format(cql, table), NAME).comment(comment); + } + + public static KSMetaData definition() + { + List<CFMetaData> tables = Arrays.asList(SessionsTable, EventsTable); + return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables); + } ++ ++ static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed) ++ { ++ Mutation mutation = new Mutation(NAME, sessionId); ++ ColumnFamily cells = mutation.addOrGet(SessionsTable); ++ ++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); ++ adder.add("duration", elapsed); ++ ++ return mutation; ++ } ++ ++ static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt) ++ { ++ Mutation mutation = new Mutation(NAME, sessionId); ++ ColumnFamily cells = mutation.addOrGet(TraceKeyspace.SessionsTable); ++ ++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); ++ adder.add("coordinator", FBUtilities.getBroadcastAddress()); ++ for (Map.Entry<String, String> entry : parameters.entrySet()) ++ adder.addMapEntry("parameters", entry.getKey(), entry.getValue()); ++ adder.add("request", request); ++ adder.add("started_at", new Date(startedAt)); ++ ++ return mutation; ++ } ++ ++ static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName) ++ { ++ Mutation mutation = new Mutation(NAME, sessionId); ++ ColumnFamily cells = mutation.addOrGet(EventsTable); ++ ++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), 
FBUtilities.timestampMicros()); ++ adder.add("activity", message); ++ adder.add("source", FBUtilities.getBroadcastAddress()); ++ if (elapsed >= 0) ++ adder.add("source_elapsed", elapsed); ++ adder.add("thread", threadName); ++ ++ return mutation; ++ } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceState.java index 2d89d39,399b6e9..04abce3 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@@ -28,13 -28,13 +28,7 @@@ import org.slf4j.helpers.MessageFormatt import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; --import org.apache.cassandra.config.CFMetaData; - import org.apache.cassandra.db.ArrayBackedSortedColumns; - import org.apache.cassandra.db.ColumnFamily; - import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.CFRowAdder; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.Mutation; import org.apache.cassandra.utils.ByteBufferUtil; --import org.apache.cassandra.utils.FBUtilities; --import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; /** @@@ -89,23 -89,25 +83,15 @@@ public class TraceStat TraceState.trace(sessionIdBytes, message, elapsed()); } -- public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed) ++ public static void trace(final ByteBuffer sessionId, final String message, final int elapsed) { - final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); final String threadName = Thread.currentThread().getName(); StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() { public void runMayThrow() { - CFMetaData cfMeta = TraceKeyspace.EventsTable; - ColumnFamily cf = 
ArrayBackedSortedColumns.factory.create(cfMeta); - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message); - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress()); - if (elapsed >= 0) - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed); - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName); - Tracing.mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf)); - Mutation mutation = new Mutation(Tracing.TRACE_KS, sessionIdBytes); - ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceEventsCf); - - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros()); - adder.add("activity", message); - adder.add("source", FBUtilities.getBroadcastAddress()); - if (elapsed >= 0) - adder.add("source_elapsed", elapsed); - adder.add("thread", threadName); - - Tracing.mutateWithCatch(mutation); ++ Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName)); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/Tracing.java index 509239a,d74859a..773ccd4 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@@ -32,9 -33,8 +32,7 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; --import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; - import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.exceptions.OverloadedException; 
import org.apache.cassandra.exceptions.UnavailableException; @@@ -153,16 -115,19 +110,13 @@@ public class Tracin else { final int elapsed = state.elapsed(); -- final ByteBuffer sessionIdBytes = state.sessionIdBytes; ++ final ByteBuffer sessionId = state.sessionIdBytes; StageManager.getStage(Stage.TRACING).execute(new Runnable() { public void run() { - CFMetaData cfMeta = TraceKeyspace.SessionsTable; - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta); - addColumn(cf, buildName(cfMeta, "duration"), elapsed); - mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf)); - Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes); - ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf); - - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); - adder.add("duration", elapsed); - - mutateWithCatch(mutation); ++ mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed)); } }); @@@ -190,21 -155,24 +144,14 @@@ { assert isTracing(); -- final long started_at = System.currentTimeMillis(); -- final ByteBuffer sessionIdBytes = state.get().sessionIdBytes; ++ final long startedAt = System.currentTimeMillis(); ++ final ByteBuffer sessionId = state.get().sessionIdBytes; StageManager.getStage(Stage.TRACING).execute(new Runnable() { public void run() { - CFMetaData cfMeta = TraceKeyspace.SessionsTable; - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta); - addColumn(cf, buildName(cfMeta, "coordinator"), FBUtilities.getBroadcastAddress()); - addParameterColumns(cf, parameters); - addColumn(cf, buildName(cfMeta, bytes("request")), request); - addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at); - addParameterColumns(cf, parameters); - mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf)); - Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes); - ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf); - 
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); - adder.add("coordinator", FBUtilities.getBroadcastAddress()); - for (Map.Entry<String, String> entry : parameters.entrySet()) - adder.addMapEntry("parameters", entry.getKey(), entry.getValue()); - adder.add("request", request); - adder.add("started_at", new Date(started_at)); - - mutateWithCatch(mutation); ++ mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt)); } }); }