http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/js/transport.js ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js deleted file mode 100644 index eef0fe9..0000000 --- a/examples/streaming/transport/src/main/resources/transport/js/transport.js +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -var myChart = echarts.init(document.getElementById("mychart")) - -echarts.util.mapData.params.params.football = { - getGeoJson: function (callback) { - $.ajax({ - url: "../svg/beijing.svg", - dataType: 'xml', - success: function (xml) { - callback(xml) - } - }); - } -} - -function updateRecords(tableId) { - $.getJSON("records", function (json) { - var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">"; - tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>"; - var records = json.records; - for (var i = 0; i < Math.min(records.length, 20); i++) { - var record = records[i]; - var vehicleId = record.vehicleId; - var location = record.locationId.split("_"); - var speed = record.speed; - var row = location[1]; - var column = location[2]; - var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, ''); - tableStr += "<tr><td>" + vehicleId + "</td>"; - tableStr += "<td>" + speed + "km/h </td>" - tableStr += "<td>(" + row + ", " + column + ")</td>"; - tableStr += "<td>" + time + "</td></tr>"; - } - if (records.length < 20) { - for (var i = records.length; i < 20; i++) { - tableStr += "<tr><td></td>"; - tableStr += "<td> </td>" - tableStr += "<td> </td>"; - tableStr += "<td> </td></tr>"; - } - } - tableStr += "</table>" - document.getElementById(tableId).innerHTML = tableStr; - } - ) -} - -function initChart(chartid, vehicleId) { - // åºäºåå¤å¥½çdomï¼åå§åechartså¾è¡¨ - $.getJSON("trace/" + vehicleId, function (json) { - // 为echarts对象å è½½æ°æ® - var records = json.records; - var timeLine = new Array(records.length); - var markPoints = new Array(records.length); - var options_ = new Array(records.length - 2); - for (var i = 0; i < records.length; i++) { - var record = records[i]; - var vehicleId = record.vehicleId; - var location = record.locationId.split("_"); - var row = location[1]; - var column = location[2]; - var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, ''); - timeLine[i] = time; - var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]}; - markPoints[i] = currentPonit; - } - options_[0] = - { - title: { - text: 'Vehicle trace' - }, - tooltip: { - trigger: 'item' - }, - toolbox: { - show: false, - feature: { - mark: {show: true}, - dataView: {show: true, readOnly: false}, - magicType: {show: true, type: ['line', 'bar']}, - restore: {show: true}, - saveAsImage: {show: true} - } - }, - series: [ - { - name: 'Vehicle trace', - type: 'map', - mapType: 'football', - mapLocation: { - y: 30, - height: 430 - }, - itemStyle: { - normal: {label: {show: false}}, - emphasis: {label: {show: false}} - }, - data: [ - {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}} - ], - markPoint: { - symbol: 'circle', - symbolSize: 8, - itemStyle: { - normal: { - borderWidth: 1, - color: 'blue', - lineStyle: { - type: 'solid' - } - } - }, - data: [markPoints[0]] - }, - markLine: { - smooth: true, - effect: { - show: true, - scaleSize: 1.5, - period: 1.5, - color: '#fff' - }, - itemStyle: { - normal: { - borderWidth: 2, - color: 'red', - lineStyle: { - type: 'solid' - } - } - }, - data: [] - } - } - ] - } - for (var i = 1; i < markPoints.length; i++) { - options_[i] = - { - series: [ - { - markPoint: { - data: [markPoints[i]] - }, - markLine: { - data: [] - } - } - ] - } - } - var option = { - timeline: { - type: 'number', - playInterval: 500, - autoPlay: true, - data: timeLine - }, - options: options_ - }; - myChart.setOption(option); - }); -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg b/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg deleted file mode 100644 index 5342c24..0000000 --- a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg +++ /dev/null @@ -1,199 +0,0 @@ -<?xml version="1.0" encoding="utf-8"?> - -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<!-- Generator: Adobe Illustrator 14.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 43363) --> -<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" width="1103px" - height="1115px" viewBox="0 0 200 202" enable-background="new 0 0 1103 1115" xml:space="preserve"> -<g id="èæ¯"> - - <polygon fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 719,377 533,377 487,398 491,563 469,571 469,682 504,682 546,676 661,667 672,672 710,672 740,676 751,645 746,567 725,563 - 719,510 "/> - - <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d=" - M359,533V402l-7-76l36-18l333-10l76,65v315l-38,41h-49l-37,6H534l-81,31h-37c-33.5-0.5-56.5-54.5-57-70V533z"/> - - <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d=" - M252,571v44l11,39l16,41v71l7.5,27.5L290,811l11,13l16,1l447-4l48-10l51-45l7-25l8-70V370c1.5-12.5-4.5-24.5-11-32l-103-96l-25-15 - l-21-6l-392,17l-19,8l-17,24l-35,28l-4,10l4,38l-3,40v47l3,37l-3,40V571z"/> - - <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d=" - M259,133l63-8l52-13l57-19h49l241,6l69,26h51c18.5,2.5,27.5,6.5,37,16l82,121l16,28l9,20l11,37l11,21l25,37l7,122l11,80v24v73l6,30 - l-6,17L944,857l-93,45l-46,62l-17,10l-49,1c-17.5,1.5-37.5,20.5-43,35l-19,52l-18,12l-84,27c-5.5,1.5-21.5,6.5-27-13l-6-37 - c-2.5-8.5-3.5-22.5-27-23l-151-18c-15.5-1.5-30.5,5.5-50,18l-23,12c-17.5,8.5-41.5-2.5-49-30l-15-60v-35c0,0-49.5-86.5-55-90 - s-35-9-35-9l-38-65l-5-23l-33-60l-6-22l13-46v-89l2-43l-2-63l10-37l19-111l-3-27c0.5-8.5,5.5-30.5,24-36l29-6l25-15h46l17-10l16-22 - L259,133z"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 1112,523 990,523 967,530 878,530 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1030" y1="405" x2="1112" y2="406"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 1112,48 962,122 777.5,343.5 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 856,-7 856,10 848,31 797,81 788,93 763,186 706,298 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 559.5,303.5 565,270 547,207 422,-7 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - -10,1003 13,995 25,988 42,931 42,912 47,898 70,870 74,853 66,789 74,779 110,767 143,742 152,717 280,647 331,633 359.5,617.5 - "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 456,1115 449,1049 449,1022 467,986 466.5,977.5 455.5,868.5 460,842 460.5,754.5 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 1103,1115 1066,1044 1042,1011 1015,982 984,927 952,888 901,849 778,703 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1036.5" y1="768.5" x2="1112" y2="810"/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="1112" y2="682"/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="1019" y1="-7" x2="955" y2="129"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - -10,533 540,536 671,531 878,530 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 0,468 132,468 168,457 175,457 201,470 212,470 360,470 422,474 458,474 494,472 539,472 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 539,477 590,477 595,474 623,474 630,470 732,470 753,476 763,477 825,502 843,503 929,502 984.5,496.5 1037,500 1091,500 - 1112,503 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 494,565 533.5,564.5 556,562 616,562 626,559 670,559 680,562 725,562 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 1030,405 971,405 885,435 683,435 589.5,435.5 539,439 523,439 372,439 254,439 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 360,618 389,602 403,600 469,602 591.5,601.5 625.5,591.5 652,589 699,588 987,589 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 716,377 748,377 778,344 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - -10,596 6,603 47,603 63,588 91,572 394,572 472,571 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 725,544 689,544 674,559 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 539,406 474.5,410.5 458,415 424.5,417.5 400.5,414.5 378,413 305.5,379.5 283.5,385.5 254,384 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 878,530 864,541 725,549 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 213,688 231,694 278,694 397,682 467,682 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="469" y1="682" x2="460.5" y2="754.5"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 308,755 313,638 313,496 316,487 315.5,420.5 313,399 313,344 282,280 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 352,327 322,338 299,344 254,344 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 162,-7 206,13 240,48 287,120 294,175 312,193 312,206 324,219.5 330,280 346,298 352,327 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 332.5,-9 357,41 364,70 369,109 379,124 381,155 379,159 373,159 370,168 375,265 391,322 401,414 401,474 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 477,93 482,229 486,361 489,372 490,399 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="560" y1="304" x2="556" y2="377"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 537,377 542,536 542,694 539,703 535,716 535,727 530,825 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 613,596 615,671 624,924 666,921 666,930 670,935 678,991 755,1061 766,1115 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="600" y1="227" x2="604" y2="377"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 642,434 640,377 638,302 638,227 642,99 653,10 653,0 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 1037,769 951,709 951,677 870,622 678,623 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 653,11 670,58 668,85 668,110 664,129 664,146 668,176 667,199 667,227 672,369 664,369 664,377 672,609 678,622 680,725 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="745" y2="671"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 1112,470 973,472 965,474 770,474 763,478 "/> - - <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="845" y1="434" x2="842" y2="696"/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 763,530 763,383 751,372 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 812.5,125 816,205 861,243 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 667,172 619,174 578,175 494,178 434,180 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 720,271 709,269 392,272 340,275 329,279 282,283 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 826,797 837,810 840.5,831.5 885.5,884.5 967.16,1014.78 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 888,1003 1025,917 1050,905 1064.5,888.5 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 1071,1051 1051,1065 1024,1081 987,1065 976,1051 961,1040 898,1017 861,959 851,935 837,923 813,888 763,838 757,825 751,769 - 749,737 747,682 737,677 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 1009,988 967.28,1015.13 933.5,1117.5 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 0,984 9,974 15,957 15,950 15,939 26,909 34,901 45,897 60,878 58,867 63,856 64,844 61,825 64,818 61,776 104,769 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 917,866 845,900 797,919 781,915 742,923 725,923 667,930 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - -3,1076 93,1076 148,1056 248,1055 270,1047 333,1009 458,909 "/> - - <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#14A97E" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points=" - 239,129 241,154 251,172 270,176 292,176 295,187 312,193 346,192 352,184 371,184 374,237 377,272 391,327 401,414 422,418 - 449,416 472,411 530,406 539,414 542,677 560,703 535,725 530,810 "/> -</g> -<g id="å±_1" display="none"> - - <polyline display="inline" fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#0096C0" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points=" - 797,623.5 797,362.5 778,345.5 792,328.5 725,273.5 706,270.5 386.5,273 340,277.5 333,279.5 299,282.5 "/> -</g> -</svg> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/transport.html ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html deleted file mode 100644 index baee931..0000000 --- a/examples/streaming/transport/src/main/resources/transport/transport.html +++ /dev/null @@ -1,88 +0,0 @@ -<!DOCTYPE html> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<html> - -<head> - <meta charset="utf-8"> - <link rel=stylesheet type=text/css href="css/custom.css"> - <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script> - <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> -</head> - -<body style="background-color:#F2F2F2"> -<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;"> - <div style="height:0px"></div> - <div id="header"> - <div - style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> - Big Data Transport Monitoring Demo - </div> - </div> - <div id="body"> - <div id="Menu"> - <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> - <!-- form to post to accompany to get accompanying cars --> - - <table style="width:100%"> - <tr> - <td class="sidebar-label">Vehicle Id:</td> - </tr> - <tr> - <td class="sidebar-label"></td> - </tr> - <tr> - <td style="vertical-align:top;"> - <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/> - </td> - </tr> - </table> - <div class="splitter"></div> - <div> - <button id="search" onclick="search_onclick()">Search</button> - </div> - </div> - </div> - <div id="content" - style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> - <div - style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> - Vehicle Trace: - </div> - <div style="height:7px;background-color:#92BDF2;"></div> - - <div id="mychart"></div> - - <div id="mytable"></div> - </div> - </div> - <div id="footer"> - Big Data Team @ Intel - </div> - <script src="js/transport.js"></script> - <script type="text/javascript"> - function search_onclick() { - var vehicleId = document.getElementById('vehicleId').value - initChart("mychart", vehicleId) - } - setInterval(updateRecords, 1000, "mytable") - </script> -</div> -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala deleted file mode 100644 index 0aaf72c..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -case class LocationInfo(id: String, row: Int, column: Int) - -// scalastyle:off equals.hash.code -case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) { - override def hashCode: Int = vehicleId.hashCode -} -// scalastyle:on equals.hash.code - -case class GetTrace(vehicleId: String) - -case class VehicleTrace(records: Array[PassRecord]) - -case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala deleted file mode 100644 index 555e850..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import scala.concurrent.duration._ - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator} -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} - -class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.{output, parallelism, scheduleOnce, taskId} - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private val recordGenerators: Array[PassRecordGenerator] = - PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold) - - override def onStart(startTime: StartTime): Unit = { - self ! Message("start", System.currentTimeMillis()) - } - - override def onNext(msg: Message): Unit = { - recordGenerators.foreach(generator => - output(Message(generator.getNextPassRecord(), System.currentTimeMillis()))) - scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis())) - } - - private def getIdentifier(taskId: TaskId): String = { - // scalastyle:off non.ascii.character.disallowed - s"沪A${taskId.processorId}${taskId.index}" - // scalastyle:on non.ascii.character.disallowed - } -} - -object DataSource { - final val VEHICLE_NUM = "vehicle.number" - final val MOCK_CITY_SIZE = "mock.city.size" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala deleted file mode 100644 index ff3b4b4..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.util.{Failure, Success} - -import akka.actor.Actor._ -import akka.actor.{Actor, ActorRefFactory, Props} -import akka.io.IO -import akka.pattern.ask -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.{HttpService, Route} -import upickle.default.write - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.PartitionerDescription -import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer} -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication} -import org.apache.gearpump.util.Graph - -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import system.dispatcher - import taskContext.appMaster - - var inspector: (ProcessorId, ProcessorDescription) = null - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - private var overSpeedRecords = List.empty[OverSpeedReport] - - override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( - StreamApplication.DAG).get) - inspector = dag.processors.find { kv => - val (_, processor) = kv - processor.taskClass == classOf[VelocityInspector].getName - }.get - taskContext.actorOf(Props(new WebServer)) - } - - override def onNext(msg: Message): Unit = { - } - - override def receiveUnManagedMessage: Receive = { - case getTrace@GetTrace(vehicleId: String) => - val parallism = inspector._2.parallelism - val processorId = inspector._1 - val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism) - val requester = sender - (appMaster ? LookupTaskActorRef(analyzerTaskId)) - .asInstanceOf[Future[TaskActorRef]].flatMap { task => - (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]] - }.map { trace => - LOG.info(s"reporting $trace") - requester ! trace - } - case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) => - LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h") - overSpeedRecords :+= record - case GetAllRecords => - sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp)) - overSpeedRecords = List.empty[OverSpeedReport] - case _ => - // Ignore - } -} - -object QueryServer { - object GetAllRecords - - case class OverSpeedRecords(records: Array[OverSpeedReport]) - - class WebServer extends Actor with HttpService { - - import context.dispatcher - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory: ActorRefFactory = context - implicit val system = context.system - - IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080) - - override def receive: Receive = runRoute(webServer ~ staticRoute) - - def webServer: Route = { - path("trace" / Segment) { vehicleId => - get { - onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) { - case Success(trace: VehicleTrace) => - val json = write(trace) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } ~ - path("records") { - get { - onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) { - case Success(records: OverSpeedRecords) => - val json = write(records) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } - } - - val staticRoute = { - pathEndOrSingleSlash { - getFromResource("transport/transport.html") - } ~ - pathPrefix("css") { - get { - getFromResourceDirectory("transport/css") - } - } ~ - pathPrefix("svg") { - get { - getFromResourceDirectory("transport/svg") - } - } ~ - pathPrefix("js") { - get { - getFromResourceDirectory("transport/js") - } - } - } - - private def pretty(json: String): String = { - json.parseJson.prettyPrint - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala deleted file mode 100644 index 5beb2e1..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner -import org.apache.gearpump.streaming.{Processor, StreamApplication} -import org.apache.gearpump.util.Graph._ -import org.apache.gearpump.util.{AkkaApp, Graph} - -/** A city smart transportation streaming application */ -object Transport extends AkkaApp with ArgumentsParser { - override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<how many task to generate data>", required = false, - defaultValue = Some(10)), - "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false, - defaultValue = Some(4)), - "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false, - defaultValue = Some(1000)), - "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false, - defaultValue = Some(10)), - "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false, - defaultValue = Some(60))) - - def application(config: ParseResult): StreamApplication = { - val sourceNum = config.getInt("source") - val inspectorNum = config.getInt("inspector") - val vehicleNum = config.getInt("vehicle") - val citysize = config.getInt("citysize") - val threshold = config.getInt("threshold") - val source = Processor[DataSource](sourceNum) - val inspector = Processor[VelocityInspector](inspectorNum) - val queryServer = Processor[QueryServer](1) - val partitioner = new HashPartitioner - - val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). - withInt(DataSource.MOCK_CITY_SIZE, citysize). - withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold). - withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) - StreamApplication("transport", Graph(source ~ partitioner ~> inspector, - Node(queryServer)), userConfig) - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - implicit val system = context.system - context.submit(application(config)) - context.close() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala deleted file mode 100644 index 4d9bd04..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import java.util.concurrent.TimeUnit -import scala.collection.immutable.Queue -import scala.collection.mutable -import scala.concurrent.Future - -import akka.actor.Actor._ -import akka.actor.ActorRef -import akka.pattern.ask - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.PartitionerDescription -import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import org.apache.gearpump.streaming.examples.transport.generator.MockCity -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication} -import org.apache.gearpump.util.Graph - -class VelocityInspector(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - - import system.dispatcher - import taskContext.appMaster - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - private val passRecords = mutable.Map.empty[String, Queue[PassRecord]] - private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get - private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get - private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get - private val mockCity = new MockCity(citySize) - private var queryServerActor: ActorRef = null - - override def onStart(startTime: StartTime): Unit = { - val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]]( - StreamApplication.DAG).get) - val queryServer = dag.processors.find { kv => - val (_, processor) = kv - processor.taskClass == classOf[QueryServer].getName - }.get - val queryServerTaskId = TaskId(queryServer._1, 0) - (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]] - .map { task => - queryServerActor = task.task - } - } - - import org.apache.gearpump.streaming.examples.transport.VelocityInspector._ - override def onNext(msg: Message): Unit = { - msg.msg match { - case passRecord: PassRecord => - val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord]) - if (records.size > 0) { - val velocity = getVelocity(passRecord, records.last) - val formatted = "%.2f".format(velocity) - if (velocity > overdriveThreshold) { - if (velocity > fakePlateThreshold) { - LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " + - s"the speed is $formatted km/h") - } - if (queryServerActor != null) { - queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, - passRecord.timeStamp, passRecord.locationId) - } - } - } - passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM)) - } - } - - override def receiveUnManagedMessage: Receive = { - case GetTrace(vehicleId) => - val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord]) - sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp)) - } - - private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = { - val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId) - val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60) - distanceInKm / timeInHour - } - - private def getDistance(location1: String, location2: String): Long = { - mockCity.getDistance(location1, location2) - } -} - -object VelocityInspector { - final val OVER_DRIVE_THRESHOLD = "overdrive.threshold" - final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold" - final val RECORDS_NUM = 20 - - class FiniteQueue[T](q: Queue[T]) { - def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = { - var result = q.enqueue(elem) - while (result.size > maxSize) { - result = result.dequeue._2 - } - result - } - } - - import scala.language.implicitConversions - - implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala deleted file mode 100644 index 60e0bcf..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport.generator - -import scala.util.Random - -import org.apache.gearpump.streaming.examples.transport.generator.MockCity._ - -class MockCity(size: Int) { - private val random = new Random() - private val directions = Array(UP, DOWN, LEFT, RIGHT) - - def nextLocation(currentLocationId: String): String = { - val coordinate = idToCoordinate(currentLocationId) - val direction = directions(random.nextInt(4)) - val newCoordinate = coordinate.addOffset(direction) - if (inCity(newCoordinate)) { - coordinateToId(newCoordinate) - } else { - nextLocation(currentLocationId) - } - } - - def getDistance(locationId1: String, locationId2: String): Long = { - val coordinate1 = idToCoordinate(locationId1) - val coordinate2 = idToCoordinate(locationId2) - val blocks = Math.abs(coordinate1.row - coordinate2.row) + - Math.abs(coordinate1.column - coordinate2.column) - blocks * LENGTH_PER_BLOCK - } - - def randomLocationId(): String = { - val row = random.nextInt(size) - val column = random.nextInt(size) - coordinateToId(Coordinate(row, column)) - } - - private def coordinateToId(coordinate: Coordinate): String = { - s"Id_${coordinate.row}_${coordinate.column}" - } - - private def idToCoordinate(locationId: String): Coordinate = { - val attr = locationId.split("_") - val row = attr(1).toInt - val column = attr(2).toInt - Coordinate(row, column) - } - - private def inCity(coordinate: Coordinate): Boolean = { - coordinate.row >= 0 && - coordinate.row < size && - coordinate.column >= 0 && - coordinate.column < size - } -} - -object MockCity { - // The length of the mock city, km - final val LENGTH_PER_BLOCK = 5 - // The minimal speed, km/h - final val MINIMAL_SPEED = 10 - - final val UP = Coordinate(0, 1) - final val DOWN = Coordinate(0, -1) - final val LEFT = Coordinate(-1, 0) - final val RIGHT = Coordinate(1, 0) - - case class Coordinate(row: Int, column: Int) { - def addOffset(coordinate: Coordinate): Coordinate = { - Coordinate(this.row + coordinate.row, this.column + coordinate.column) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala deleted file mode 100644 index e8c1c59..0000000 --- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport.generator - -import scala.util.Random - -import org.apache.gearpump.streaming.examples.transport.PassRecord -import org.apache.gearpump.util.LogUtil - -class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) { - private val LOG = LogUtil.getLogger(getClass) - LOG.info(s"Generate pass record for vehicle $vehicleId") - private var timeStamp = System.currentTimeMillis() - - private var locationId = city.randomLocationId() - private val random = new Random() - private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE - private val (randomMin, randomRange) = { - val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat - val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat - val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE - val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES) - val randomRange = upperBound - randomMin - (randomMin.toInt, randomRange.toInt) - } - - def getNextPassRecord(): PassRecord = { - locationId = if (fakePlate) { - city.randomLocationId() - } else { - city.nextLocation(locationId) - } - timeStamp += (random.nextInt(randomRange) + randomMin) - PassRecord(vehicleId, locationId, timeStamp) - } -} - -object PassRecordGenerator { - final val FAKE_PLATE_RATE = 0.01F - final val OVERDRIVE_RATE = 0.05F - final val TWOMINUTES = 2 * 60 * 1000 - - def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int) - : Array[PassRecordGenerator] = { - var result = Map.empty[String, PassRecordGenerator] - val digitsNum = (Math.log10(generatorNum) + 1).toInt - for (i <- 1 to generatorNum) { - val vehicleId = prefix + s"%0${digitsNum}d".format(i) - val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold) - result += vehicleId -> generator - } - result.values.toArray - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala deleted file mode 100644 index 1f525ae..0000000 --- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime - -class DataSourceSpec extends FlatSpec with Matchers { - it should "create the pass record" in { - val vehicleNum = 2 - val context = MockUtil.mockTaskContext - - val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum). - withInt(DataSource.MOCK_CITY_SIZE, 10). - withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60). - withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200) - - val source = new DataSource(context, userConfig) - source.onStart(StartTime(0)) - source.onNext(Message("start")) - verify(context, times(vehicleNum)).output(any[Message]) - source.onStop() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala deleted file mode 100644 index 2f83de5..0000000 --- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} - -import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication -import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} - -class TransportSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { - - override def beforeAll { - startActorSystem() - } - - override def afterAll { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("Transport should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-source", "1", - "-inspector", "1", - "-vehicle", "100", - "-citysize", "10", - "-threshold", "60") - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - Transport.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala deleted file mode 100644 index ba4eb2d..0000000 --- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport.generator - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class MockCitySpec extends PropSpec with PropertyChecks with Matchers { - - property("MockCity should maintain the location properly") { - val city = new MockCity(10) - val start = city.randomLocationId() - val nextLocation = city.nextLocation(start) - assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala deleted file mode 100644 index f0eebbf..0000000 --- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.examples.transport.generator - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers { - - property("PassRecordGenerator should generate pass record") { - val id = "test" - val city = new MockCity(10) - val generator = new PassRecordGenerator(id, city, 60) - val passrecord1 = generator.getNextPassRecord() - val passrecord2 = generator.getNextPassRecord() - assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == - MockCity.LENGTH_PER_BLOCK) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java index 76069c1..0a8fb4f 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java @@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.javaapi.Task; -import org.apache.gearpump.streaming.task.StartTime; import org.apache.gearpump.streaming.task.TaskContext; +import java.time.Instant; + public class Split extends Task { public static String TEXT = "This is a good start for java! bingo! bingo! "; @@ -37,7 +38,7 @@ public class Split extends Task { } @Override - public void onStart(StartTime startTime) { + public void onStart(Instant startTime) { self().tell(new Message("start", now()), self()); } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java index 89c3b14..3daa6e0 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java @@ -21,10 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.javaapi.Task; -import org.apache.gearpump.streaming.task.StartTime; import org.apache.gearpump.streaming.task.TaskContext; import org.slf4j.Logger; +import java.time.Instant; import java.util.HashMap; public class Sum extends Task { @@ -37,7 +37,7 @@ public class Sum extends Task { } @Override - public void onStart(StartTime startTime) { + public void onStart(Instant startTime) { //skip } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala index ae63f10..af3c04c 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala @@ -18,16 +18,17 @@ package org.apache.gearpump.streaming.examples.wordcount +import java.time.Instant import java.util.concurrent.TimeUnit import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { self ! Message("start") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala index c3fa82a..dbefc93 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.examples.wordcount +import java.time.Instant import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.concurrent.duration.FiniteDuration @@ -26,7 +27,7 @@ import akka.actor.Cancellable import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() @@ -37,7 +38,7 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, private var scheduler: Cancellable = null - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala index cef9337..8b50890 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.wordcount +import java.time.Instant + import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -29,7 +31,6 @@ import org.scalatest.{Matchers, WordSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class SplitSpec extends WordSpec with Matchers { @@ -47,10 +48,10 @@ class SplitSpec extends WordSpec with Matchers { val conf = UserConfig.empty val split = new Split(taskContext, conf) - split.onStart(StartTime(0)) + split.onStart(Instant.EPOCH) mockTaskActor.expectMsgType[Message] - val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length + val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty) split.onNext(Message("next")) verify(taskContext, times(expectedWordCount)).output(anyObject()) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala index e42d696..17e1765 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.wordcount +import java.time.Instant + import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} @@ -24,7 +26,6 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { val stringGenerator = Gen.alphaStr @@ -39,7 +40,7 @@ class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA val sum = new Sum(taskContext, conf) - sum.onStart(StartTime(0)) + sum.onStart(Instant.EPOCH) forAll(stringGenerator) { txt => wordcount += 1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala index 1d3048e..e3b45fb 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.experiments.storm.processor +import java.time.Instant import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration @@ -46,7 +47,7 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt, private val freqOpt = gearpumpBolt.getTickFrequency - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { gearpumpBolt.start(startTime) freqOpt.foreach(scheduleTick) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala index 5d4a6a2..b92f037 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.experiments.storm.producer +import java.time.Instant import java.util.concurrent.TimeUnit import akka.actor.Actor.Receive @@ -48,7 +49,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout, private val timeoutMillis = gearpumpSpout.getMessageTimeout - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { gearpumpSpout.start(startTime) if (gearpumpSpout.ackEnabled) { getCheckpointClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index d0f2949..a8e061c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -19,6 +19,7 @@ package org.apache.gearpump.experiments.storm.topology import java.io.{File, FileOutputStream, IOException} +import java.time.Instant import java.util import java.util.jar.JarFile import java.util.{HashMap => JHashMap, List => JList, Map => JMap} @@ -40,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._ import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil} import org.apache.gearpump.streaming.DAG -import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime} +import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext} import org.apache.gearpump.util.{Constants, LogUtil} import org.apache.gearpump.{Message, TimeStamp} import org.slf4j.Logger @@ -57,7 +58,7 @@ trait GearpumpStormComponent { * invoked at Task.onStart * @param startTime task start time */ - def start(startTime: StartTime): Unit + def start(startTime: Instant): Unit /** * invoked at Task.onNext @@ -123,7 +124,7 @@ object GearpumpStormComponent { private var collector: StormSpoutOutputCollector = null - override def start(startTime: StartTime): Unit = { + override def start(startTime: Instant): Unit = { val dag = getDAG(taskContext.appMaster) val topologyContext = getTopologyContext(dag, taskContext.taskId) collector = getOutputCollector(taskContext, topologyContext) @@ -206,7 +207,7 @@ object GearpumpStormComponent { private var generalTopologyContext: GeneralTopologyContext = null private var tickTuple: Tuple = null - override def start(startTime: StartTime): Unit = { + override def start(startTime: Instant): Unit = { val dag = getDAG(taskContext.appMaster) topologyContext = getTopologyContext(dag, taskContext.taskId) generalTopologyContext = getGeneralTopologyContext(dag) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala index 2111df6..9bbac58 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala @@ -18,11 +18,12 @@ package org.apache.gearpump.experiments.storm.processor +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{Matchers, WordSpec} @@ -31,7 +32,7 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar { "StormProcessor" should { "start GearpumpSpout onStart" in { - val startTime = mock[StartTime] + val startTime = Instant.EPOCH val gearpumpBolt = mock[GearpumpBolt] when(gearpumpBolt.getTickFrequency).thenReturn(None) val taskContext = MockUtil.mockTaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala index 39a008f..ee89a4a 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.experiments.storm.producer +import java.time.Instant + import akka.testkit.TestProbe import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{Matchers, WordSpec} @@ -32,7 +33,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar { "StormProducer" should { "start GearpumpSpout onStart" in { - val startTime = mock[StartTime] + val startTime = Instant.EPOCH val gearpumpSpout = mock[GearpumpSpout] when(gearpumpSpout.getMessageTimeout).thenReturn(None) val taskContext = MockUtil.mockTaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala index bdea50c..0891070 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -17,6 +17,7 @@ */ package org.apache.gearpump.experiments.storm.topology +import java.time.Instant import java.util.{Map => JMap} import akka.actor.ActorRef @@ -26,7 +27,7 @@ import backtype.storm.tuple.Tuple import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} import org.apache.gearpump.experiments.storm.util.StormOutputCollector -import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId} +import org.apache.gearpump.streaming.task.{TaskContext, TaskId} import org.apache.gearpump.streaming.{DAG, MockUtil} import org.apache.gearpump.{Message, TimeStamp} import org.mockito.Matchers.{anyObject, eq => mockitoEq} @@ -59,8 +60,7 @@ class GearpumpStormComponentSpec getOutputCollector, ackEnabled = false, taskContext) // Start - val startTime = mock[StartTime] - gearpumpSpout.start(startTime) + gearpumpSpout.start(Instant.EPOCH) verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), anyObject[SpoutOutputCollector]) @@ -100,8 +100,7 @@ class GearpumpStormComponentSpec getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext) // Start - val startTime = mock[StartTime] - gearpumpBolt.start(startTime) + gearpumpBolt.start(Instant.EPOCH) verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), anyObject[OutputCollector]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala index ef383ad..b92b2e1 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -79,7 +79,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar "get target processors from source id" in { val stormTopology = TopologyUtil.getTestTopology implicit val system = MockUtil.system - val sysConfig = new JHashMap[AnyRef, AnyRef] val gearpumpStormTopology = GearpumpStormTopology("app", stormTopology, null) val targets0 = gearpumpStormTopology.getTargets("1") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala index da08b04..314eae8 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.kafka.lib.source +import java.time.Instant import java.util.Properties import com.twitter.bijection.Injection @@ -87,11 +88,11 @@ abstract class AbstractKafkaSource( this.checkpointStoreFactory = Some(checkpointStoreFactory) } - override def open(context: TaskContext, startTime: TimeStamp): Unit = { + override def open(context: TaskContext, startTime: Instant): Unit = { import context.{parallelism, taskId} LOG.info("KafkaSource opened at start time {}", startTime) - this.startTime = startTime + this.startTime = startTime.toEpochMilli val topicList = topic.split(",", -1).toList val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG, classOf[PartitionGrouper]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala index e40276f..6ccb231 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.kafka +import java.time.Instant import java.util.Properties import com.twitter.bijection.Injection @@ -42,7 +43,7 @@ import org.scalatest.{Matchers, PropSpec} class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val startTimeGen = Gen.choose[Long](0L, 100L) + val startTimeGen = Gen.choose[Long](0L, 100L).map(Instant.ofEpochMilli) val offsetGen = Gen.choose[Long](0L, 100L) val topicAndPartitionGen = for { topic <- Gen.alphaStr suchThat (_.nonEmpty) @@ -51,7 +52,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty) property("KafkaSource open should not recover without checkpoint") { - forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) => + forAll(startTimeGen, tpsGen) { (startTime: Instant, tps: List[TopicAndPartition]) => val taskContext = MockUtil.mockTaskContext val fetchThread = mock[FetchThread] val kafkaClient = mock[KafkaClient] @@ -84,7 +85,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo property("KafkaSource open should recover with checkpoint") { forAll(startTimeGen, offsetGen, tpsGen) { - (startTime: Long, offset: Long, tps: List[TopicAndPartition]) => + (startTime: Instant, offset: Long, tps: List[TopicAndPartition]) => val taskContext = MockUtil.mockTaskContext val checkpointStoreFactory = mock[CheckpointStoreFactory] val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap @@ -115,7 +116,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo checkpointStores.foreach { case (tp, store) => when(checkpointStoreFactory.getCheckpointStore( KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store) - when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset))) + when(store.recover(startTime.toEpochMilli)) + .thenReturn(Some(Injection[Long, Array[Byte]](offset))) } source.setCheckpointStore(checkpointStoreFactory)
