Updated Branches: refs/heads/cassandra-1.2 7dc2eb95c -> fbe8a6eb2 refs/heads/trunk b73f9d423 -> 140b0311d
Fix tracing when operation completesbefore all responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb Branch: refs/heads/cassandra-1.2 Commit: fbe8a6eb213ee3558be701c73c31a0da79446657 Parents: 7dc2eb9 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Jun 20 10:19:13 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Jun 21 09:23:39 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DebuggableThreadPoolExecutor.java | 22 ++++++++--- .../cassandra/concurrent/StageManager.java | 29 +++++++++------ .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++ .../apache/cassandra/net/MessagingService.java | 14 ++++--- .../cassandra/net/OutboundTcpConnection.java | 13 ++++++- .../cassandra/service/MigrationManager.java | 7 +--- .../cassandra/tracing/ExpiredTraceState.java | 39 ++++++++++++++++++++ .../apache/cassandra/tracing/TraceState.java | 12 ++++-- .../org/apache/cassandra/tracing/Tracing.java | 26 +++++++------ .../service/AntiEntropyServiceTestAbstract.java | 10 ++--- 11 files changed, 156 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3847d6a..593bf7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.6 + * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668) * Fix cross-DC mutation forwarding (CASSANDRA-5632) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 25f15ee..26441ec 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing; * threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a * stage is less busy, core thread timeout is enabled. */ -public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor +public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService { protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class); public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler() @@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor protected void onFinalAccept(Runnable task) {} protected void onFinalRejection(Runnable task) {} + public void execute(Runnable command, TraceState state) + { + super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state)); + } + // execute does not call newTaskFor @Override public void execute(Runnable command) { super.execute(isTracing() && !(command instanceof TraceSessionWrapper) - ? new TraceSessionWrapper<Object>(command, null) + ? new TraceSessionWrapper<Object>(command) : command); } @@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor { if (isTracing() && !(runnable instanceof TraceSessionWrapper)) { - return new TraceSessionWrapper<T>(runnable, result); + return new TraceSessionWrapper<T>(Executors.callable(runnable, result)); } return super.newTaskFor(runnable, result); } @@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor { private final TraceState state; - public TraceSessionWrapper(Runnable runnable, T result) + public TraceSessionWrapper(Runnable command) { - super(runnable, result); - state = Tracing.instance().get(); + this(command, null); } public TraceSessionWrapper(Callable<T> callable) @@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor state = Tracing.instance().get(); } + public TraceSessionWrapper(Runnable command, TraceState state) + { + super(command, null); + this.state = state; + } + private void setupContext() { Tracing.instance().set(state); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 287b19e..2960f22 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.config.DatabaseDescriptor.*; @@ -38,7 +39,7 @@ public class StageManager { private static final Logger logger = LoggerFactory.getLogger(StageManager.class); - private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class); + private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class); public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle @@ -60,7 +61,7 @@ public class StageManager stages.put(Stage.TRACING, tracingExecutor()); } - private static ThreadPoolExecutor tracingExecutor() + private static ExecuteOnlyExecutor tracingExecutor() { RejectedExecutionHandler reh = new RejectedExecutionHandler() { @@ -78,7 +79,7 @@ public class StageManager reh); } - private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) + private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) { return new JMXEnabledThreadPoolExecutor(numThreads, KEEPALIVE, @@ -88,7 +89,7 @@ public class StageManager stage.getJmxType()); } - private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads) + private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads) { return new JMXConfigurableThreadPoolExecutor(numThreads, KEEPALIVE, @@ -98,7 +99,7 @@ public class StageManager stage.getJmxType()); } - private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) + private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) { return new JMXConfigurableThreadPoolExecutor(numThreads, KEEPALIVE, @@ -111,8 +112,8 @@ public class StageManager /** * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved. - */ - public static ThreadPoolExecutor getStage(Stage stage) + */ + public static TracingAwareExecutorService getStage(Stage stage) { return stages.get(stage); } @@ -132,29 +133,35 @@ public class StageManager * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the * tracing stage. See CASSANDRA-1123 for background. */ - private static class ExecuteOnlyExecutor extends ThreadPoolExecutor + private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService { public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } + public void execute(Runnable command, TraceState state) + { + assert state == null; + super.execute(command); + } + @Override public Future<?> submit(Runnable task) { - return super.submit(task); + throw new UnsupportedOperationException(); } @Override public <T> Future<T> submit(Runnable task, T result) { - return super.submit(task, result); + throw new UnsupportedOperationException(); } @Override public <T> Future<T> submit(Callable<T> task) { - return super.submit(task); + throw new UnsupportedOperationException(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java new file mode 100644 index 0000000..e5dcd7e --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.cassandra.concurrent; + +import java.util.concurrent.ExecutorService; + +import org.apache.cassandra.tracing.TraceState; + +public interface TracingAwareExecutorService extends ExecutorService +{ + // we need a way to inject a TraceState directly into the Executor context without going through + // the global Tracing sessions; see CASSANDRA-5668 + public void execute(Runnable command, TraceState state); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 5e36f44..2964d35 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; import java.util.*; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -37,12 +36,14 @@ import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.concurrent.TracingAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.*; @@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; import org.apache.cassandra.streaming.*; import org.apache.cassandra.streaming.compress.CompressedFileStreamTask; +import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; -import org.cliffc.high_scale_lib.NonBlockingHashMap; public final class MessagingService implements MessagingServiceMBean { @@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean public void receive(MessageIn message, String id, long timestamp) { - Tracing.instance().initializeFromMessage(message); - Tracing.trace("Message received from {}", message.from); + TraceState state = Tracing.instance().initializeFromMessage(message); + if (state != null) + state.trace("Message received from {}", message.from); message = SinkManager.processInboundMessage(message, id); if (message == null) return; Runnable runnable = new MessageDeliveryTask(message, id, timestamp); - ExecutorService stage = StageManager.getStage(message.getMessageType()); + TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType()); assert stage != null : "No stage for message type " + message.verb; if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled()) @@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean } } - stage.execute(runnable); + stage.execute(runnable, state); } public void setCallbackForTests(String messageId, CallbackInfo callback) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 32ce224..ee30d36 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread { UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState state = Tracing.instance().get(sessionId); - state.trace("Sending message to {}", poolReference.endPoint()); - Tracing.instance().stopIfNonLocal(state); + String message = String.format("Sending message to %s", poolReference.endPoint()); + // session may have already finished; see CASSANDRA-5668 + if (state == null) + { + TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1); + } + else + { + state.trace(message); + Tracing.instance().stopIfNonLocal(state); + } } write(qm.message, qm.id, qm.timestamp, out, targetVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 127b2b8..de34785 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -23,10 +23,7 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public static boolean isReadyForBootstrap() { - return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0; + return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; } public void notifyCreateKeyspace(KSMetaData ksm) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java new file mode 100644 index 0000000..6b4f90b --- /dev/null +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.cassandra.tracing; + +import java.util.UUID; + +import org.apache.cassandra.utils.FBUtilities; + +public class ExpiredTraceState extends TraceState +{ + public ExpiredTraceState(UUID sessionId) + { + super(FBUtilities.getBroadcastAddress(), sessionId, true); + } + + public int elapsed() + { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index 703c4ee..25599c4 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -81,11 +81,14 @@ public class TraceState trace(MessageFormatter.arrayFormat(format, args).getMessage()); } - public void trace(final String message) + public void trace(String message) { - final int elapsed = elapsed(); - final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); + TraceState.trace(sessionIdBytes, message, elapsed()); + } + public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed) + { + final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); final String threadName = Thread.currentThread().getName(); StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() @@ -96,7 +99,8 @@ public class TraceState ColumnFamily cf = ColumnFamily.create(cfMeta); Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress()); Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName); - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed); + if (elapsed >= 0) + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed); Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message); RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes); mutation.add(cf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 17241b9..eb5bad9 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -229,31 +230,34 @@ public class Tracing } /** - * Updates the threads query context from a message + * Determines the tracing context from a message. Does NOT set the threadlocal state. * - * @param message - * The internode message + * @param message The internode message */ - public void initializeFromMessage(final MessageIn<?> message) + public TraceState initializeFromMessage(final MessageIn<?> message) { final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER); - // if the message has no session context header don't do tracing if (sessionBytes == null) - { - state.set(null); - return; - } + return null; assert sessionBytes.length == 16; UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState ts = sessions.get(sessionId); - if (ts == null) + if (ts != null) + return ts; + + if (message.verb == MessagingService.Verb.REQUEST_RESPONSE) + { + // received a message for a session we've already closed out. see CASSANDRA-5668 + return new ExpiredTraceState(sessionId); + } + else { ts = new TraceState(message.from, sessionId, false); sessions.put(sessionId, ts); + return ts; } - state.set(ts); } public static void trace(String message) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index fc0b832..b80c272 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -20,10 +20,11 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; -import static org.apache.cassandra.service.AntiEntropyService.*; - import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; +import static org.apache.cassandra.service.AntiEntropyService.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader void flushAES() throws Exception { - final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY); + final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY); final Callable noop = new Callable<Object>() { public Boolean call()