This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new 5e3afe5  Buffer library performance test and functional test 
successfully. (#1612)
5e3afe5 is described below

commit 5e3afe5ec28bdcfdbd2248cd78fc0b5fc4f8d36b
Author: 彭勇升 pengys <8082...@qq.com>
AuthorDate: Fri Aug 31 06:32:51 2018 +0800

    Buffer library performance test and functional test successfully. (#1612)
    
    * Buffer stream.
    
    * Buffer file reader.
    
    * Buffer library performance test and functional test successfully.
    
    * Fixed the code merge mistake.
---
 .../oap/server/library/buffer/BufferFileUtils.java | 15 +++----
 .../oap/server/library/buffer/DataStream.java      |  2 +-
 .../server/library/buffer/DataStreamReader.java    | 49 ++++++++++++++--------
 .../server/library/buffer/DataStreamWriter.java    | 32 +++++++-------
 .../oap/server/library/buffer/Offset.java          | 11 ++++-
 .../oap/server/library/buffer/OffsetStream.java    |  4 +-
 .../library/buffer/BufferStreamTestCase.java       | 25 ++++++-----
 skywalking-ui                                      |  2 +-
 8 files changed, 83 insertions(+), 57 deletions(-)

diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
index 9857005..d39709b 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.oap.server.library.buffer;
 
-import java.text.*;
-import java.util.*;
+import java.util.Arrays;
 
 /**
  * @author peng-yongsheng
@@ -34,19 +33,17 @@ class BufferFileUtils {
     static final String OFFSET_FILE_PREFIX = "offset";
     private static final String SEPARATOR = "-";
     private static final String SUFFIX = ".sw";
-    private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss";
 
     static void sort(String[] fileList) {
         Arrays.sort(fileList, (f1, f2) -> {
-            int fileId1 = Integer.parseInt(f1.split("_")[1]);
-            int fileId2 = Integer.parseInt(f2.split("_")[1]);
+            long t1 = Long.parseLong(f1.substring(0, f1.length() - 
3).split(SEPARATOR)[1]);
+            long t2 = Long.parseLong(f2.substring(0, f2.length() - 
3).split(SEPARATOR)[1]);
 
-            return fileId1 - fileId2;
+            return (int)(t1 - t2);
         });
     }
 
     static String buildFileName(String prefix) {
-        DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR);
-        return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX;
+        return prefix + SEPARATOR + System.currentTimeMillis() + SUFFIX;
     }
-}
+}
\ No newline at end of file
diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
index 1bb380d..54394d7 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
@@ -38,7 +38,7 @@ class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> {
     @Getter private final DataStreamWriter<MESSAGE_TYPE> writer;
     private boolean initialized = false;
 
-    DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, 
Parser<MESSAGE_TYPE> parser,
+    DataStream(File directory, int dataFileMaxSize, int offsetFileMaxSize, 
Parser<MESSAGE_TYPE> parser,
         DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
         this.directory = directory;
         this.offsetStream = new OffsetStream(directory, offsetFileMaxSize);
diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index 16b0cd9..f838020 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.library.buffer;
 import com.google.protobuf.*;
 import java.io.*;
 import java.util.concurrent.*;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.PrefixFileFilter;
 import org.apache.skywalking.apm.util.*;
 import org.slf4j.*;
@@ -36,6 +37,7 @@ class DataStreamReader<MESSAGE_TYPE extends 
GeneratedMessageV3> {
     private final Offset.ReadOffset readOffset;
     private final Parser<MESSAGE_TYPE> parser;
     private final CallBack<MESSAGE_TYPE> callBack;
+    private File readingFile;
     private InputStream inputStream;
 
     DataStreamReader(File directory, Offset.ReadOffset readOffset, 
Parser<MESSAGE_TYPE> parser,
@@ -49,35 +51,40 @@ class DataStreamReader<MESSAGE_TYPE extends 
GeneratedMessageV3> {
     void initialize() {
         preRead();
 
-        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
             new RunnableWithExceptionProtection(this::read,
-                t -> logger.error("Segment buffer pre read failure.", t)), 3, 
3, TimeUnit.SECONDS);
+                t -> logger.error("Buffer data pre read failure.", t)), 3, 1, 
TimeUnit.SECONDS);
     }
 
     private void preRead() {
         String fileName = readOffset.getFileName();
         if (StringUtil.isEmpty(fileName)) {
-            openInputStream(readEarliestCreateDataFile());
+            openInputStream(readEarliestDataFile());
         } else {
-            File dataFile = new File(directory, fileName);
-            if (dataFile.exists()) {
-                openInputStream(dataFile);
-                read();
+            File readingFile = new File(directory, fileName);
+            if (readingFile.exists()) {
+                openInputStream(readingFile);
+                try {
+                    inputStream.skip(readOffset.getOffset());
+                } catch (IOException e) {
+                    logger.error(e.getMessage(), e);
+                }
             } else {
-                openInputStream(readEarliestCreateDataFile());
+                openInputStream(readEarliestDataFile());
             }
         }
     }
 
-    private void openInputStream(File readFile) {
+    private void openInputStream(File readingFile) {
         try {
-            inputStream = new FileInputStream(readFile);
+            this.readingFile = readingFile;
+            inputStream = new FileInputStream(readingFile);
         } catch (FileNotFoundException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
-    private File readEarliestCreateDataFile() {
+    private File readEarliestDataFile() {
         String[] fileNames = directory.list(new 
PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
 
         if (fileNames != null && fileNames.length > 0) {
@@ -92,12 +99,20 @@ class DataStreamReader<MESSAGE_TYPE extends 
GeneratedMessageV3> {
 
     private void read() {
         try {
-            MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
-            if (messageType != null) {
-                callBack.call(messageType);
-                final int serialized = messageType.getSerializedSize();
-                final int offset = 
CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
-                readOffset.setOffset(readOffset.getOffset() + offset);
+            if (readOffset.getOffset() == readingFile.length() && 
!readOffset.isCurrentWriteFile()) {
+                FileUtils.forceDelete(readingFile);
+                openInputStream(readEarliestDataFile());
+            }
+
+            while (readOffset.getOffset() < readingFile.length()) {
+
+                MESSAGE_TYPE messageType = 
parser.parseDelimitedFrom(inputStream);
+                if (messageType != null) {
+                    callBack.call(messageType);
+                    final int serialized = messageType.getSerializedSize();
+                    final int offset = 
CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
+                    readOffset.setOffset(readOffset.getOffset() + offset);
+                }
             }
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
index 13a88b1..cb1a0c6 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
@@ -49,46 +49,46 @@ class DataStreamWriter<MESSAGE_TYPE extends 
GeneratedMessageV3> {
         if (!initialized) {
             String writeFileName = writeOffset.getFileName();
 
-            File dataFile;
+            File writingFile;
             if (StringUtil.isEmpty(writeFileName)) {
-                dataFile = createNewFile();
+                writingFile = createNewFile();
             } else {
-                dataFile = new File(directory, writeFileName);
-                if (!dataFile.exists()) {
-                    dataFile = createNewFile();
+                writingFile = new File(directory, writeFileName);
+                if (!writingFile.exists()) {
+                    writingFile = createNewFile();
                 }
             }
 
-            outputStream = FileUtils.openOutputStream(dataFile, true);
+            outputStream = FileUtils.openOutputStream(writingFile, true);
             initialized = true;
         }
     }
 
     private File createNewFile() throws IOException {
         String fileName = 
BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX);
-        File dataFile = new File(directory, fileName);
+        File writingFile = new File(directory, fileName);
 
-        boolean created = dataFile.createNewFile();
+        boolean created = writingFile.createNewFile();
         if (!created) {
-            logger.info("The file named {} already exists.", 
dataFile.getAbsolutePath());
+            logger.info("The file named {} already exists.", 
writingFile.getAbsolutePath());
         } else {
-            logger.info("Create a new buffer data file: {}", 
dataFile.getAbsolutePath());
+            logger.info("Create a new buffer data file: {}", 
writingFile.getAbsolutePath());
         }
 
         writeOffset.setOffset(0);
-        writeOffset.setFileName(dataFile.getName());
+        writeOffset.setFileName(writingFile.getName());
 
-        return dataFile;
+        return writingFile;
     }
 
-    void write(AbstractMessageLite messageLite) {
+    synchronized void write(AbstractMessageLite messageLite) {
         try {
             messageLite.writeDelimitedTo(outputStream);
             long position = outputStream.getChannel().position();
             writeOffset.setOffset(position);
-            if (position > (FileUtils.ONE_MB * dataFileMaxSize)) {
-                File dataFile = createNewFile();
-                outputStream = FileUtils.openOutputStream(dataFile, true);
+            if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) {
+                File writingFile = createNewFile();
+                outputStream = FileUtils.openOutputStream(writingFile, true);
             }
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
index 09e5936..5119bb9 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
@@ -31,8 +31,8 @@ class Offset {
     @Getter private final WriteOffset writeOffset;
 
     Offset() {
-        readOffset = new ReadOffset();
         writeOffset = new WriteOffset();
+        readOffset = new ReadOffset(writeOffset);
     }
 
     String serialize() {
@@ -55,6 +55,15 @@ class Offset {
     static class ReadOffset {
         @Getter @Setter private String fileName;
         @Getter @Setter private long offset = 0;
+        private final WriteOffset writeOffset;
+
+        private ReadOffset(WriteOffset writeOffset) {
+            this.writeOffset = writeOffset;
+        }
+
+        boolean isCurrentWriteFile() {
+            return fileName.equals(writeOffset.fileName);
+        }
     }
 
     static class WriteOffset {
diff --git 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
index 7f46331..9fb4ed5 100644
--- 
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
+++ 
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
@@ -66,8 +66,8 @@ class OffsetStream {
         if (!initialized) {
             String[] fileNames = directory.list(new 
PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
             if (fileNames != null && fileNames.length > 0) {
-                for (int i = 0; i < fileNames.length; i++) {
-                }
+                BufferFileUtils.sort(fileNames);
+                offsetFile = new File(directory, fileNames[0]);
             } else {
                 offsetFile = newFile();
             }
diff --git 
a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
 
b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
index cff9100..53eca71 100644
--- 
a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
+++ 
b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -33,9 +33,9 @@ public class BufferStreamTestCase {
     public static void main(String[] args) throws IOException, 
InterruptedException {
         String directory = "/Users/pengys5/code/sky-walking/buffer-test";
         BufferStream.Builder<TraceSegmentObject> builder = new 
BufferStream.Builder<>(directory);
-        builder.cleanWhenRestart(true);
-        builder.dataFileMaxSize(1);
-        builder.offsetFileMaxSize(1);
+//        builder.cleanWhenRestart(true);
+        builder.dataFileMaxSize(50);
+        builder.offsetFileMaxSize(10);
         builder.parser(TraceSegmentObject.parser());
         builder.callBack(new SegmentParse());
 
@@ -44,18 +44,23 @@ public class BufferStreamTestCase {
 
         TimeUnit.SECONDS.sleep(5);
 
-        String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28" +
-            "main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28" +
-            "main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28" +
-            "main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28";
+        StringBuilder str = new StringBuilder("2018-08-27 11:59:45,261 main 
DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28");
+        for (int i = 0; i < 1000; i++) {
+            str.append("main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean 
org.apache.logging.log4j2:type=6d6f6e28");
+        }
 
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 20000; i++) {
             TraceSegmentObject.Builder segment = 
TraceSegmentObject.newBuilder();
             SpanObject.Builder span = SpanObject.newBuilder();
 
-            span.setOperationName(String.valueOf(i) + "  " + str);
+            span.setSpanId(i);
+            span.setOperationName(str.toString());
             segment.addSpans(span);
             stream.write(segment.build());
+
+            if (i % 1000 == 0) {
+                TimeUnit.MILLISECONDS.sleep(50);
+            }
         }
 
     }
@@ -63,7 +68,7 @@ public class BufferStreamTestCase {
     private static class SegmentParse implements 
DataStreamReader.CallBack<TraceSegmentObject> {
 
         @Override public void call(TraceSegmentObject message) {
-            logger.info("segment parse: {}", 
message.getSpans(0).getOperationName());
+            logger.info("segment parse: {}", message.getSpans(0).getSpanId());
         }
     }
 }
diff --git a/skywalking-ui b/skywalking-ui
index ad3ee45..f9c6029 160000
--- a/skywalking-ui
+++ b/skywalking-ui
@@ -1 +1 @@
-Subproject commit ad3ee45dbadfae35d77238bdd7a1df593158f109
+Subproject commit f9c602936ab4f386576bf16f203efac61962e424

Reply via email to