This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee3438 RATIS-167. Implement a high performance OutputStream using
async API.
4ee3438 is described below
commit 4ee34383eb81f565b6cf06072170f2c130c51e8e
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 3 14:50:25 2019 +0800
RATIS-167. Implement a high performance OutputStream using async API.
---
.../apache/ratis/client/impl/RaftOutputStream.java | 136 +++++++++++++++++
.../apache/ratis/grpc/client/GrpcOutputStream.java | 4 +-
.../org/apache/ratis/OutputStreamBaseTest.java | 166 +++++++++++----------
.../apache/ratis/grpc/TestGrpcOutputStream.java | 50 +++++++
.../ratis/grpc/TestRaftOutputStreamWithGrpc.java | 36 +++++
5 files changed, 308 insertions(+), 84 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
new file mode 100644
index 0000000..1cdbeac
--- /dev/null
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+/** An {@link OutputStream} implementation using {@link
RaftClient#sendAsync(Message)} API. */
+public class RaftOutputStream extends OutputStream {
+ private final Supplier<RaftClient> client;
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final Queue<CompletableFuture<Long>> flushFutures = new
LinkedList<>();
+
+ private final byte[] buffer;
+ private int byteCount;
+ private long byteFlushed;
+
+ public RaftOutputStream(Supplier<RaftClient> clientSupplier, SizeInBytes
bufferSize) {
+ this.client = JavaUtils.memoize(clientSupplier);
+ this.buffer = new byte[bufferSize.getSizeInt()];
+ }
+
+ private RaftClient getClient() {
+ return client.get();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ checkClosed();
+ buffer[byteCount++] = (byte)b;
+ flushIfNecessary();
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkClosed();
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ for(int total = 0; total < len; ) {
+ final int toWrite = Math.min(len - total, buffer.length - byteCount);
+ System.arraycopy(b, off + total, buffer, byteCount, toWrite);
+ byteCount += toWrite;
+ total += toWrite;
+ flushIfNecessary();
+ }
+ }
+
+ private void flushIfNecessary() {
+ if (byteCount == buffer.length) {
+ flushAsync();
+ }
+ }
+
+ /** Non-blocking flush call */
+ private void flushAsync() {
+ final long pos = byteFlushed;
+ if (byteCount == 0) {
+ return;
+ }
+
+ final CompletableFuture<Long> f = getClient().sendAsync(
+ Message.valueOf(ProtoUtils.toByteString(buffer, 0, byteCount))
+ ).thenApply(reply -> RaftClientImpl.handleRaftException(reply,
CompletionException::new)
+ ).thenApply(reply -> reply != null && reply.isSuccess()? pos: null);
+ flushFutures.offer(f);
+
+ byteFlushed += byteCount;
+ byteCount = 0;
+ }
+
+ /** Blocking flush call */
+ private void flushImpl() throws IOException {
+ final long pos = byteFlushed;
+ flushAsync();
+ for(; !flushFutures.isEmpty();) {
+ final Long flushed = flushFutures.poll().join();
+ if (flushed == null) {
+ throw new IOException("Failed to flush at position " + pos);
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkClosed();
+ flushImpl();
+ }
+
+ private void checkClosed() throws IOException {
+ if (closed.get()) {
+ throw new IOException(this + " was closed.");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ flushImpl();
+ getClient().close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "-" + getClient().getId() +
":byteFlushed=" + byteFlushed;
+ }
+}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
index e2eff08..7158381 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -102,7 +102,7 @@ public class GrpcOutputStream extends OutputStream {
@Override
public String toString() {
- return "GrpcOutputStream-" + clientId;
+ return getClass().getSimpleName() + "-" + clientId;
}
private void checkClosed() throws IOException {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
similarity index 64%
rename from ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
rename to ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 408f4be..84d99d0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,62 +15,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.BaseTest;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.GrpcClientStreamer;
-import org.apache.ratis.grpc.client.GrpcOutputStream;
-import org.apache.ratis.protocol.ClientId;
+package org.apache.ratis;
+
+import org.apache.ratis.client.impl.RaftOutputStream;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
-import java.util.*;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.junit.Assert.fail;
-@Ignore
-public class TestRaftStream extends BaseTest {
- static {
- LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
- }
-
- private static final RaftProperties prop = new RaftProperties();
+public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
private static final int NUM_SERVERS = 3;
private static final byte[] BYTES = new byte[4];
- private MiniRaftClusterWithGrpc cluster;
-
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
- }
+ public OutputStream newOutputStream(CLUSTER cluster, int bufferSize) {
+ return new RaftOutputStream(cluster::createClient,
SizeInBytes.valueOf(bufferSize));
}
- private byte[] toBytes(int i) {
- return toBytes(i, BYTES);
- }
- private byte[] toBytes(int i, byte[] b) {
+ private static byte[] toBytes(int i) {
+ final byte[] b = BYTES;
b[0] = (byte) ((i >>> 24) & 0xFF);
b[1] = (byte) ((i >>> 16) & 0xFF);
b[2] = (byte) ((i >>> 8) & 0xFF);
@@ -80,25 +69,20 @@ public class TestRaftStream extends BaseTest {
@Test
public void testSimpleWrite() throws Exception {
- final int numRequests = 500;
- LOG.info("Running testSimpleWrite, numRequests=" + numRequests);
-
- // default 64K is too large for a test
- GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
- cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
-
- cluster.start();
- RaftServerImpl leader = waitForLeader(cluster);
+ runWithNewCluster(NUM_SERVERS, this::runTestSimpleWrite);
+ }
- try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
- cluster.getGroup(), leader.getId(), null)) {
+ private void runTestSimpleWrite(CLUSTER cluster) throws Exception {
+ final int numRequests = 5000;
+ final int bufferSize = 4;
+ try (OutputStream out = newOutputStream(cluster, bufferSize)) {
for (int i = 0; i < numRequests; i++) { // generate requests
out.write(toBytes(i));
}
}
// check the leader's raft log
- final RaftLog raftLog = leader.getState().getLog();
+ final RaftLog raftLog = cluster.getLeader().getState().getLog();
final AtomicInteger i = new AtomicInteger();
checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement()));
}
@@ -106,30 +90,36 @@ public class TestRaftStream extends BaseTest {
private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
Supplier<byte[]> s) throws IOException {
long committedIndex = raftLog.getLastCommittedIndex();
- Assert.assertEquals(expectedCommittedIndex, committedIndex);
+ Assert.assertTrue(committedIndex >= expectedCommittedIndex);
// check the log content
- TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1);
+ TermIndex[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
+ int count = 0;
for (TermIndex entry : entries) {
- RaftProtos.LogEntryProto log = raftLog.get(entry.getIndex());
+ LogEntryProto log = raftLog.get(entry.getIndex());
+ if (!log.hasStateMachineLogEntry()) {
+ continue;
+ }
byte[] logData =
log.getStateMachineLogEntry().getLogData().toByteArray();
byte[] expected = s.get();
- LOG.info("log " + entry + " " + log.getLogEntryBodyCase() + " " +
StringUtils.bytes2HexString(logData));
- Assert.assertEquals(expected.length, logData.length);
- Assert.assertArrayEquals(expected, logData);
+ final String message = "log " + entry + " " + log.getLogEntryBodyCase()
+ + " " + StringUtils.bytes2HexString(logData)
+ + ", expected=" + StringUtils.bytes2HexString(expected);
+ LOG.info(message);
+ Assert.assertArrayEquals(message, expected, logData);
+ count++;
}
+ Assert.assertEquals(expectedCommittedIndex, count);
}
@Test
public void testWriteAndFlush() throws Exception {
- LOG.info("Running testWriteAndFlush");
-
- GrpcConfigKeys.OutputStream.setBufferSize(prop,
SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
- cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
- cluster.start();
+ runWithNewCluster(NUM_SERVERS, this::runTestWriteAndFlush);
+ }
+ private void runTestWriteAndFlush(CLUSTER cluster) throws Exception {
+ final int bufferSize = ByteValue.BUFFERSIZE;
RaftServerImpl leader = waitForLeader(cluster);
- GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
- cluster.getGroup(), leader.getId(), null);
+ OutputStream out = newOutputStream(cluster, bufferSize);
int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
ByteValue[] values = new ByteValue[lengths.length];
@@ -145,8 +135,7 @@ public class TestRaftStream extends BaseTest {
out.flush();
// make sure after the flush the data has been committed
- Assert.assertEquals(expectedTxs.size(),
- leader.getState().getLastAppliedIndex());
+ assertRaftLog(expectedTxs.size(), leader);
}
out.close();
@@ -162,6 +151,18 @@ public class TestRaftStream extends BaseTest {
() -> expectedTxs.get(index.getAndIncrement()));
}
+ private RaftLog assertRaftLog(int expectedEntries, RaftServerImpl server)
throws Exception {
+ final RaftLog raftLog = server.getState().getLog();
+ final EnumMap<LogEntryBodyCase, AtomicLong> counts =
RaftTestUtil.countEntries(raftLog);
+ Assert.assertEquals(expectedEntries,
counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
+
+ final LogEntryProto last =
RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, raftLog);
+ Assert.assertNotNull(last);
+ Assert.assertTrue(raftLog.getLastCommittedIndex() >= last.getIndex());
+ Assert.assertTrue(server.getState().getLastAppliedIndex() >=
last.getIndex());
+ return raftLog;
+ }
+
private static class ByteValue {
final static int BUFFERSIZE = 1024;
@@ -200,15 +201,14 @@ public class TestRaftStream extends BaseTest {
@Test
public void testWriteWithOffset() throws Exception {
- LOG.info("Running testWriteWithOffset");
- GrpcConfigKeys.OutputStream.setBufferSize(prop,
SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
+ runWithNewCluster(NUM_SERVERS, this::runTestWriteWithOffset);
+ }
- cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
- cluster.start();
+ private void runTestWriteWithOffset(CLUSTER cluster) throws Exception {
+ final int bufferSize = ByteValue.BUFFERSIZE;
RaftServerImpl leader = waitForLeader(cluster);
- GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
- cluster.getGroup(), leader.getId(), null);
+ final OutputStream out = newOutputStream(cluster, bufferSize);
byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
Arrays.fill(b1, (byte) 1);
@@ -237,18 +237,21 @@ public class TestRaftStream extends BaseTest {
}
out.close();
- final RaftLog log = leader.getState().getLog();
// 0.5 + 1 + 2.5 + 4 = 8
- Assert.assertEquals(8, leader.getState().getLastAppliedIndex());
- Assert.assertEquals(8, log.getLastCommittedIndex());
- TermIndex[] entries = log.getEntries(1, 9);
- byte[] actual = new byte[ByteValue.BUFFERSIZE * 8];
+ final int expectedEntries = 8;
+ final RaftLog raftLog = assertRaftLog(expectedEntries, leader);
+
+ final TermIndex[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
+ final byte[] actual = new byte[ByteValue.BUFFERSIZE * expectedEntries];
totalSize = 0;
- for (TermIndex e : entries) {
- byte[] eValue =
log.get(e.getIndex()).getStateMachineLogEntry().getLogData().toByteArray();
- Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
- System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
- totalSize += eValue.length;
+ for (TermIndex ti : entries) {
+ final LogEntryProto e = raftLog.get(ti.getIndex());
+ if (e.hasStateMachineLogEntry()) {
+ final byte[] eValue =
e.getStateMachineLogEntry().getLogData().toByteArray();
+ Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
+ System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
+ totalSize += eValue.length;
+ }
}
Assert.assertArrayEquals(expected, actual);
}
@@ -258,11 +261,11 @@ public class TestRaftStream extends BaseTest {
*/
@Test
public void testKillLeader() throws Exception {
- LOG.info("Running testChangeLeader");
+ runWithNewCluster(NUM_SERVERS, this::runTestKillLeader);
+ }
- GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
- cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
- cluster.start();
+ private void runTestKillLeader(CLUSTER cluster) throws Exception {
+ final int bufferSize = 4;
final RaftServerImpl leader = waitForLeader(cluster);
final AtomicBoolean running = new AtomicBoolean(true);
@@ -273,8 +276,7 @@ public class TestRaftStream extends BaseTest {
new Thread(() -> {
LOG.info("Writer thread starts");
int count = 0;
- try (GrpcOutputStream out = new GrpcOutputStream(prop,
ClientId.randomId(),
- cluster.getGroup(), leader.getId(), null)) {
+ try (OutputStream out = newOutputStream(cluster, bufferSize)) {
while (running.get()) {
out.write(toBytes(count++));
Thread.sleep(10);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
new file mode 100644
index 0000000..2258a1e
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.OutputStreamBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.GrpcClientStreamer;
+import org.apache.ratis.grpc.client.GrpcOutputStream;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Ignore;
+
+import java.io.OutputStream;
+
+/**
+ * Test {@link GrpcOutputStream}
+ * TODO: {@link GrpcOutputStream} current has some bugs.
+ */
+@Ignore
+public class TestGrpcOutputStream
+ extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+ static {
+ LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
+ }
+
+ @Override
+ public OutputStream newOutputStream(MiniRaftClusterWithGrpc cluster, int
bufferSize) {
+ final RaftProperties p = getProperties();
+ GrpcConfigKeys.OutputStream.setBufferSize(p,
SizeInBytes.valueOf(bufferSize));
+ return new GrpcOutputStream(p, ClientId.randomId(), cluster.getGroup(),
cluster.getLeader().getId(), null);
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
new file mode 100644
index 0000000..1c4202b
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.OutputStreamBaseTest;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.util.LogUtils;
+
+public class TestRaftOutputStreamWithGrpc
+ extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+ {
+ LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE);
+ }
+
+ @Override
+ public int getGlobalTimeoutSeconds() {
+ return 30;
+ }
+}