This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch stablize
in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git

commit 8ddad77b4699ea1aa8ac89f1bb50ab8044cb4175
Author: kezhenxu94 <[email protected]>
AuthorDate: Wed Sep 30 14:36:33 2020 +0800

    Try to stabilize the first version
---
 package-lock.json                                  |  20 ++-
 package.json                                       |   2 +
 src/plugins/HttpPlugin.ts                          |  27 +--
 src/plugins/MongodbPlugin.ts                       | 182 +++++++++++++++++++++
 src/trace/context/Context.ts                       |   4 +
 src/trace/context/ContextManager.ts                |  23 +--
 src/trace/context/DummyContext.ts                  |   8 +
 src/trace/context/SpanContext.ts                   |  27 ++-
 src/trace/span/Span.ts                             |  18 ++
 tests/dev.ts                                       |  59 +++++++
 tests/plugins/http/client.ts                       |  57 +++++++
 .../Context.ts => tests/plugins/http/server.ts     |  31 ++--
 12 files changed, 404 insertions(+), 54 deletions(-)

diff --git a/package-lock.json b/package-lock.json
index ca345e1..2b69f83 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
 {
-  "name": "@ali/skywalking-nodejs",
-  "version": "0.0.21-RC",
+  "name": "skywalking",
+  "version": "0.0.1",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {
@@ -60,6 +60,14 @@
       "integrity": 
"sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==",
       "dev": true
     },
+    "@types/end-of-stream": {
+      "version": "1.4.0",
+      "resolved": 
"https://registry.npmjs.org/@types/end-of-stream/-/end-of-stream-1.4.0.tgz",
+      "integrity": 
"sha512-d0FD2A4vpFI8wyQeQbr9VDVKtA1PmeGO3Ntn+6j626QTtAQ9HSqWFACP7rTHaV2cspVhLijl00Vvkf/U2UZGWA==",
+      "requires": {
+        "@types/node": "*"
+      }
+    },
     "@types/google-protobuf": {
       "version": "3.7.2",
       "resolved": 
"https://registry.npmjs.org/@types/google-protobuf/-/google-protobuf-3.7.2.tgz",
@@ -1167,6 +1175,14 @@
         "env-variable": "0.0.x"
       }
     },
+    "end-of-stream": {
+      "version": "1.4.4",
+      "resolved": 
"https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz",
+      "integrity": 
"sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==",
+      "requires": {
+        "once": "^1.4.0"
+      }
+    },
     "ent": {
       "version": "2.2.0",
       "resolved": "https://registry.npmjs.org/ent/-/ent-2.2.0.tgz",
diff --git a/package.json b/package.json
index 69656c5..1ceab2e 100644
--- a/package.json
+++ b/package.json
@@ -55,6 +55,8 @@
     "worker-loader": "^2.0.0"
   },
   "dependencies": {
+    "@types/end-of-stream": "^1.4.0",
+    "end-of-stream": "^1.4.4",
     "google-protobuf": "^3.12.2",
     "grpc": "^1.10.1",
     "semver": "^7.3.2",
diff --git a/src/plugins/HttpPlugin.ts b/src/plugins/HttpPlugin.ts
index a7caf18..5a7f39d 100644
--- a/src/plugins/HttpPlugin.ts
+++ b/src/plugins/HttpPlugin.ts
@@ -26,6 +26,7 @@ import Tag from '../Tag';
 import { SpanLayer } from '../proto/language-agent/Tracing_pb';
 import { ContextCarrier } from '../trace/context/ContextCarrier';
 import { createLogger } from '../logging';
+import * as eos from 'end-of-stream';
 
 const logger = createLogger(__filename);
 
@@ -46,7 +47,6 @@ class HttpPlugin implements SwPlugin {
         const argc = arguments.length;
 
         const url: URL | string | RequestOptions = arguments[0];
-        const options = argc > 1 ? (typeof arguments[1] === 'function' ? {} : 
arguments[1]) : {};
         const callback = typeof arguments[argc - 1] === 'function' ? 
arguments[argc - 1] : undefined;
 
         const { host, pathname } =
@@ -65,32 +65,19 @@ class HttpPlugin implements SwPlugin {
         span.layer = SpanLayer.HTTP;
         span.tag(Tag.httpURL(host + pathname));
 
-        const snapshot = ContextManager.current.capture();
-
         const request: ClientRequest = original.apply(this, arguments);
 
         span.extract().items.forEach((item) => {
           request.setHeader(item.key, item.value);
         });
 
-        request.on('response', (res) => {
-          res.prependListener('end', () => {
-            
span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage));
-
-            const callbackSpan = 
ContextManager.current.newLocalSpan('callback').start();
-            callbackSpan.layer = SpanLayer.HTTP;
-            callbackSpan.component = Component.HTTP;
+        span.async().stop();
 
-            ContextManager.current.restore(snapshot);
-
-            if (callback) {
-              callback(res);
-            }
+        request.on('response', (res) => {
+          
span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage));
 
-            callbackSpan.stop();
-          });
+          eos(res, () => span.await());
         });
-        span.stop();
 
         return request;
       };
