This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch stablize in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git
commit 8ddad77b4699ea1aa8ac89f1bb50ab8044cb4175 Author: kezhenxu94 <[email protected]> AuthorDate: Wed Sep 30 14:36:33 2020 +0800 Try to stablize the first version --- package-lock.json | 20 ++- package.json | 2 + src/plugins/HttpPlugin.ts | 27 +-- src/plugins/MongodbPlugin.ts | 182 +++++++++++++++++++++ src/trace/context/Context.ts | 4 + src/trace/context/ContextManager.ts | 23 +-- src/trace/context/DummyContext.ts | 8 + src/trace/context/SpanContext.ts | 27 ++- src/trace/span/Span.ts | 18 ++ tests/dev.ts | 59 +++++++ tests/plugins/http/client.ts | 57 +++++++ .../Context.ts => tests/plugins/http/server.ts | 31 ++-- 12 files changed, 404 insertions(+), 54 deletions(-) diff --git a/package-lock.json b/package-lock.json index ca345e1..2b69f83 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { - "name": "@ali/skywalking-nodejs", - "version": "0.0.21-RC", + "name": "skywalking", + "version": "0.0.1", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -60,6 +60,14 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "@types/end-of-stream": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@types/end-of-stream/-/end-of-stream-1.4.0.tgz", + "integrity": "sha512-d0FD2A4vpFI8wyQeQbr9VDVKtA1PmeGO3Ntn+6j626QTtAQ9HSqWFACP7rTHaV2cspVhLijl00Vvkf/U2UZGWA==", + "requires": { + "@types/node": "*" + } + }, "@types/google-protobuf": { "version": "3.7.2", "resolved": "https://registry.npmjs.org/@types/google-protobuf/-/google-protobuf-3.7.2.tgz", @@ -1167,6 +1175,14 @@ "env-variable": "0.0.x" } }, + "end-of-stream": { + "version": "1.4.4", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", + "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "requires": { + "once": "^1.4.0" + } + }, "ent": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/ent/-/ent-2.2.0.tgz", diff --git a/package.json b/package.json index 69656c5..1ceab2e 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,8 @@ "worker-loader": "^2.0.0" }, "dependencies": { + "@types/end-of-stream": "^1.4.0", + "end-of-stream": "^1.4.4", "google-protobuf": "^3.12.2", "grpc": "^1.10.1", "semver": "^7.3.2", diff --git a/src/plugins/HttpPlugin.ts b/src/plugins/HttpPlugin.ts index a7caf18..5a7f39d 100644 --- a/src/plugins/HttpPlugin.ts +++ b/src/plugins/HttpPlugin.ts @@ -26,6 +26,7 @@ import Tag from '../Tag'; import { SpanLayer } from '../proto/language-agent/Tracing_pb'; import { ContextCarrier } from '../trace/context/ContextCarrier'; import { createLogger } from '../logging'; +import * as eos from 'end-of-stream'; const logger = createLogger(__filename); @@ -46,7 +47,6 @@ class HttpPlugin implements SwPlugin { const argc = arguments.length; const url: URL | string | RequestOptions = arguments[0]; - const options = argc > 1 ? (typeof arguments[1] === 'function' ? {} : arguments[1]) : {}; const callback = typeof arguments[argc - 1] === 'function' ? arguments[argc - 1] : undefined; const { host, pathname } = @@ -65,32 +65,19 @@ class HttpPlugin implements SwPlugin { span.layer = SpanLayer.HTTP; span.tag(Tag.httpURL(host + pathname)); - const snapshot = ContextManager.current.capture(); - const request: ClientRequest = original.apply(this, arguments); span.extract().items.forEach((item) => { request.setHeader(item.key, item.value); }); - request.on('response', (res) => { - res.prependListener('end', () => { - span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)); - - const callbackSpan = ContextManager.current.newLocalSpan('callback').start(); - callbackSpan.layer = SpanLayer.HTTP; - callbackSpan.component = Component.HTTP; + span.async().stop(); - ContextManager.current.restore(snapshot); - - if (callback) { - callback(res); - } + request.on('response', (res) => { + span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)); - callbackSpan.stop(); - }); + eos(res, () => span.await()); }); - span.stop(); return request; }; @@ -123,7 +110,9 @@ class HttpPlugin implements SwPlugin { span.layer = SpanLayer.HTTP; span.tag(Tag.httpURL(req.url)); - span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)).stop(); + span.tag(Tag.httpStatusCode(res.statusCode)).tag(Tag.httpStatusMsg(res.statusMessage)).async().stop(); + + eos(res, () => span.await()); return original.apply(this, arguments); }; diff --git a/src/plugins/MongodbPlugin.ts b/src/plugins/MongodbPlugin.ts new file mode 100644 index 0000000..44a045b --- /dev/null +++ b/src/plugins/MongodbPlugin.ts @@ -0,0 +1,182 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import SwPlugin from '../core/SwPlugin'; +import { createLogger } from '../logging'; +import ContextManager from '../trace/context/ContextManager'; +import Span from '../trace/span/Span'; +import { Component } from '../trace/Component'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import { InsertWriteOpResult, MongoClient, MongoError, WithId } from 'mongodb'; + +const logger = createLogger(__filename); + +class MongodbPlugin implements SwPlugin { + readonly module = 'mongodb'; + readonly versions = '*'; // TODO: narrow down the supported version + + install(): void { + const spanKeyedByRequestId: { + [requestId: string]: Span; + } = {}; + + const onStartEvent = (event: any) => { + if (logger.isDebugEnabled()) { + logger.debug('Mongodb start', event); + } + const span = ContextManager.current.newExitSpan( + `MongoDB/${event.databaseName}/${event.commandName}`, + event.address, + ); + span.component = Component.MONGODB; + span.layer = SpanLayer.DATABASE; + spanKeyedByRequestId[event.requestId] = span.start(); + }; + + const onSuccessEvent = async (event: any) => { + if (logger.isDebugEnabled()) { + logger.debug('Mongodb succeeded', event); + } + + await (async () => setTimeout(() => {}, 1000))(); + + if (!spanKeyedByRequestId[event.requestId]) { + return; + } + const span = spanKeyedByRequestId[event.requestId]; + + delete spanKeyedByRequestId[event.requestId]; + + if (event.reply?.writeErrors) { + const errors = event.reply.writeErrors as [{ errmsg: string }]; + + span.errored = true; + span.logs.push({ + items: [ + { + key: 'ErrorMsg', + val: errors.map((it) => it.errmsg).join('\n'), + }, + ], + timestamp: new Date().getTime(), + }); + } + + span.stop(); + }; + + const onFailedEvent = (event: any) => { + if (logger.isDebugEnabled()) { + logger.debug('Mongodb failed', event); + } + if (!spanKeyedByRequestId[event.requestId]) { + return; + } + const span = spanKeyedByRequestId[event.requestId]; + + span.stop(); + + delete spanKeyedByRequestId[event.requestId]; + }; + + const Client = require('mongodb/lib/mongo_client'); + + ((original) => { + Client.prototype.connect = function () { + const span = ContextManager.current.newExitSpan(`MongoDB/connect`, this.s.url).start(); + span.component = Component.MONGODB; + span.layer = SpanLayer.DATABASE; + + const snapshot = ContextManager.current.capture(); + + const callback = arguments[arguments.length - 1]; + if (typeof callback === 'function') { + arguments[arguments.length - 1] = (error: MongoError, r: MongoClient) => { + ContextManager.current.restore(snapshot); + + callback(error, r); + }; + } + + const result = original.apply(this, arguments); + + span.stop(); + + return result; + }; + })(Client.prototype.connect); + + ((original) => { + Client.prototype.close = function () { + const span = ContextManager.current.newExitSpan(`MongoDB/close`, this.s.url).start(); + span.component = Component.MONGODB; + span.layer = SpanLayer.DATABASE; + + const snapshot = ContextManager.current.capture(); + + const callback = arguments[arguments.length - 1]; + if (typeof callback === 'function') { + arguments[arguments.length - 1] = (error: MongoError, r: MongoClient) => { + ContextManager.current.restore(snapshot); + + callback(error, r); + }; + } + + const result = original.apply(this, arguments); + + span.stop(); + + return result; + }; + })(Client.prototype.close); + + const Collection = require('mongodb/lib/collection'); + + ((original) => { + Collection.prototype.insertMany = function (ns: string, ops: object[]) { + const { db, collection } = this.s.namespace; + + const span = ContextManager.current.newExitSpan(`MongoDB/${db}/${collection}/insertMany`, this.s.url).start(); + span.component = Component.MONGODB; + span.layer = SpanLayer.DATABASE; + + const snapshot = ContextManager.current.capture(); + + const callback = arguments[arguments.length - 1]; + if (typeof callback === 'function') { + arguments[arguments.length - 1] = (error: MongoError, r: InsertWriteOpResult<WithId<any>>) => { + ContextManager.current.restore(snapshot); + + callback(error, r); + }; + } + + const result = original.apply(this, arguments); + + span.stop(); + + return result; + }; + })(Collection.prototype.insertMany); + } +} + +// noinspection JSUnusedGlobalSymbols +export default new MongodbPlugin(); diff --git a/src/trace/context/Context.ts b/src/trace/context/Context.ts index 7393887..c17cdd9 100644 --- a/src/trace/context/Context.ts +++ b/src/trace/context/Context.ts @@ -36,6 +36,10 @@ export default interface Context { stop(span: Span): boolean; + async(span: Span): Context; + + await(span: Span): Context; + currentSpan(): Span | undefined; capture(): Snapshot; diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts index e49bbb0..de6dc34 100644 --- a/src/trace/context/ContextManager.ts +++ b/src/trace/context/ContextManager.ts @@ -25,19 +25,22 @@ class ContextManager { contextKeyedByAsyncId: { [asyncId: number]: Context } = {}; constructor() { - createHook({ - destroy: (asyncId: number) => { - delete this.contextKeyedByAsyncId[asyncId]; - }, - }).enable(); + // createHook({ + // destroy: (asyncId: number) => { + // delete this.contextKeyedByAsyncId[asyncId]; + // }, + // }).enable(); } - get current(): Context { - const thisAsyncId = executionAsyncId(); - - this.contextKeyedByAsyncId[thisAsyncId] = this.contextKeyedByAsyncId[thisAsyncId] || new SpanContext(thisAsyncId); + context = new SpanContext(1); - return this.contextKeyedByAsyncId[thisAsyncId]; + get current(): Context { + return this.context; + // const thisAsyncId = executionAsyncId(); + // + // this.contextKeyedByAsyncId[thisAsyncId] = this.contextKeyedByAsyncId[thisAsyncId] || new SpanContext(thisAsyncId); + // + // return this.contextKeyedByAsyncId[thisAsyncId]; } } diff --git a/src/trace/context/DummyContext.ts b/src/trace/context/DummyContext.ts index 54a98e8..52b7754 100644 --- a/src/trace/context/DummyContext.ts +++ b/src/trace/context/DummyContext.ts @@ -75,4 +75,12 @@ export default class DummyContext implements Context { restore(snapshot: Snapshot) { // Big Bang ~ } + + async(span: Span): this { + return this; + } + + await(span: Span): this { + return this; + } } diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts index d7c53b0..f7470f8 100644 --- a/src/trace/context/SpanContext.ts +++ b/src/trace/context/SpanContext.ts @@ -37,6 +37,7 @@ export default class SpanContext implements Context { spanId = 0; spans: Span[] = []; segment: Segment = new Segment(); + asyncSpans: Span[] = []; constructor(public asyncId: number) {} @@ -122,8 +123,12 @@ export default class SpanContext implements Context { return true; } - if (this.tryFinish(span)) { - this.spans.splice(0, 1); + const finished = this.tryFinish(span); + if (finished) { + this.spans.splice(this.spans.indexOf(span), 1); + } + if (finished && this.asyncSpans.length === 0) { + buffer.put(this.segment); } return this.spans.length === 0; @@ -134,7 +139,6 @@ export default class SpanContext implements Context { if (logger.isDebugEnabled()) { logger.debug('Finishing span', { span }); } - buffer.put(this.segment); return true; } return false; @@ -159,4 +163,21 @@ export default class SpanContext implements Context { this.currentSpan()?.refer(ref); this.segment.relate(ref.traceId); } + + async(span: Span): this { + if (!this.asyncSpans.includes(span)) { + this.asyncSpans.push(span); + } + return this; + } + + await(span: Span): this { + if (this.asyncSpans.includes(span)) { + this.asyncSpans.splice(this.asyncSpans.indexOf(span), 1); + } + if (this.asyncSpans.length === 0) { + buffer.put(this.segment); + } + return this; + } } diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts index decfcf3..0a92868 100644 --- a/src/trace/span/Span.ts +++ b/src/trace/span/Span.ts @@ -58,6 +58,7 @@ export default abstract class Span { startTime = 0; endTime = 0; errored = false; + _async = false; constructor(options: SpanCtorOptions & { type: SpanType }) { this.context = options.context; @@ -141,4 +142,21 @@ export default abstract class Span { } return this; } + + get isAsync(): boolean { + return this._async; + } + + async(): this { + this.context.async(this); + this._async = true; + return this; + } + + await(): this { + this.endTime = new Date().getTime(); + this.context.await(this); + this._async = false; + return this; + } } diff --git a/tests/dev.ts b/tests/dev.ts new file mode 100644 index 0000000..0988a86 --- /dev/null +++ b/tests/dev.ts @@ -0,0 +1,59 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import Agent from '../src/index'; +import { Db, MongoClient } from 'mongodb'; + +import * as assert from 'assert'; + +Agent.start({}); +// Connection URL +const url = 'mongodb://localhost:27017'; + +// Database Name +const dbName = 'teambition'; +// Create a new MongoClient +const client = new MongoClient(url, { useUnifiedTopology: true }); +const insertDocumentsOk = (db, callback) => { + const collection = db.collection('documents'); + collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }], (err, result) => { + callback(result); + }); +}; +const insertDocumentsFailed = (db: Db, callback) => { + const collection = db.collection('documents'); + collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }], (err, result) => { + callback(result); + }); +}; +// Use connect method to connect to the Server +client.connect((err) => { + assert.equal(null, err); + const db = client.db(dbName); + insertDocumentsOk(db, () => {}); + (async () => { + await (async () => setTimeout(() => {}, 1000))(); + })(); + insertDocumentsOk(db, () => {}); + insertDocumentsFailed(db, () => { + client.close(); + }); +}); + +setTimeout(() => {}, 10000); diff --git a/tests/plugins/http/client.ts b/tests/plugins/http/client.ts new file mode 100644 index 0000000..ddd6785 --- /dev/null +++ b/tests/plugins/http/client.ts @@ -0,0 +1,57 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// @ts-ignore +import agent from '../../../src/index'; + +agent.start({}); + +import * as http from 'http'; +import { createServer, IncomingMessage, ServerResponse } from 'http'; + +const providerHostPort = process.env.SW_PROVIDER || '127.0.0.1:5000'; + +const port = 5001; + +const server = createServer((request: IncomingMessage, response: ServerResponse) => { + http + .request( + { + host: providerHostPort.split(':')[0], + port: providerHostPort.split(':')[1], + path: '/kids', + }, + (res) => { + // http + // .request({ + // host: providerHostPort.split(':')[0], + // port: providerHostPort.split(':')[1], + // path: '/inner-callback', + // }) + // .end(); + res.on('data', (chunk) => response.write(chunk)); + res.on('end', () => response.end('\n')); + }, + ) + .end(); +}); + +server.listen(port, () => { + console.log(`Server is listening on port ${port}`); +}); diff --git a/src/trace/context/Context.ts b/tests/plugins/http/server.ts similarity index 56% copy from src/trace/context/Context.ts copy to tests/plugins/http/server.ts index 7393887..32b9292 100644 --- a/src/trace/context/Context.ts +++ b/tests/plugins/http/server.ts @@ -17,28 +17,19 @@ * */ -import Span from '../../trace/span/Span'; -import Segment from '../../trace/context/Segment'; -import Snapshot from '../../trace/context/Snapshot'; -import { ContextCarrier } from './ContextCarrier'; +// @ts-ignore +import agent from '../../../src/index'; -export default interface Context { - segment: Segment; - spans: Span[]; +agent.start({}); - newLocalSpan(operation: string): Span; +import { createServer, IncomingMessage, ServerResponse } from 'http'; - newEntrySpan(operation: string, carrier?: ContextCarrier): Span; +const port = 5000; - newExitSpan(operation: string, peer: string, carrier?: ContextCarrier): Span; +const server = createServer((request: IncomingMessage, response: ServerResponse) => { + setTimeout(() => response.end('How you doin'), 1000); +}); - start(span: Span): Context; - - stop(span: Span): boolean; - - currentSpan(): Span | undefined; - - capture(): Snapshot; - - restore(snapshot: Snapshot): void; -} +server.listen(port, () => { + console.log(`Server is listening on port ${port}`); +});