@@ -123,7 +110,9 @@ class HttpPlugin implements SwPlugin {
         span.layer = SpanLayer.HTTP;
         span.tag(Tag.httpURL(req.url));
 
-        
span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)).stop();
+        
span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)).async().stop();
+
+        eos(res, () => span.await());
 
         return original.apply(this, arguments);
       };
diff --git a/src/plugins/MongodbPlugin.ts b/src/plugins/MongodbPlugin.ts
new file mode 100644
index 0000000..44a045b
--- /dev/null
+++ b/src/plugins/MongodbPlugin.ts
@@ -0,0 +1,182 @@
+/*!
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import SwPlugin from '../core/SwPlugin';
+import { createLogger } from '../logging';
+import ContextManager from '../trace/context/ContextManager';
+import Span from '../trace/span/Span';
+import { Component } from '../trace/Component';
+import { SpanLayer } from '../proto/language-agent/Tracing_pb';
+import { InsertWriteOpResult, MongoClient, MongoError, WithId } from 'mongodb';
+
+const logger = createLogger(__filename);
+
+class MongodbPlugin implements SwPlugin {
+  readonly module = 'mongodb';
+  readonly versions = '*'; // TODO: narrow down the supported version
+
+  install(): void {
+    const spanKeyedByRequestId: {
+      [requestId: string]: Span;
+    } = {};
+
+    const onStartEvent = (event: any) => {
+      if (logger.isDebugEnabled()) {
+        logger.debug('Mongodb start', event);
+      }
+      const span = ContextManager.current.newExitSpan(
+        `MongoDB/${event.databaseName}/${event.commandName}`,
+        event.address,
+      );
+      span.component = Component.MONGODB;
+      span.layer = SpanLayer.DATABASE;
+      spanKeyedByRequestId[event.requestId] = span.start();
+    };
+
+    const onSuccessEvent = async (event: any) => {
+      if (logger.isDebugEnabled()) {
+        logger.debug('Mongodb succeeded', event);
+      }
+
+      await (async () => setTimeout(() => {}, 1000))();
+
+      if (!spanKeyedByRequestId[event.requestId]) {
+        return;
+      }
+      const span = spanKeyedByRequestId[event.requestId];
+
+      delete spanKeyedByRequestId[event.requestId];
+
+      if (event.reply?.writeErrors) {
+        const errors = event.reply.writeErrors as [{ errmsg: string }];
+
+        span.errored = true;
+        span.logs.push({
+          items: [
+            {
+              key: 'ErrorMsg',
+              val: errors.map((it) => it.errmsg).join('\n'),
+            },
+          ],
+          timestamp: new Date().getTime(),
+        });
+      }
+
+      span.stop();
+    };
+
+    const onFailedEvent = (event: any) => {
+      if (logger.isDebugEnabled()) {
+        logger.debug('Mongodb failed', event);
+      }
+      if (!spanKeyedByRequestId[event.requestId]) {
+        return;
+      }
+      const span = spanKeyedByRequestId[event.requestId];
+
+      span.stop();
+
+      delete spanKeyedByRequestId[event.requestId];
+    };
+
+    const Client = require('mongodb/lib/mongo_client');
+
+    ((original) => {
+      Client.prototype.connect = function () {
+        const span = ContextManager.current.newExitSpan(`MongoDB/connect`, 
this.s.url).start();
+        span.component = Component.MONGODB;
+        span.layer = SpanLayer.DATABASE;
+
+        const snapshot = ContextManager.current.capture();
+
+        const callback = arguments[arguments.length - 1];
+        if (typeof callback === 'function') {
+          arguments[arguments.length - 1] = (error: MongoError, r: 
MongoClient) => {
+            ContextManager.current.restore(snapshot);
+
+            callback(error, r);
+          };
+        }
+
+        const result = original.apply(this, arguments);
+
+        span.stop();
+
+        return result;
+      };
+    })(Client.prototype.connect);
+
+    ((original) => {
+      Client.prototype.close = function () {
+        const span = ContextManager.current.newExitSpan(`MongoDB/close`, 
this.s.url).start();
+        span.component = Component.MONGODB;
+        span.layer = SpanLayer.DATABASE;
+
+        const snapshot = ContextManager.current.capture();
+
+        const callback = arguments[arguments.length - 1];
+        if (typeof callback === 'function') {
+          arguments[arguments.length - 1] = (error: MongoError, r: 
MongoClient) => {
+            ContextManager.current.restore(snapshot);
+
+            callback(error, r);
+          };
+        }
+
+        const result = original.apply(this, arguments);
+
+        span.stop();
+
+        return result;
+      };
+    })(Client.prototype.close);
+
+    const Collection = require('mongodb/lib/collection');
+
+    ((original) => {
+      Collection.prototype.insertMany = function (ns: string, ops: object[]) {
+        const { db, collection } = this.s.namespace;
+
+        const span = 
ContextManager.current.newExitSpan(`MongoDB/${db}/${collection}/insertMany`, 
this.s.url).start();
+        span.component = Component.MONGODB;
+        span.layer = SpanLayer.DATABASE;
+
+        const snapshot = ContextManager.current.capture();
+
+        const callback = arguments[arguments.length - 1];
+        if (typeof callback === 'function') {
+          arguments[arguments.length - 1] = (error: MongoError, r: 
InsertWriteOpResult<WithId<any>>) => {
+            ContextManager.current.restore(snapshot);
+
+            callback(error, r);
+          };
+        }
+
+        const result = original.apply(this, arguments);
+
+        span.stop();
+
+        return result;
+      };
+    })(Collection.prototype.insertMany);
+  }
+}
+
+// noinspection JSUnusedGlobalSymbols
+export default new MongodbPlugin();
diff --git a/src/trace/context/Context.ts b/src/trace/context/Context.ts
index 7393887..c17cdd9 100644
--- a/src/trace/context/Context.ts
+++ b/src/trace/context/Context.ts
@@ -36,6 +36,10 @@ export default interface Context {
 
   stop(span: Span): boolean;
 
+  async(span: Span): Context;
+
+  await(span: Span): Context;
+
   currentSpan(): Span | undefined;
 
   capture(): Snapshot;
diff --git a/src/trace/context/ContextManager.ts 
b/src/trace/context/ContextManager.ts
index e49bbb0..de6dc34 100644
--- a/src/trace/context/ContextManager.ts
+++ b/src/trace/context/ContextManager.ts
@@ -25,19 +25,22 @@ class ContextManager {
   contextKeyedByAsyncId: { [asyncId: number]: Context } = {};
 
   constructor() {
-    createHook({
-      destroy: (asyncId: number) => {
-        delete this.contextKeyedByAsyncId[asyncId];
-      },
-    }).enable();
+    // createHook({
+    //   destroy: (asyncId: number) => {
+    //     delete this.contextKeyedByAsyncId[asyncId];
+    //   },
+    // }).enable();
   }
 
-  get current(): Context {
-    const thisAsyncId = executionAsyncId();
-
-    this.contextKeyedByAsyncId[thisAsyncId] = 
this.contextKeyedByAsyncId[thisAsyncId] || new SpanContext(thisAsyncId);
+  context = new SpanContext(1);
 
-    return this.contextKeyedByAsyncId[thisAsyncId];
+  get current(): Context {
+    return this.context;
+    // const thisAsyncId = executionAsyncId();
+    //
+    // this.contextKeyedByAsyncId[thisAsyncId] = 
this.contextKeyedByAsyncId[thisAsyncId] || new SpanContext(thisAsyncId);
+    //
+    // return this.contextKeyedByAsyncId[thisAsyncId];
   }
 }
 
diff --git a/src/trace/context/DummyContext.ts 
b/src/trace/context/DummyContext.ts
index 54a98e8..52b7754 100644
--- a/src/trace/context/DummyContext.ts
+++ b/src/trace/context/DummyContext.ts
@@ -75,4 +75,12 @@ export default class DummyContext implements Context {
   restore(snapshot: Snapshot) {
     // Big Bang ~
   }
+
+  async(span: Span): this {
+    return this;
+  }
+
+  await(span: Span): this {
+    return this;
+  }
 }
diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts
index d7c53b0..f7470f8 100644
--- a/src/trace/context/SpanContext.ts
+++ b/src/trace/context/SpanContext.ts
@@ -37,6 +37,7 @@ export default class SpanContext implements Context {
   spanId = 0;
   spans: Span[] = [];
   segment: Segment = new Segment();
+  asyncSpans: Span[] = [];
 
   constructor(public asyncId: number) {}
 
@@ -122,8 +123,12 @@ export default class SpanContext implements Context {
       return true;
     }
 
-    if (this.tryFinish(span)) {
-      this.spans.splice(0, 1);
+    const finished = this.tryFinish(span);
+    if (finished) {
+      this.spans.splice(this.spans.indexOf(span), 1);
+    }
+    if (finished && this.asyncSpans.length === 0) {
+      buffer.put(this.segment);
     }
 
     return this.spans.length === 0;
@@ -134,7 +139,6 @@ export default class SpanContext implements Context {
       if (logger.isDebugEnabled()) {
         logger.debug('Finishing span', { span });
       }
-      buffer.put(this.segment);
       return true;
     }
     return false;
@@ -159,4 +163,21 @@ export default class SpanContext implements Context {
     this.currentSpan()?.refer(ref);
     this.segment.relate(ref.traceId);
   }
+
+  async(span: Span): this {
+    if (!this.asyncSpans.includes(span)) {
+      this.asyncSpans.push(span);
+    }
+    return this;
+  }
+
+  await(span: Span): this {
+    if (this.asyncSpans.includes(span)) {
+      this.asyncSpans.splice(this.asyncSpans.indexOf(span), 1);
+    }
+    if (this.asyncSpans.length === 0) {
+      buffer.put(this.segment);
+    }
+    return this;
+  }
 }
diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts
index decfcf3..0a92868 100644
--- a/src/trace/span/Span.ts
+++ b/src/trace/span/Span.ts
@@ -58,6 +58,7 @@ export default abstract class Span {
   startTime = 0;
   endTime = 0;
   errored = false;
+  _async = false;
 
   constructor(options: SpanCtorOptions & { type: SpanType }) {
     this.context = options.context;
@@ -141,4 +142,21 @@ export default abstract class Span {
     }
     return this;
   }
+
+  get isAsync(): boolean {
+    return this._async;
+  }
+
+  async(): this {
+    this.context.async(this);
+    this._async = true;
+    return this;
+  }
+
+  await(): this {
+    this.endTime = new Date().getTime();
+    this.context.await(this);
+    this._async = false;
+    return this;
+  }
 }
diff --git a/tests/dev.ts b/tests/dev.ts
new file mode 100644
index 0000000..0988a86
--- /dev/null
+++ b/tests/dev.ts
@@ -0,0 +1,59 @@
+/*!
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import Agent from '../src/index';
+import { Db, MongoClient } from 'mongodb';
+
+import * as assert from 'assert';
+
+Agent.start({});
+// Connection URL
+const url = 'mongodb://localhost:27017';
+
+// Database Name
+const dbName = 'teambition';
+// Create a new MongoClient
+const client = new MongoClient(url, { useUnifiedTopology: true });
+const insertDocumentsOk = (db, callback) => {
+  const collection = db.collection('documents');
+  collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }], (err, result) => {
+    callback(result);
+  });
+};
+const insertDocumentsFailed = (db: Db, callback) => {
+  const collection = db.collection('documents');
+  collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }], (err, result) => {
+    callback(result);
+  });
+};
+// Use connect method to connect to the Server
+client.connect((err) => {
+  assert.equal(null, err);
+  const db = client.db(dbName);
+  insertDocumentsOk(db, () => {});
+  (async () => {
+    await (async () => setTimeout(() => {}, 1000))();
+  })();
+  insertDocumentsOk(db, () => {});
+  insertDocumentsFailed(db, () => {
+    client.close();
+  });
+});
+
+setTimeout(() => {}, 10000);
diff --git a/tests/plugins/http/client.ts b/tests/plugins/http/client.ts
new file mode 100644
index 0000000..ddd6785
--- /dev/null
+++ b/tests/plugins/http/client.ts
@@ -0,0 +1,57 @@
+/*!
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// @ts-ignore
+import agent from '../../../src/index';
+
+agent.start({});
+
+import * as http from 'http';
+import { createServer, IncomingMessage, ServerResponse } from 'http';
+
+const providerHostPort = process.env.SW_PROVIDER || '127.0.0.1:5000';
+
+const port = 5001;
+
+const server = createServer((request: IncomingMessage, response: 
ServerResponse) => {
+  http
+    .request(
+      {
+        host: providerHostPort.split(':')[0],
+        port: providerHostPort.split(':')[1],
+        path: '/kids',
+      },
+      (res) => {
+        // http
+        //   .request({
+        //     host: providerHostPort.split(':')[0],
+        //     port: providerHostPort.split(':')[1],
+        //     path: '/inner-callback',
+        //   })
+        //   .end();
+        res.on('data', (chunk) => response.write(chunk));
+        res.on('end', () => response.end('\n'));
+      },
+    )
+    .end();
+});
+
+server.listen(port, () => {
+  console.log(`Server is listening on port ${port}`);
+});
diff --git a/src/trace/context/Context.ts b/tests/plugins/http/server.ts
similarity index 56%
copy from src/trace/context/Context.ts
copy to tests/plugins/http/server.ts
index 7393887..32b9292 100644
--- a/src/trace/context/Context.ts
+++ b/tests/plugins/http/server.ts
@@ -17,28 +17,19 @@
  *
  */
 
-import Span from '../../trace/span/Span';
-import Segment from '../../trace/context/Segment';
-import Snapshot from '../../trace/context/Snapshot';
-import { ContextCarrier } from './ContextCarrier';
+// @ts-ignore
+import agent from '../../../src/index';
 
-export default interface Context {
-  segment: Segment;
-  spans: Span[];
+agent.start({});
 
-  newLocalSpan(operation: string): Span;
+import { createServer, IncomingMessage, ServerResponse } from 'http';
 
-  newEntrySpan(operation: string, carrier?: ContextCarrier): Span;
+const port = 5000;
 
-  newExitSpan(operation: string, peer: string, carrier?: ContextCarrier): Span;
+const server = createServer((request: IncomingMessage, response: 
ServerResponse) => {
+  setTimeout(() => response.end('How you doin'), 1000);
+});
 
-  start(span: Span): Context;
-
-  stop(span: Span): boolean;
-
-  currentSpan(): Span | undefined;
-
-  capture(): Snapshot;
-
-  restore(snapshot: Snapshot): void;
-}
+server.listen(port, () => {
+  console.log(`Server is listening on port ${port}`);
+});

Reply via email to