http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.cc new file mode 100644 index 0000000..e6cfec2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.cc @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <math.h> +#include "commons.h" +#include "Random.h" + +namespace NativeTask { + +static long RandomInitializeID = 8682522807148012ULL; + +// A random list of 1000 words from /usr/share/dict/words +static const +char * Words[] = {"diurnalness", "Homoiousian", "spiranthic", "tetragynian", "silverhead", + "ungreat", "lithograph", "exploiter", "physiologian", "by", "hellbender", "Filipendula", + "undeterring", "antiscolic", "pentagamist", "hypoid", "cacuminal", "sertularian", + "schoolmasterism", "nonuple", "gallybeggar", "phytonic", "swearingly", "nebular", "Confervales", + "thermochemically", "characinoid", "cocksuredom", "fallacious", "feasibleness", "debromination", + "playfellowship", "tramplike", "testa", "participatingly", "unaccessible", "bromate", + "experientialist", "roughcast", "docimastical", "choralcelo", "blightbird", "peptonate", + "sombreroed", "unschematized", "antiabolitionist", "besagne", "mastication", "bromic", + "sviatonosite", "cattimandoo", "metaphrastical", "endotheliomyoma", "hysterolysis", + "unfulminated", "Hester", "oblongly", "blurredness", "authorling", "chasmy", "Scorpaenidae", + "toxihaemia", "Dictograph", "Quakerishly", "deaf", "timbermonger", "strammel", "Thraupidae", + "seditious", "plerome", "Arneb", "eristically", "serpentinic", "glaumrie", "socioromantic", + "apocalypst", "tartrous", "Bassaris", "angiolymphoma", "horsefly", "kenno", "astronomize", + "euphemious", "arsenide", "untongued", "parabolicness", "uvanite", "helpless", "gemmeous", + "stormy", "templar", "erythrodextrin", "comism", "interfraternal", "preparative", "parastas", + "frontoorbital", "Ophiosaurus", "diopside", "serosanguineous", "ununiformly", "karyological", + "collegian", "allotropic", "depravity", "amylogenesis", "reformatory", "epidymides", + "pleurotropous", "trillium", "dastardliness", "coadvice", "embryotic", "benthonic", + "pomiferous", "figureheadship", "Megaluridae", "Harpa", "frenal", "commotion", "abthainry", + "cobeliever", 
"manilla", "spiciferous", "nativeness", "obispo", "monilioid", "biopsic", + "valvula", "enterostomy", "planosubulate", "pterostigma", "lifter", "triradiated", "venialness", + "tum", "archistome", "tautness", "unswanlike", "antivenin", "Lentibulariaceae", "Triphora", + "angiopathy", "anta", "Dawsonia", "becomma", "Yannigan", "winterproof", "antalgol", "harr", + "underogating", "ineunt", "cornberry", "flippantness", "scyphostoma", "approbation", "Ghent", + "Macraucheniidae", "scabbiness", "unanatomized", "photoelasticity", "eurythermal", "enation", + "prepavement", "flushgate", "subsequentially", "Edo", "antihero", "Isokontae", "unforkedness", + "porriginous", "daytime", "nonexecutive", "trisilicic", "morphiomania", "paranephros", + "botchedly", "impugnation", "Dodecatheon", "obolus", "unburnt", "provedore", "Aktistetae", + "superindifference", "Alethea", "Joachimite", "cyanophilous", "chorograph", "brooky", "figured", + "periclitation", "quintette", "hondo", "ornithodelphous", "unefficient", "pondside", "bogydom", + "laurinoxylon", "Shiah", "unharmed", "cartful", "noncrystallized", "abusiveness", "cromlech", + "japanned", "rizzomed", "underskin", "adscendent", "allectory", "gelatinousness", "volcano", + "uncompromisingly", "cubit", "idiotize", "unfurbelowed", "undinted", "magnetooptics", "Savitar", + "diwata", "ramosopalmate", "Pishquow", "tomorn", "apopenptic", "Haversian", "Hysterocarpus", + "ten", "outhue", "Bertat", "mechanist", "asparaginic", "velaric", "tonsure", "bubble", + "Pyrales", "regardful", "glyphography", "calabazilla", "shellworker", "stradametrical", "havoc", + "theologicopolitical", "sawdust", "diatomaceous", "jajman", "temporomastoid", "Serrifera", + "Ochnaceae", "aspersor", "trailmaking", "Bishareen", "digitule", "octogynous", "epididymitis", + "smokefarthings", "bacillite", "overcrown", "mangonism", "sirrah", "undecorated", "psychofugal", + "bismuthiferous", "rechar", "Lemuridae", "frameable", "thiodiazole", "Scanic", + "sportswomanship", 
"interruptedness", "admissory", "osteopaedion", "tingly", "tomorrowness", + "ethnocracy", "trabecular", "vitally", "fossilism", "adz", "metopon", "prefatorial", + "expiscate", "diathermacy", "chronist", "nigh", "generalizable", "hysterogen", + "aurothiosulphuric", "whitlowwort", "downthrust", "Protestantize", "monander", "Itea", + "chronographic", "silicize", "Dunlop", "eer", "componental", "spot", "pamphlet", "antineuritic", + "paradisean", "interruptor", "debellator", "overcultured", "Florissant", "hyocholic", + "pneumatotherapy", "tailoress", "rave", "unpeople", "Sebastian", "thermanesthesia", "Coniferae", + "swacking", "posterishness", "ethmopalatal", "whittle", "analgize", "scabbardless", "naught", + "symbiogenetically", "trip", "parodist", "columniform", "trunnel", "yawler", "goodwill", + "pseudohalogen", "swangy", "cervisial", "mediateness", "genii", "imprescribable", "pony", + "consumptional", "carposporangial", "poleax", "bestill", "subfebrile", "sapphiric", "arrowworm", + "qualminess", "ultraobscure", "thorite", "Fouquieria", "Bermudian", "prescriber", "elemicin", + "warlike", "semiangle", "rotular", "misthread", "returnability", "seraphism", "precostal", + "quarried", "Babylonism", "sangaree", "seelful", "placatory", "pachydermous", "bozal", + "galbulus", "spermaphyte", "cumbrousness", "pope", "signifier", "Endomycetaceae", "shallowish", + "sequacity", "periarthritis", "bathysphere", "pentosuria", "Dadaism", "spookdom", + "Consolamentum", "afterpressure", "mutter", "louse", "ovoviviparous", "corbel", "metastoma", + "biventer", "Hydrangea", "hogmace", "seizing", "nonsuppressed", "oratorize", "uncarefully", + "benzothiofuran", "penult", "balanocele", "macropterous", "dishpan", "marten", "absvolt", + "jirble", "parmelioid", "airfreighter", "acocotl", "archesporial", "hypoplastral", "preoral", + "quailberry", "cinque", "terrestrially", "stroking", "limpet", "moodishness", "canicule", + "archididascalian", "pompiloid", "overstaid", "introducer", "Italical", 
"Christianopaganism", + "prescriptible", "subofficer", "danseuse", "cloy", "saguran", "frictionlessly", + "deindividualization", "Bulanda", "ventricous", "subfoliar", "basto", "scapuloradial", + "suspend", "stiffish", "Sphenodontidae", "eternal", "verbid", "mammonish", "upcushion", + "barkometer", "concretion", "preagitate", "incomprehensible", "tristich", "visceral", + "hemimelus", "patroller", "stentorophonic", "pinulus", "kerykeion", "brutism", "monstership", + "merciful", "overinstruct", "defensibly", "bettermost", "splenauxe", "Mormyrus", + "unreprimanded", "taver", "ell", "proacquittal", "infestation", "overwoven", "Lincolnlike", + "chacona", "Tamil", "classificational", "lebensraum", "reeveland", "intuition", "Whilkut", + "focaloid", "Eleusinian", "micromembrane", "byroad", "nonrepetition", "bacterioblast", "brag", + "ribaldrous", "phytoma", "counteralliance", "pelvimetry", "pelf", "relaster", "thermoresistant", + "aneurism", "molossic", "euphonym", "upswell", "ladhood", "phallaceous", "inertly", "gunshop", + "stereotypography", "laryngic", "refasten", "twinling", "oflete", "hepatorrhaphy", + "electrotechnics", "cockal", "guitarist", "topsail", "Cimmerianism", "larklike", "Llandovery", + "pyrocatechol", "immatchable", "chooser", "metrocratic", "craglike", "quadrennial", + "nonpoisonous", "undercolored", "knob", "ultratense", "balladmonger", "slait", "sialadenitis", + "bucketer", "magnificently", "unstipulated", "unscourged", "unsupercilious", "packsack", + "pansophism", "soorkee", "percent", "subirrigate", "champer", "metapolitics", "spherulitic", + "involatile", "metaphonical", "stachyuraceous", "speckedness", "bespin", "proboscidiform", + "gul", "squit", "yeelaman", "peristeropode", "opacousness", "shibuichi", "retinize", "yote", + "misexposition", "devilwise", "pumpkinification", "vinny", "bonze", "glossing", "decardinalize", + "transcortical", "serphoid", "deepmost", "guanajuatite", "wemless", "arval", "lammy", "Effie", + "Saponaria", "tetrahedral", 
"prolificy", "excerpt", "dunkadoo", "Spencerism", "insatiately", + "Gilaki", "oratorship", "arduousness", "unbashfulness", "Pithecolobium", "unisexuality", + "veterinarian", "detractive", "liquidity", "acidophile", "proauction", "sural", "totaquina", + "Vichyite", "uninhabitedness", "allegedly", "Gothish", "manny", "Inger", "flutist", "ticktick", + "Ludgatian", "homotransplant", "orthopedical", "diminutively", "monogoneutic", "Kenipsim", + "sarcologist", "drome", "stronghearted", "Fameuse", "Swaziland", "alen", "chilblain", + "beatable", "agglomeratic", "constitutor", "tendomucoid", "porencephalous", "arteriasis", + "boser", "tantivy", "rede", "lineamental", "uncontradictableness", "homeotypical", "masa", + "folious", "dosseret", "neurodegenerative", "subtransverse", "Chiasmodontidae", + "palaeotheriodont", "unstressedly", "chalcites", "piquantness", "lampyrine", "Aplacentalia", + "projecting", "elastivity", "isopelletierin", "bladderwort", "strander", "almud", + "iniquitously", "theologal", "bugre", "chargeably", "imperceptivity", "meriquinoidal", + "mesophyte", "divinator", "perfunctory", "counterappellant", "synovial", "charioteer", + "crystallographical", "comprovincial", "infrastapedial", "pleasurehood", "inventurous", + "ultrasystematic", "subangulated", "supraoesophageal", "Vaishnavism", "transude", + "chrysochrous", "ungrave", "reconciliable", "uninterpleaded", "erlking", "wherefrom", + "aprosopia", "antiadiaphorist", "metoxazine", "incalculable", "umbellic", "predebit", + "foursquare", "unimmortal", "nonmanufacture", "slangy", "predisputant", "familist", + "preaffiliate", "friarhood", "corelysis", "zoonitic", "halloo", "paunchy", "neuromimesis", + "aconitine", "hackneyed", "unfeeble", "cubby", "autoschediastical", "naprapath", "lyrebird", + "inexistency", "leucophoenicite", "ferrogoslarite", "reperuse", "uncombable", "tambo", + "propodiale", "diplomatize", "Russifier", "clanned", "corona", "michigan", "nonutilitarian", + "transcorporeal", "bought", 
"Cercosporella", "stapedius", "glandularly", "pictorially", "weism", + "disilane", "rainproof", "Caphtor", "scrubbed", "oinomancy", "pseudoxanthine", "nonlustrous", + "redesertion", "Oryzorictinae", "gala", "Mycogone", "reappreciate", "cyanoguanidine", + "seeingness", "breadwinner", "noreast", "furacious", "epauliere", "omniscribent", + "Passiflorales", "uninductive", "inductivity", "Orbitolina", "Semecarpus", "migrainoid", + "steprelationship", "phlogisticate", "mesymnion", "sloped", "edificator", "beneficent", "culm", + "paleornithology", "unurban", "throbless", "amplexifoliate", "sesquiquintile", "sapience", + "astucious", "dithery", "boor", "ambitus", "scotching", "uloid", "uncompromisingness", "hoove", + "waird", "marshiness", "Jerusalem", "mericarp", "unevoked", "benzoperoxide", "outguess", + "pyxie", "hymnic", "euphemize", "mendacity", "erythremia", "rosaniline", "unchatteled", + "lienteria", "Bushongo", "dialoguer", "unrepealably", "rivethead", "antideflation", + "vinegarish", "manganosiderite", "doubtingness", "ovopyriform", "Cephalodiscus", "Muscicapa", + "Animalivora", "angina", "planispheric", "ipomoein", "cuproiodargyrite", "sandbox", "scrat", + "Munnopsidae", "shola", "pentafid", "overstudiousness", "times", "nonprofession", "appetible", + "valvulotomy", "goladar", "uniarticular", "oxyterpene", "unlapsing", "omega", "trophonema", + "seminonflammable", "circumzenithal", "starer", "depthwise", "liberatress", "unleavened", + "unrevolting", "groundneedle", "topline", "wandoo", "umangite", "ordinant", "unachievable", + "oversand", "snare", "avengeful", "unexplicit", "mustafina", "sonable", "rehabilitative", + "eulogization", "papery", "technopsychology", "impressor", "cresylite", "entame", + "transudatory", "scotale", "pachydermatoid", "imaginary", "yeat", "slipped", "stewardship", + "adatom", "cockstone", "skyshine", "heavenful", "comparability", "exprobratory", + "dermorhynchous", "parquet", "cretaceous", "vesperal", "raphis", "undangered", "Glecoma", + 
"engrain", "counteractively", "Zuludom", "orchiocatabasis", "Auriculariales", "warriorwise", + "extraorganismal", "overbuilt", "alveolite", "tetchy", "terrificness", "widdle", + "unpremonished", "rebilling", "sequestrum", "equiconvex", "heliocentricism", "catabaptist", + "okonite", "propheticism", "helminthagogic", "calycular", "giantly", "wingable", "golem", + "unprovided", "commandingness", "greave", "haply", "doina", "depressingly", "subdentate", + "impairment", "decidable", "neurotrophic", "unpredict", "bicorporeal", "pendulant", "flatman", + "intrabred", "toplike", "Prosobranchiata", "farrantly", "toxoplasmosis", "gorilloid", + "dipsomaniacal", "aquiline", "atlantite", "ascitic", "perculsive", "prospectiveness", + "saponaceous", "centrifugalization", "dinical", "infravaginal", "beadroll", "affaite", + "Helvidian", "tickleproof", "abstractionism", "enhedge", "outwealth", "overcontribute", + "coldfinch", "gymnastic", "Pincian", "Munychian", "codisjunct", "quad", "coracomandibular", + "phoenicochroite", "amender", "selectivity", "putative", "semantician", "lophotrichic", + "Spatangoidea", "saccharogenic", "inferent", "Triconodonta", "arrendation", "sheepskin", + "taurocolla", "bunghole", "Machiavel", "triakistetrahedral", "dehairer", "prezygapophysial", + "cylindric", "pneumonalgia", "sleigher", "emir", "Socraticism", "licitness", "massedly", + "instructiveness", "sturdied", "redecrease", "starosta", "evictor", "orgiastic", "squdge", + "meloplasty", "Tsonecan", "repealableness", "swoony", "myesthesia", "molecule", + "autobiographist", "reciprocation", "refective", "unobservantness", "tricae", "ungouged", + "floatability", "Mesua", "fetlocked", "chordacentrum", "sedentariness", "various", "laubanite", + "nectopod", "zenick", "sequentially", "analgic", "biodynamics", "posttraumatic", "nummi", + "pyroacetic", "bot", "redescend", "dispermy", "undiffusive", "circular", "trillion", + "Uraniidae", "ploration", "discipular", "potentness", "sud", "Hu", "Eryon", "plugger", 
+ "subdrainage", "jharal", "abscission", "supermarket", "countergabion", "glacierist", + "lithotresis", "minniebush", "zanyism", "eucalypteol", "sterilely", "unrealize", "unpatched", + "hypochondriacism", "critically", "cheesecutter", }; + +static size_t WordsCount = sizeof(Words) / sizeof(char *); + +Random::Random() { + setSeed(time(NULL) + clock() + RandomInitializeID++); +} + +Random::Random(int64_t seed) { + if (seed == -1) { + setSeed(time(NULL) + clock() + RandomInitializeID++); + } else { + setSeed(seed); + } +} + +Random::~Random() { + +} + +void Random::setSeed(int64_t seed) { + _seed = (seed ^ multiplier) & mask; +} + +int32_t Random::next(int bits) { + _seed = (_seed * multiplier + addend) & mask; + return (int32_t)(_seed >> (48 - bits)); +} + +int32_t Random::next_int32() { + return next(32); +} + +uint32_t Random::next_uint32() { + return (uint32_t)next(32); +} + +uint64_t Random::next_uint64() { + return ((uint64_t)(next(32)) << 32) + next(32); +} + +int32_t Random::next_int32(int32_t n) { + if ((n & -n) == n) // i.e., n is a power of 2 + return (int32_t)((n * (int64_t)next(31)) >> 31); + + int32_t bits, val; + do { + bits = next(31); + val = bits % n; + } while (bits - val + (n - 1) < 0); + return val; +} + +float Random::nextFloat() { + return next(24) / ((float)(1 << 24)); +} + +double Random::nextDouble() { + return (((uint64_t)(next(26)) << 27) + next(27)) / (double)(1L << 53); +} + +uint64_t Random::nextLog2() { + return (uint64_t)exp2(nextDouble() * 64); +} + +uint64_t Random::nextLog2(uint64_t range) { + double range_r = log2(range); + double v = nextDouble() * range_r; + return (uint64_t)exp2(v); +} + +uint64_t Random::nextLog10(uint64_t range) { + double range_r = log10(range); + double v = nextDouble() * range_r; + return (uint64_t)pow(10, v); +} + +char Random::nextByte(const string & range) { + if (range.length() == 0) { + return (char)next(8); + } else { + return range[next_int32(range.length())]; + } +} + +string 
Random::nextBytes(uint32_t length, const string & range) { + string ret(length, '-'); + for (uint32_t i = 0; i < length; i++) { + ret[i] = nextByte(range); + } + return ret; +} + +const char * Random::nextWord(int64_t limit) { + if (limit < 0) { + return Words[next_int32(WordsCount)]; + } + uint32_t r = limit < WordsCount ? limit : WordsCount; + return Words[next_int32(r)]; +} + +void Random::nextWord(string & dest, int64_t limit) { + dest = nextWord(limit); +} + +} // namespace NativeTask
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.h new file mode 100644 index 0000000..9f062e0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Random.h @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef RANDOM_H_ +#define RANDOM_H_ + +#include <stdint.h> +#include <string> + +namespace NativeTask { + +using std::string; + +/** + * Copy of java.lang.Random & some random text/bytes generator + */ +class Random { +protected: + static const int64_t multiplier = 0x5DEECE66DULL; + static const int64_t addend = 0xBL; + static const int64_t mask = (1ULL << 48) - 1; +protected: + int64_t _seed; + + int32_t next(int bits); +public: + Random(); + + Random(int64_t seed); + + ~Random(); + + /** + * Set random seed + */ + void setSeed(int64_t seed); + + /** + * Returns uniformly distributed uint32_t in [INT_MIN, INT_MAX] + */ + int32_t next_int32(); + + /** + * Returns uniformly distributed uint32_t in [0,(uint32_t)-1) + */ + uint32_t next_uint32(); + + /** + * Returns uniformly distributed uint64_t in [0,(uint64_t)-1) + */ + uint64_t next_uint64(); + + /** + * Returns uniformly distributed int32_t in [0,n) + */ + int32_t next_int32(int32_t n); + + /** + * Returns the next pseudorandom, uniformly distributed + * {@code float} value between {@code 0.0} and + * {@code 1.0} from this random number generator's sequence. + */ + float nextFloat(); + + /** + * Returns the next pseudorandom, uniformly distributed + * {@code double} value between {@code 0.0} and + * {@code 1.0} from this random number generator's sequence. + */ + double nextDouble(); + + /** + * Returns the next pseudorandom, log2-normal distributed + * value between [0, MAX_UNIT64] + */ + uint64_t nextLog2(); + + /** + * Returns the next pseudorandom, log2-normal distributed + * value between [0, range] + */ + uint64_t nextLog2(uint64_t range); + + /** + * Returns the next pseudorandom, log10-normal distributed + * value between [0, range] + */ + uint64_t nextLog10(uint64_t range); + + /** + * Returns uniformly distributed byte in range + * @param range e.g. 
"ABCDEFG", "01234566789" + */ + char nextByte(const string & range); + + /** + * Return byte sequence of <code>length</code> + * each byte in the sequence is generated using + * <code>nextByte</code> + */ + string nextBytes(uint32_t length, const string & range); + + /** + * Generate random word from a 100 word collection(same + * as RandomTextWriter), Just a utility function to + * construct the test data. + * @param limit use first <code>limit</code> words in + * the word collection + */ + const char * nextWord(int64_t limit = -1); + + /** + * Generate random word from a 100 word collection(same + * as RandomTextWriter), Just a utility function to + * construct the test data. + * @param dest assign the generated word to dest + * @param limit use first <code>limit</code> words in + * the word collection + */ + void nextWord(string & dest, int64_t limit = -1); +}; + +} // namespace NativeTask + +#endif /* RANDOM_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.cc new file mode 100644 index 0000000..d70dcc8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.cc @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdarg.h> +#include "commons.h" +#include "StringUtil.h" + +namespace NativeTask { + +string StringUtil::ToString(int32_t v) { + char tmp[32]; + snprintf(tmp, 32, "%d", v); + return tmp; +} + +string StringUtil::ToString(uint32_t v) { + char tmp[32]; + snprintf(tmp, 32, "%u", v); + return tmp; +} + +string StringUtil::ToString(int64_t v) { + char tmp[32]; + snprintf(tmp, 32, "%lld", (long long int)v); + return tmp; +} + +string StringUtil::ToString(int64_t v, char pad, int64_t len) { + char tmp[32]; + snprintf(tmp, 32, "%%%c%lldlld", pad, len); + return Format(tmp, v); +} + +string StringUtil::ToString(uint64_t v) { + char tmp[32]; + snprintf(tmp, 32, "%llu", (long long unsigned int)v); + return tmp; +} + +string StringUtil::ToString(bool v) { + if (v) { + return "true"; + } else { + return "false"; + } +} + +string StringUtil::ToString(float v) { + char tmp[32]; + snprintf(tmp, 32, "%f", v); + return tmp; +} + +string StringUtil::ToString(double v) { + char tmp[32]; + snprintf(tmp, 32, "%lf", v); + return tmp; +} + +string StringUtil::ToString(const void * v, uint32_t len) { + string ret = string(len * 2, '0'); + for (uint32_t i = 0; i < len; i++) { + ret[i] = (((uint8_t*)v)[i] >> 4) + '0'; + ret[i] = (((uint8_t*)v)[i] & 0xff) + '0'; + } + return ret; +} + +bool StringUtil::toBool(const string & str) { + if (str == "true") { + return true; + } else { + return false; + } 
+} + +int64_t StringUtil::toInt(const string & str) { + return strtoll(str.c_str(), NULL, 10); +} + +float StringUtil::toFloat(const string & str) { + return strtof(str.c_str(), NULL); +} + +string StringUtil::Format(const char * fmt, ...) { + char tmp[256]; + string dest; + va_list al; + va_start(al, fmt); + int len = vsnprintf(tmp, 255, fmt, al); + va_end(al); + if (len > 255) { + char * destbuff = new char[len + 1]; + va_start(al, fmt); + len = vsnprintf(destbuff, len + 1, fmt, al); + va_end(al); + dest.append(destbuff, len); + delete destbuff; + } else { + dest.append(tmp, len); + } + return dest; +} + +void StringUtil::Format(string & dest, const char * fmt, ...) { + char tmp[256]; + va_list al; + va_start(al, fmt); + int len = vsnprintf(tmp, 255, fmt, al); + if (len > 255) { + char * destbuff = new char[len + 1]; + len = vsnprintf(destbuff, len + 1, fmt, al); + dest.append(destbuff, len); + } else { + dest.append(tmp, len); + } + va_end(al); +} + +string StringUtil::ToLower(const string & name) { + string ret = name; + for (size_t i = 0; i < ret.length(); i++) { + ret.at(i) = ::tolower(ret[i]); + } + return ret; +} + +string StringUtil::Trim(const string & str) { + if (str.length() == 0) { + return str; + } + size_t l = 0; + while (l < str.length() && isspace(str[l])) { + l++; + } + if (l >= str.length()) { + return string(); + } + size_t r = str.length(); + while (isspace(str[r - 1])) { + r--; + } + return str.substr(l, r - l); +} + +void StringUtil::Split(const string & src, const string & sep, vector<string> & dest, bool clean) { + if (sep.length() == 0) { + return; + } + size_t cur = 0; + while (true) { + size_t pos; + if (sep.length() == 1) { + pos = src.find(sep[0], cur); + } else { + pos = src.find(sep, cur); + } + string add = src.substr(cur, pos - cur); + if (clean) { + string trimed = Trim(add); + if (trimed.length() > 0) { + dest.push_back(trimed); + } + } else { + dest.push_back(add); + } + if (pos == string::npos) { + break; + } + cur = pos + 
sep.length(); + } +} + +string StringUtil::Join(const vector<string> & strs, const string & sep) { + string ret; + for (size_t i = 0; i < strs.size(); i++) { + if (i > 0) { + ret.append(sep); + } + ret.append(strs[i]); + } + return ret; +} + +bool StringUtil::StartsWith(const string & str, const string & prefix) { + if ((prefix.length() > str.length()) + || (memcmp(str.data(), prefix.data(), prefix.length()) != 0)) { + return false; + } + return true; +} + +bool StringUtil::EndsWith(const string & str, const string & suffix) { + if ((suffix.length() > str.length()) + || (memcmp(str.data() + str.length() - suffix.length(), suffix.data(), suffix.length()) != 0)) { + return false; + } + return true; +} + +} // namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.h new file mode 100644 index 0000000..384c678 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/StringUtil.h @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STRINGUTIL_H_ +#define STRINGUTIL_H_ + +#include <stdint.h> +#include <vector> +#include <string> + +namespace NativeTask { + +using std::vector; +using std::string; + +class StringUtil { +public: + static string ToString(int32_t v); + static string ToString(uint32_t v); + static string ToString(int64_t v); + static string ToString(int64_t v, char pad, int64_t len); + static string ToString(uint64_t v); + static string ToString(bool v); + static string ToString(float v); + static string ToString(double v); + static string ToString(const void * v, uint32_t len); + + static int64_t toInt(const string & str); + static bool toBool(const string & str); + static float toFloat(const string & str); + + static string Format(const char * fmt, ...); + + static void Format(string & dest, const char * fmt, ...); + + static string ToLower(const string & name); + + static string Trim(const string & str); + + static void Split(const string & src, const string & sep, vector<string> & dest, + bool clean = false); + + static string Join(const vector<string> & strs, const string & sep); + + static bool StartsWith(const string & str, const string & prefix); + static bool EndsWith(const string & str, const string & suffix); +}; + +} // namespace NativeTask + +#endif /* STRINGUTIL_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.cc new file mode 100644 index 0000000..c3fb307 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.cc @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "jniutils.h" +#include "StringUtil.h" +#include "SyncUtils.h" + +namespace NativeTask { + +static void PthreadCall(const char* label, int result) { + if (result != 0) { + THROW_EXCEPTION_EX(IOException, "pthread %s: %s", label, strerror(result)); + } +} + +Lock::Lock() { + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + int ret = pthread_mutex_init(&_mutex, &attr); + pthread_mutexattr_destroy(&attr); + if (ret != 0) { + THROW_EXCEPTION_EX(IOException, "pthread_mutex_init: %s", strerror(ret)); + } +} + +Lock::~Lock() { + PthreadCall("destroy mutex", pthread_mutex_destroy(&_mutex)); +} + +void Lock::lock() { + PthreadCall("lock", pthread_mutex_lock(&_mutex)); +} + +void Lock::unlock() { + PthreadCall("unlock", pthread_mutex_unlock(&_mutex)); +} + +#ifdef __MACH__ +SpinLock::SpinLock() : _spin(0) { +} + +SpinLock::~SpinLock() { + +} + +void SpinLock::lock() { + OSSpinLockLock(&_spin); +} + +void SpinLock::unlock() { + OSSpinLockUnlock(&_spin); +} +#else +SpinLock::SpinLock() { + PthreadCall("init mutex", pthread_spin_init(&_spin, 0)); +} + +SpinLock::~SpinLock() { + PthreadCall("destroy mutex", pthread_spin_destroy(&_spin)); +} + +void SpinLock::lock() { + PthreadCall("lock", pthread_spin_lock(&_spin)); +} + +void SpinLock::unlock() { + PthreadCall("unlock", pthread_spin_unlock(&_spin)); +} +#endif + +Condition::Condition(Lock* mu) + : _lock(mu) { + PthreadCall("init cv", pthread_cond_init(&_condition, NULL)); +} + +Condition::~Condition() { + PthreadCall("destroy cv", pthread_cond_destroy(&_condition)); +} + +void Condition::wait() { + PthreadCall("wait", pthread_cond_wait(&_condition, &_lock->_mutex)); +} + +void Condition::signal() { + PthreadCall("signal", pthread_cond_signal(&_condition)); +} + +void Condition::signalAll() { + PthreadCall("broadcast", pthread_cond_broadcast(&_condition)); +} + +void * Thread::ThreadRunner(void * pthis) { + try { + 
((Thread*)pthis)->run(); + } catch (std::exception & e) { + LOG("err!!!! %s", e.what()); + } + return NULL; +} + +Thread::Thread() + : _thread((pthread_t)0), // safe for linux & macos + _runable(NULL) { +} + +Thread::Thread(Runnable * runnable) + : _thread((pthread_t)0), _runable(runnable) { +} + +void Thread::setTask(const Runnable & runnable) { + _runable = const_cast<Runnable*>(&runnable); +} + +Thread::~Thread() { + +} + +void Thread::start() { + PthreadCall("pthread_create", pthread_create(&_thread, NULL, ThreadRunner, this)); +} + +void Thread::join() { + PthreadCall("pthread_join", pthread_join(_thread, NULL)); +} + +void Thread::stop() { + PthreadCall("pthread_cancel", pthread_cancel(_thread)); +} + +void Thread::run() { + if (_runable != NULL) { + _runable->run(); + } +} + +void Thread::EnableJNI() { + JNU_AttachCurrentThread(); +} + +void Thread::ReleaseJNI() { + JNU_DetachCurrentThread(); +} + +} // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.h new file mode 100644 index 0000000..15ce2c8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/SyncUtils.h @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. Licensed under the Apache License,
 * Version 2.0; see http://www.apache.org/licenses/LICENSE-2.0
 */

#ifndef SYNCUTILS_H_
#define SYNCUTILS_H_

#include <unistd.h>
#include <string.h>
#ifdef __MACH__
#include <libkern/OSAtomic.h>
#endif
#include <pthread.h>

namespace NativeTask {

class Condition;

// Recursive mutex wrapper (see SyncUtils.cc); non-copyable.
class Lock {
public:
  Lock();
  ~Lock();

  void lock();
  void unlock();

private:
  friend class Condition; // Condition::wait needs the raw pthread mutex
  pthread_mutex_t _mutex;

  // No copying
  Lock(const Lock&);
  void operator=(const Lock&);
};

// Busy-wait lock for very short critical sections; non-copyable.
class SpinLock {
public:
  SpinLock();
  ~SpinLock();

  void lock();
  void unlock();

private:
#ifdef __MACH__
  OSSpinLock _spin;
#else
  pthread_spinlock_t _spin;
#endif

  // No copying.
  // BUGFIX: these previously took `const Lock&`, which is NOT a copy
  // constructor/assignment for SpinLock — the compiler still generated the
  // default copyable ones, so a live spinlock could be silently copied.
  SpinLock(const SpinLock&);
  void operator=(const SpinLock&);
};

// Condition variable permanently bound to one Lock.
class Condition {
public:
  explicit Condition(Lock* mu);
  ~Condition();
  void wait();      // caller must hold the bound lock
  void signal();    // wake one waiter
  void signalAll(); // wake all waiters
private:
  pthread_cond_t _condition;
  Lock* _lock;
};

// RAII guard: locks in ctor, unlocks in dtor. LockT is Lock or SpinLock.
template<typename LockT>
class ScopeLock {
public:
  ScopeLock(LockT & lock)
      : _lock(&lock) {
    _lock->lock();
  }
  ~ScopeLock() {
    _lock->unlock();
  }
private:
  LockT * _lock;

  // No copying
  ScopeLock(const ScopeLock&);
  void operator=(const ScopeLock&);
};

class Runnable {
public:
  virtual ~Runnable() {
  }
  virtual void run() = 0;
};

// Thin pthread wrapper: runs either its own run() override or an attached
// (non-owned) Runnable.
class Thread : public Runnable {
protected:
  pthread_t _thread;
  Runnable * _runable;
public:
  Thread();
  Thread(Runnable * runnable);
  virtual ~Thread();

  void setTask(const Runnable & runnable);
  void start();
  void join();
  void stop();
  virtual void run();

  /**
   * Enable JNI for current thread
   */
  static void EnableJNI();
  /**
   * Release JNI when thread is at end if current
   * thread called EnableJNI before
   */
  static void ReleaseJNI();
private:
  static void * ThreadRunner(void * pthis);
};

// Sure <tr1/functional> is better
template<typename Subject, typename Method>
class FunctionRunner : public Runnable {
protected:
  Subject & _subject;
  Method _method;
public:
  FunctionRunner(Subject & subject, Method method)
      : _subject(subject), _method(method) {
  }

  virtual void run() {
    (_subject.*_method)();
  }
};

template<typename Subject, typename Method, typename Arg>
class FunctionRunner1 : public Runnable {
protected:
  Subject & _subject;
  Method _method;
  Arg _arg;
public:
  FunctionRunner1(Subject & subject, Method method, Arg arg)
      : _subject(subject), _method(method), _arg(arg) {
  }

  virtual void run() {
    (_subject.*_method)(_arg);
  }
};

template<typename Subject, typename Method, typename Arg1, typename Arg2>
class FunctionRunner2 : public Runnable {
protected:
  Subject & _subject;
  Method _method;
  Arg1 _arg1;
  Arg2 _arg2;
public:
  FunctionRunner2(Subject & subject, Method method, Arg1 arg1, Arg2 arg2)
      : _subject(subject), _method(method), _arg1(arg1), _arg2(arg2) {
  }

  virtual void run() {
    (_subject.*_method)(_arg1, _arg2);
  }
};

// Factory helpers: heap-allocate a FunctionRunner; the caller owns the result.
template<typename Subject, typename Method>
inline FunctionRunner<Subject, Method> * BindNew(Subject & subject, Method method) {
  return new FunctionRunner<Subject, Method>(subject, method);
}

template<typename Subject, typename Method, typename Arg>
inline FunctionRunner1<Subject, Method, Arg> * BindNew(Subject & subject, Method method, Arg arg) {
  return new FunctionRunner1<Subject, Method, Arg>(subject, method, arg);
}

template<typename Subject, typename Method, typename Arg1, typename Arg2>
inline FunctionRunner2<Subject, Method, Arg1, Arg2> * BindNew(Subject & subject, Method method,
    Arg1 arg1, Arg2 arg2) {
  return new FunctionRunner2<Subject, Method, Arg1, Arg2>(subject, method, arg1, arg2);
}

// Thread-safe monotonically increasing index over [start, end); next()
// returns -1 once exhausted. Used to hand out iterations to workers.
class ConcurrentIndex {
private:
  size_t _index;
  size_t _end;
  SpinLock _lock;
public:
  ConcurrentIndex(size_t count)
      : _index(0), _end(count) {
  }

  ConcurrentIndex(size_t start, size_t end)
      : _index(start), _end(end) {
  }

  size_t count() {
    return _end;
  }

  ssize_t next() {
    ScopeLock<SpinLock> autoLock(_lock);
    if (_index == _end) {
      return -1;
    } else {
      ssize_t ret = _index;
      _index++;
      return ret;
    }
  }
};

// Worker that pulls indices from a shared ConcurrentIndex and applies
// (subject->*method)(i) until the range is exhausted.
template<typename Subject, typename Method, typename RangeType>
class ParallelForWorker : public Runnable {
protected:
  ConcurrentIndex * _index;
  Subject * _subject;
  Method _method;
public:
  ParallelForWorker()
      : _index(NULL), _subject(NULL) {
  }

  ParallelForWorker(ConcurrentIndex * index, Subject * subject, Method method)
      : _index(index), _subject(subject), _method(method) {
  }

  void reset(ConcurrentIndex * index, Subject * subject, Method method) {
    _index = index;
    _subject = subject;
    _method = method;
  }

  virtual void run() {
    ssize_t i;
    while ((i = _index->next()) >= 0) {
      (_subject->*_method)(i);
    }
  }
};

// Applies (subject.*method)(i) for i in [begin, end) using up to thread_num
// threads; the calling thread always does a share of the work.
template<typename Subject, typename Method, typename RangeType>
void ParallelFor(Subject & subject, Method method, RangeType begin, RangeType end,
    size_t thread_num) {
  RangeType count = end - begin;
  if (thread_num <= 1 || count <= 1) {
    // Not worth threading: run inline.
    for (RangeType i = begin; i < end; i++) {
      (subject.*method)(i);
    }
  } else if (thread_num == 2) {
    // Fast path: one side thread plus the calling thread.
    ConcurrentIndex index = ConcurrentIndex(begin, end);
    ParallelForWorker<Subject, Method, RangeType> workers[2];
    Thread sideThread;
    workers[0].reset(&index, &subject, method);
    workers[1].reset(&index, &subject, method);
    sideThread.setTask(workers[0]);
    sideThread.start();
    workers[1].run();
    sideThread.join();
  } else {
    ConcurrentIndex index = ConcurrentIndex(begin, end);
    ParallelForWorker<Subject, Method, RangeType> * workers = new ParallelForWorker<Subject, Method,
        RangeType> [thread_num];
    Thread * threads = new Thread[thread_num - 1];
    for (size_t i = 0; i < thread_num - 1; i++) {
      workers[i].reset(&index, &subject, method);
      threads[i].setTask(workers[i]);
      threads[i].start();
    }
    // The calling thread doubles as the last worker.
    workers[thread_num - 1].reset(&index, &subject, method);
    workers[thread_num - 1].run();
    for (size_t i = 0; i < thread_num - 1; i++) {
      threads[i].join();
    }
    delete[] threads;
    delete[] workers;
  }
}

} // namespace NativeTask

#endif /* SYNCUTILS_H_ */
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <time.h> +#include "commons.h" +#include "StringUtil.h" +#include "Timer.h" + +namespace NativeTask { + +#ifdef __MACH__ +#include <mach/clock.h> +#include <mach/mach.h> + +static uint64_t clock_get() { + clock_serv_t cclock; + mach_timespec_t mts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &mts); + mach_port_deallocate(mach_task_self(), cclock); + return 1000000000ULL * mts.tv_sec + mts.tv_nsec; +} + +#else + +static uint64_t clock_get() { + timespec ts; + clock_gettime(_POSIX_CPUTIME, &ts); + return 1000000000 * ts.tv_sec + ts.tv_nsec; +} + +#endif + +Timer::Timer() { + _last = clock_get(); +} + +Timer::~Timer() { + +} + +uint64_t Timer::last() { + return _last; +} + +uint64_t Timer::now() { + return clock_get(); +} + +void Timer::reset() { + _last = clock_get(); +} + +string Timer::getInterval(const char * msg) { + uint64_t now = clock_get(); + uint64_t interval = now - _last; + _last = now; + return StringUtil::Format("%s time: %.5lfs", msg, (double)interval / 1000000000.0); +} + +string Timer::getSpeed(const char * msg, uint64_t size) { + uint64_t now = clock_get(); + double interval = (now - _last) / 1000000000.0; + _last = now; + double speed = size / interval; + return StringUtil::Format("%s time:\t %3.5lfs size: %10llu speed: %12.0lf/s", msg, interval, size, + speed); +} + +string Timer::getSpeedM(const char * msg, uint64_t size) { + uint64_t now = clock_get(); + double interval = (now - _last) / 1000000000.0; + _last = now; + double msize = size / (1024.0 * 1024.0); + double 
speed = msize / interval; + return StringUtil::Format("%s time: %3.5lfs size: %.3lfM speed: %.2lfM/s", msg, interval, msize, + speed); +} + +string Timer::getSpeed2(const char * msg, uint64_t size1, uint64_t size2) { + uint64_t now = clock_get(); + double interval = (now - _last) / 1000000000.0; + _last = now; + double speed1 = size1 / interval; + double speed2 = size2 / interval; + return StringUtil::Format("%s time: %3.5lfs size: %llu/%llu speed: %.0lf/%.0lf", msg, interval, + size1, size2, speed1, speed2); +} + +string Timer::getSpeedM2(const char * msg, uint64_t size1, uint64_t size2) { + uint64_t now = clock_get(); + double interval = (now - _last) / 1000000000.0; + _last = now; + double msize1 = size1 / (1024.0 * 1024.0); + double speed1 = msize1 / interval; + double msize2 = size2 / (1024.0 * 1024.0); + double speed2 = msize2 / interval; + return StringUtil::Format("%s time: %3.5lfs size: %.3lfM/%.3lfM speed: %.2lfM/%.2lfM", msg, + interval, msize1, msize2, speed1, speed2); +} + +} // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Timer.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Timer.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Timer.h new file mode 100644 index 0000000..c26e2e4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/Timer.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. Licensed under the Apache License,
 * Version 2.0; see http://www.apache.org/licenses/LICENSE-2.0
 */

#ifndef TIMER_H_
#define TIMER_H_

#include <stdint.h>
#include <stdio.h>
#include <string>

namespace NativeTask {

using std::string;

// Nanosecond stopwatch. Starts at construction; the getInterval/getSpeed*
// helpers format the elapsed interval (and throughput) since the previous
// measurement and restart the clock. Implemented in Timer.cc.
class Timer {
protected:
  uint64_t _last; // timestamp (ns) of the last reset/measurement
public:
  Timer();
  ~Timer();

  // Timestamp of the last reset or measurement.
  uint64_t last();

  // Current timestamp in nanoseconds.
  uint64_t now();

  // Restart the stopwatch.
  void reset();

  // Formats "<msg> time: <seconds>" for the elapsed interval.
  string getInterval(const char * msg);

  // Formats elapsed time plus size/throughput (raw units).
  string getSpeed(const char * msg, uint64_t size);

  // Same, reporting two sizes/throughputs.
  string getSpeed2(const char * msg, uint64_t size1, uint64_t size2);

  // Same as getSpeed, with sizes scaled to MiB.
  string getSpeedM(const char * msg, uint64_t size);

  // Same as getSpeed2, with sizes scaled to MiB.
  string getSpeedM2(const char * msg, uint64_t size1, uint64_t size2);
};

} // namespace NativeTask

#endif /* TIMER_H_ */
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "StringUtil.h" +#include "WritableUtils.h" + +namespace NativeTask { + +KeyValueType JavaClassToKeyValueType(const std::string & clazz) { + if (clazz == "org.apache.hadoop.io.Text") { + return TextType; + } + if (clazz == "org.apache.hadoop.io.BytesWritable") { + return BytesType; + } + if (clazz == "org.apache.hadoop.io.ByteWritable") { + return ByteType; + } + if (clazz == "org.apache.hadoop.io.BooleanWritable") { + return BoolType; + } + if (clazz == "org.apache.hadoop.io.IntWritable") { + return IntType; + } + if (clazz == "org.apache.hadoop.io.LongWritable") { + return LongType; + } + if (clazz == "org.apache.hadoop.io.FloatWritable") { + return FloatType; + } + if (clazz == "org.apache.hadoop.io.DoubleWritable") { + return DoubleType; + } + if (clazz == "org.apache.hadoop.io.MD5Hash") { + return MD5HashType; + } + if (clazz == "org.apache.hadoop.io.VIntWritable") { + return VIntType; + } + if (clazz == "org.apache.hadoop.io.VLongWritable") { + return VLongType; + } + return UnknownType; +} + +int64_t WritableUtils::ReadVLongInner(const char * pos, uint32_t & len) { + bool neg = *pos < -120; + len = neg ? 
(-119 - *pos) : (-111 - *pos); + const char * end = pos + len; + int64_t value = 0; + while (++pos < end) { + value = (value << 8) | *(uint8_t*)pos; + } + return neg ? (value ^ -1LL) : value; +} + +uint32_t WritableUtils::GetVLongSizeInner(int64_t value) { + if (value < 0) { + value ^= -1L; // take one's complement' + } + + if (value < (1LL << 8)) { + return 2; + } else if (value < (1LL << 16)) { + return 3; + } else if (value < (1LL << 24)) { + return 4; + } else if (value < (1LL << 32)) { + return 5; + } else if (value < (1LL << 40)) { + return 6; + } else if (value < (1LL << 48)) { + return 7; + } else if (value < (1LL << 56)) { + return 8; + } else { + return 9; + } +} + +void WritableUtils::WriteVLongInner(int64_t v, char * pos, uint32_t & len) { + char base; + if (v >= 0) { + base = -113; + } else { + v ^= -1L; // take one's complement + base = -121; + } + uint64_t value = v; + if (value < (1 << 8)) { + *(pos++) = base; + *(uint8_t*)(pos) = value; + len = 2; + } else if (value < (1 << 16)) { + *(pos++) = base - 1; + *(uint8_t*)(pos++) = value >> 8; + *(uint8_t*)(pos) = value; + len = 3; + } else if (value < (1 << 24)) { + *(pos++) = base - 2; + *(uint8_t*)(pos++) = value >> 16; + *(uint8_t*)(pos++) = value >> 8; + *(uint8_t*)(pos) = value; + len = 4; + } else if (value < (1ULL << 32)) { + *(pos++) = base - 3; + *(uint32_t*)(pos) = bswap((uint32_t)value); + len = 5; + } else if (value < (1ULL << 40)) { + *(pos++) = base - 4; + *(uint32_t*)(pos) = bswap((uint32_t)(value >> 8)); + *(uint8_t*)(pos + 4) = value; + len = 6; + } else if (value < (1ULL << 48)) { + *(pos++) = base - 5; + *(uint32_t*)(pos) = bswap((uint32_t)(value >> 16)); + *(uint8_t*)(pos + 4) = value >> 8; + *(uint8_t*)(pos + 5) = value; + len = 7; + } else if (value < (1ULL << 56)) { + *(pos++) = base - 6; + *(uint32_t*)(pos) = bswap((uint32_t)(value >> 24)); + *(uint8_t*)(pos + 4) = value >> 16; + *(uint8_t*)(pos + 5) = value >> 8; + *(uint8_t*)(pos + 6) = value; + len = 8; + } else { + *(pos++) = 
base - 7; + *(uint64_t*)pos = bswap64(value); + len = 9; + } +} + +// Stream interfaces +int64_t WritableUtils::ReadVLong(InputStream * stream) { + char buff[10]; + if (stream->read(buff, 1) != 1) { + THROW_EXCEPTION(IOException, "ReadVLong reach EOF"); + } + uint32_t len = DecodeVLongSize(buff); + if (len > 1) { + if (stream->readFully(buff + 1, len - 1) != len - 1) { + THROW_EXCEPTION(IOException, "ReadVLong reach EOF"); + } + } + return ReadVLong(buff, len); +} + +int64_t WritableUtils::ReadLong(InputStream * stream) { + int64_t ret; + if (stream->readFully(&ret, 8) != 8) { + THROW_EXCEPTION(IOException, "ReadLong reach EOF"); + } + return (int64_t)bswap64(ret); +} + +int32_t WritableUtils::ReadInt(InputStream * stream) { + int32_t ret; + if (stream->readFully(&ret, 4) != 4) { + THROW_EXCEPTION(IOException, "ReadInt reach EOF"); + } + return (int32_t)bswap(ret); +} + +int16_t WritableUtils::ReadShort(InputStream * stream) { + uint16_t ret; + if (stream->readFully(&ret, 2) != 2) { + THROW_EXCEPTION(IOException, "ReadShort reach EOF"); + } + return (int16_t)((ret >> 8) | (ret << 8)); +} + +float WritableUtils::ReadFloat(InputStream * stream) { + uint32_t ret; + if (stream->readFully(&ret, 4) != 4) { + THROW_EXCEPTION(IOException, "ReadFloat reach EOF"); + } + ret = bswap(ret); + return *(float*)&ret; +} + +string WritableUtils::ReadText(InputStream * stream) { + int64_t len = ReadVLong(stream); + string ret = string(len, '\0'); + if (stream->readFully((void *)ret.data(), len) != len) { + THROW_EXCEPTION_EX(IOException, "ReadString reach EOF, need %d", len); + } + return ret; +} + +string WritableUtils::ReadBytes(InputStream * stream) { + int32_t len = ReadInt(stream); + string ret = string(len, '\0'); + if (stream->readFully((void *)ret.data(), len) != len) { + THROW_EXCEPTION_EX(IOException, "ReadString reach EOF, need %d", len); + } + return ret; +} + +string WritableUtils::ReadUTF8(InputStream * stream) { + int16_t len = ReadShort(stream); + string ret = 
string(len, '\0'); + if (stream->readFully((void *)ret.data(), len) != len) { + THROW_EXCEPTION_EX(IOException, "ReadString reach EOF, need %d", len); + } + return ret; +} + + +void WritableUtils::WriteVLong(OutputStream * stream, int64_t v) { + char buff[10]; + uint32_t len; + WriteVLong(v, buff, len); + stream->write(buff, len); +} + +void WritableUtils::WriteLong(OutputStream * stream, int64_t v) { + uint64_t be = bswap64((uint64_t)v); + stream->write(&be, 8); +} + +void WritableUtils::WriteInt(OutputStream * stream, int32_t v) { + uint32_t be = bswap((uint32_t)v); + stream->write(&be, 4); +} + +void WritableUtils::WriteShort(OutputStream * stream, int16_t v) { + uint16_t be = v; + be = ((be >> 8) | (be << 8)); + stream->write(&be, 2); +} + +void WritableUtils::WriteFloat(OutputStream * stream, float v) { + uint32_t intv = *(uint32_t*)&v; + intv = bswap(intv); + stream->write(&intv, 4); +} + +void WritableUtils::WriteText(OutputStream * stream, const string & v) { + WriteVLong(stream, v.length()); + stream->write(v.c_str(), (uint32_t)v.length()); +} + +void WritableUtils::WriteBytes(OutputStream * stream, const string & v) { + WriteInt(stream, (int32_t)v.length()); + stream->write(v.c_str(), (uint32_t)v.length()); +} + +void WritableUtils::WriteUTF8(OutputStream * stream, const string & v) { + if (v.length() > 65535) { + THROW_EXCEPTION_EX(IOException, "string too long (%lu) for WriteUTF8", v.length()); + } + WriteShort(stream, (int16_t)v.length()); + stream->write(v.c_str(), (uint32_t)v.length()); +} + +void WritableUtils::toString(string & dest, KeyValueType type, const void * data, uint32_t length) { + switch (type) { + case TextType: + dest.append((const char*)data, length); + break; + case BytesType: + dest.append((const char*)data, length); + break; + case ByteType: + dest.append(1, *(char*)data); + break; + case BoolType: + dest.append(*(uint8_t*)data ? 
"true" : "false"); + break; + case IntType: + dest.append(StringUtil::ToString((int32_t)bswap(*(uint32_t*)data))); + break; + case LongType: + dest.append(StringUtil::ToString((int64_t)bswap64(*(uint64_t*)data))); + break; + case FloatType: + dest.append(StringUtil::ToString(*(float*)data)); + break; + case DoubleType: + dest.append(StringUtil::ToString(*(double*)data)); + break; + case MD5HashType: + dest.append(StringUtil::ToString(data, length)); + break; + default: + dest.append((const char*)data, length); + break; + } +} + +} // namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/WritableUtils.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/WritableUtils.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/WritableUtils.h new file mode 100644 index 0000000..3e93cbe --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/util/WritableUtils.h @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef WRITABLEUTILS_H_ +#define WRITABLEUTILS_H_ + +#include <stdint.h> +#include <string> +#include "Streams.h" +#include "NativeTask.h" + +namespace NativeTask { + +KeyValueType JavaClassToKeyValueType(const std::string & clazz); + +using std::string; + +class WritableUtils { +protected: + static int64_t ReadVLongInner(const char * pos, uint32_t & len); + static void WriteVLongInner(int64_t value, char * pos, uint32_t & len); + static uint32_t GetVLongSizeInner(int64_t value); +public: + inline static uint32_t DecodeVLongSize(int8_t ch) { + if (ch >= -112) { + return 1; + } else if (ch < -120) { + return -119 - ch; + } + return -111 - ch; + } + + inline static uint32_t DecodeVLongSize(const char * pos) { + return DecodeVLongSize(*pos); + } + + inline static uint32_t GetVLongSize(int64_t value) { + if (value >= -112 && value <= 127) { + return 1; + } + return GetVLongSizeInner(value); + } + + inline static int64_t ReadVLong(const char * pos, uint32_t & len) { + if (*pos >= (char)-112) { + len = 1; + return *pos; + } else { + return ReadVLongInner(pos, len); + } + } + + inline static int32_t ReadVInt(const char * pos, uint32_t & len) { + return (int32_t)ReadVLong(pos, len); + } + + inline static void WriteVLong(int64_t v, char * target, uint32_t & written) { + if (v <= 127 && v >= -112) { + written = 1; + *target = (char)v; + } else { + WriteVLongInner(v, target, written); + } + } + + inline static void WriteVInt(int32_t v, char * target, uint32_t & written) { + WriteVLong(v, target, written); + } + + // Stream interfaces + static int64_t ReadVLong(InputStream * stream); + + static int64_t ReadLong(InputStream * stream); + + static int32_t ReadInt(InputStream * stream); + + static int16_t ReadShort(InputStream * stream); + + static float ReadFloat(InputStream * stream); + + static string ReadText(InputStream * stream); + + static string 
ReadBytes(InputStream * stream); + + static string ReadUTF8(InputStream * stream); + + static void WriteVLong(OutputStream * stream, int64_t v); + + static void WriteLong(OutputStream * stream, int64_t v); + + static void WriteInt(OutputStream * stream, int32_t v); + + static void WriteShort(OutputStream * stream, int16_t v); + + static void WriteFloat(OutputStream * stream, float v); + + static void WriteText(OutputStream * stream, const string & v); + + static void WriteBytes(OutputStream * stream, const string & v); + + static void WriteUTF8(OutputStream * stream, const string & v); + + // Writable binary to string interface + static void toString(string & dest, KeyValueType type, const void * data, uint32_t length); +}; + +} // namespace NativeTask + +#endif /* WRITABLEUTILS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test.sh ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test.sh b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test.sh new file mode 100644 index 0000000..7310c9d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# do all tests +if [ "$1" == "all" ]; then +shift +./nttest $@ +exit $? +fi + +# do performance tests only +if [ "$1" == "perf" ]; then +shift +./nttest --gtest_filter=Perf.* $@ +exit $? 
+fi + +# do not do performance test by default +./nttest --gtest_filter=-Perf.* $@ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCommand.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCommand.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCommand.cc new file mode 100644 index 0000000..04e80fd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCommand.cc @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "BufferStream.h" +#include "Buffers.h" +#include "test_commons.h" +#include "NativeTask.h" + +using namespace NativeTask; + +TEST(Command, equals) { + Command * cmd1 = new Command(100, "hello command"); + Command * cmd2 = new Command(100, "hello command 2"); + + ASSERT_TRUE(cmd1->equals(*cmd2)); + ASSERT_TRUE(cmd2->equals(*cmd1)); + ASSERT_EQ(100, cmd1->id()); + + std::string helloCommand = "hello command"; + ASSERT_EQ(0, helloCommand.compare(cmd1->description())); +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc new file mode 100644 index 0000000..c65d3d0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "snappy.h" +#include "commons.h" +#include "Path.h" +#include "BufferStream.h" +#include "FileSystem.h" +#include "Compressions.h" +#include "test_commons.h" + +void TestCodec(const string & codec, const string & data, char * buff, char * buff2, size_t buffLen, + uint32_t buffhint) { + Timer timer; + OutputBuffer outputBuffer = OutputBuffer(buff, buffLen); + CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint); + + LOG("%s", codec.c_str()); + timer.reset(); + for (size_t i = 0; i < data.length(); i += 128 * 1024) { + compressor->write(data.c_str() + i, std::min(data.length() - i, (size_t)(128 * 1024))); + } + compressor->flush(); + LOG("%s", + timer.getSpeedM2("compress origin/compressed", data.length(), outputBuffer.tell()).c_str()); + + InputBuffer decompInputBuffer = InputBuffer(buff, outputBuffer.tell()); + DecompressStream * decompressor = Compressions::getDecompressionStream(codec, &decompInputBuffer, + buffhint); + size_t total = 0; + timer.reset(); + while (true) { + int32_t rd = decompressor->read(buff2 + total, buffLen - total); + if (rd <= 0) { + break; + } + total += rd; + } + LOG("%s", timer.getSpeedM2("decompress origin/uncompressed", outputBuffer.tell(), total).c_str()); + LOG("ratio: %.3lf", outputBuffer.tell() / (double )total); + ASSERT_EQ(data.length(), total); + ASSERT_EQ(0, memcmp(data.c_str(), buff2, total)); + + delete compressor; + delete decompressor; +} + +TEST(Perf, Compressions) { + string data; + size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024); + uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024); + string type = TestConfig.get("compression.input.type", "bytes"); + Timer timer; + GenerateKVTextLength(data, length, type); + LOG("%s", timer.getInterval("Generate data").c_str()); + + InputBuffer inputBuffer = 
InputBuffer(data); + size_t buffLen = data.length() / 2 * 3; + + timer.reset(); + char * buff = new char[buffLen]; + char * buff2 = new char[buffLen]; + memset(buff, 0, buffLen); + memset(buff2, 0, buffLen); + LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str()); + + TestCodec("org.apache.hadoop.io.compress.SnappyCodec", data, buff, buff2, buffLen, buffhint); + TestCodec("org.apache.hadoop.io.compress.Lz4Codec", data, buff, buff2, buffLen, buffhint); + TestCodec("org.apache.hadoop.io.compress.GzipCodec", data, buff, buff2, buffLen, buffhint); + + delete[] buff; + delete[] buff2; +} + +TEST(Perf, CompressionUtil) { + string inputfile = TestConfig.get("input", ""); + string outputfile = TestConfig.get("output", ""); + uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024); + string inputcodec = Compressions::getCodecByFile(inputfile); + string outputcodec = Compressions::getCodecByFile(outputfile); + size_t bufferSize = buffhint; + if (inputcodec.length() > 0 && outputcodec.length() == 0) { + // decompression + InputStream * fin = FileSystem::getLocal().open(inputfile); + if (fin == NULL) { + THROW_EXCEPTION(IOException, "input file not found"); + } + DecompressStream * source = Compressions::getDecompressionStream(inputcodec, fin, bufferSize); + OutputStream * fout = FileSystem::getLocal().create(outputfile, true); + char * buffer = new char[bufferSize]; + while (true) { + int rd = source->read(buffer, bufferSize); + if (rd <= 0) { + break; + } + fout->write(buffer, rd); + } + source->close(); + delete source; + fin->close(); + delete fin; + fout->flush(); + fout->close(); + delete fout; + delete buffer; + } else if (inputcodec.length() == 0 && outputcodec.length() > 0) { + // compression + InputStream * fin = FileSystem::getLocal().open(inputfile); + if (fin == NULL) { + THROW_EXCEPTION(IOException, "input file not found"); + } + OutputStream * fout = FileSystem::getLocal().create(outputfile, true); + CompressStream * 
dest = Compressions::getCompressionStream(outputcodec, fout, bufferSize); + char * buffer = new char[bufferSize]; + while (true) { + int rd = fin->read(buffer, bufferSize); + if (rd <= 0) { + break; + } + dest->write(buffer, rd); + } + dest->flush(); + dest->close(); + delete dest; + fout->close(); + delete fout; + fin->close(); + delete fin; + delete buffer; + } else { + LOG("Not compression or decompression, do nothing"); + } +} + +class CompressResult { +public: + uint64_t uncompressedSize; + uint64_t compressedSize; + uint64_t compressTime; + uint64_t uncompressTime; + CompressResult() + : uncompressedSize(0), compressedSize(0), compressTime(0), uncompressTime(0) { + } + CompressResult & operator+=(const CompressResult & rhs) { + uncompressedSize += rhs.uncompressedSize; + compressedSize += rhs.compressedSize; + compressTime += rhs.compressTime; + uncompressTime += rhs.uncompressTime; + return *this; + } + string toString() { + return StringUtil::Format("Compress: %4.0fM/s Decompress: %5.0fM/s(%5.0fM/s) ratio: %.1f%%", + (uncompressedSize / 1024.0 / 1024) / (compressTime / 1000000000.), + (compressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.), + (uncompressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.), + compressedSize / (float)uncompressedSize * 100); + } +}; + +extern "C" { +extern int LZ4_compress(char* source, char* dest, int isize); +extern int LZ4_uncompress(char* source, char* dest, int osize); +} +; + +void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize, + int times) { + string data; + ReadFile(data, path); + size_t maxlength = std::max((size_t)(blockSize * 1.005), blockSize + 8); + char * outputBuffer = new char[maxlength]; + char * dest = new char[blockSize + 8]; + CompressResult result; + Timer t; + int compressedSize; + for (size_t start = 0; start < data.length(); start += blockSize) { + size_t currentblocksize = std::min(data.length() - start, blockSize); + uint64_t startTime = t.now(); + 
for (int i = 0; i < times; i++) { + int osize = LZ4_compress((char*)data.data() + start, outputBuffer, currentblocksize); + compressedSize = osize; + result.compressedSize += osize; + result.uncompressedSize += currentblocksize; + } + uint64_t endTime = t.now(); + result.compressTime += endTime - startTime; + startTime = t.now(); + for (int i = 0; i < times; i++) { +// memset(dest, 0, currentblocksize+8); + int osize = LZ4_uncompress(outputBuffer, dest, currentblocksize); +// printf("%016llx blocksize: %lu\n", bswap64(*(uint64_t*)(dest+currentblocksize)), currentblocksize); + } + endTime = t.now(); + result.uncompressTime += endTime - startTime; + } + printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str()); + delete[] outputBuffer; + delete[] dest; + total += result; +} + +TEST(Perf, RawCompressionLz4) { + string inputdir = TestConfig.get("compressions.input.path", ""); + int64_t times = TestConfig.getInt("compression.time", 400); + int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64); + vector<FileEntry> inputfiles; + FileSystem::getLocal().list(inputdir, inputfiles); + CompressResult total; + printf("Block size: %lldK\n", (long long int)(blockSize / 1024)); + for (size_t i = 0; i < inputfiles.size(); i++) { + if (!inputfiles[i].isDirectory) { + MeasureSingleFileLz4((inputdir + "/" + inputfiles[i].name).c_str(), total, blockSize, times); + } + } + printf("%s - Total\n", total.toString().c_str()); +} + +void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize, + int times) { + string data; + ReadFile(data, path); + size_t maxlength = snappy::MaxCompressedLength(blockSize); + char * outputBuffer = new char[maxlength]; + char * dest = new char[blockSize]; + CompressResult result; + Timer t; + int compressedSize; + for (size_t start = 0; start < data.length(); start += blockSize) { + size_t currentblocksize = std::min(data.length() - start, blockSize); + uint64_t startTime = t.now(); + for 
(int i = 0; i < times; i++) { + size_t osize = maxlength; + snappy::RawCompress(data.data() + start, currentblocksize, outputBuffer, &osize); + compressedSize = osize; + result.compressedSize += osize; + result.uncompressedSize += currentblocksize; + } + uint64_t endTime = t.now(); + result.compressTime += endTime - startTime; + startTime = t.now(); + for (int i = 0; i < times; i++) { + snappy::RawUncompress(outputBuffer, compressedSize, dest); + } + endTime = t.now(); + result.uncompressTime += endTime - startTime; + } + printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str()); + delete[] outputBuffer; + delete[] dest; + total += result; +} + +TEST(Perf, RawCompressionSnappy) { + string inputdir = TestConfig.get("compressions.input.path", ""); + int64_t times = TestConfig.getInt("compression.time", 400); + int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64); + vector<FileEntry> inputfiles; + FileSystem::getLocal().list(inputdir, inputfiles); + CompressResult total; + printf("Block size: %lldK\n", blockSize / 1024); + for (size_t i = 0; i < inputfiles.size(); i++) { + if (!inputfiles[i].isDirectory) { + MeasureSingleFileSnappy((inputdir + "/" + inputfiles[i].name).c_str(), total, blockSize, + times); + } + } + printf("%s - Total\n", total.toString().c_str()); +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestConfig.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestConfig.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestConfig.cc new file mode 100644 index 0000000..36315ce --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestConfig.cc @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "BufferStream.h" +#include "Buffers.h" +#include "test_commons.h" + +float absoute(float v) { + if (v > 0) { + return v; + } else { + return -v; + } +} + +TEST(Config, readAndWrite) { + Config config; + std::string STR = "CONFIG"; + std::string STRS = "CONFIG,LOG"; + int INT = 3; + bool BOOL = true; + + config.set("STR", STR.c_str()); + config.set("STRS", STRS.c_str()); + config.setInt("INT", INT); + config.setBool("BOOL", BOOL); + config.set("INTS", "3,4"); + config.set("FLOAT", "3.5"); + config.set("FLOATS", "3.5,4.6"); + + ASSERT_EQ(0, STR.compare(config.get("STR"))); + ASSERT_EQ(0, STRS.compare(config.get("STRS"))); + + ASSERT_EQ(INT, config.getInt("INT")); + ASSERT_EQ(BOOL, config.getBool("BOOL", false)); + + vector<int64_t> ints; + config.getInts("INTS", ints); + ASSERT_EQ(2, ints.size()); + ASSERT_EQ(3, ints[0]); + ASSERT_EQ(4, ints[1]); + + float floatValue = config.getFloat("FLOAT"); + ASSERT_TRUE(absoute(floatValue - 3.5) < 0.01); + + vector<float> floats; + config.getFloats("FLOATS", 
floats); + ASSERT_EQ(2, floats.size()); + ASSERT_TRUE(absoute(floats[0] - 3.5) < 0.01); + ASSERT_TRUE(absoute(floats[1] - 4.6) < 0.01); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCounter.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCounter.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCounter.cc new file mode 100644 index 0000000..e8760ea --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCounter.cc @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "BufferStream.h" +#include "Buffers.h" +#include "test_commons.h" + +TEST(Counter, test) { + Counter counter1("group", "key"); + const string & group = counter1.group(); + const string & name = counter1.name(); + ASSERT_EQ(0, name.compare("key")); + ASSERT_EQ(0, group.compare("group")); + + ASSERT_EQ(0, counter1.get()); + + counter1.increase(100); + ASSERT_EQ(100, counter1.get()); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestFileSystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestFileSystem.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestFileSystem.cc new file mode 100644 index 0000000..c29c486 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestFileSystem.cc @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "FileSystem.h" +#include "test_commons.h" + +TEST(FileSystem, RawFileSystem) { + FileSystem & fs = FileSystem::getLocal(); + fs.mkdirs("temp"); + string temppath = "temp/data"; + string content; + GenerateKVTextLength(content, 4111111, "word"); + FileOutputStream * output = (FileOutputStream*)fs.create(temppath, true); + output->write(content.data(), content.length()); + output->close(); + delete output; + FileInputStream * input = (FileInputStream*)fs.open(temppath); + char buff[1024]; + int64_t total = 0; + while (true) { + int rd = input->read(buff, 1024); + if (rd <= 0) { + break; + } + ASSERT_EQ(content.substr(total, rd), string(buff, rd)); + total += rd; + } + ASSERT_EQ(content.length(), total); + delete input; + ASSERT_EQ(fs.getLength(temppath), content.length()); + ASSERT_TRUE(fs.exists(temppath)); + fs.remove("temp"); + ASSERT_FALSE(fs.exists(temppath)); +} + +// This test needs java CLASSPATH env to run +// Enable it manually only if some changes are made to FileSystem.h/cc +//TEST(FileSystem, JavaFileSystem) { +// FileSystem & fs = FileSystem::getJava(TestConfig); +// fs.mkdirs("temp"); +// string temppath = "temp/data"; +// string content; +// GenerateKVTextLength(content, 4111111, "word"); +// FileOutputStream * output = (FileOutputStream*)fs.create(temppath, true); +// output->write(content.data(), content.length()); +// output->close(); +// delete output; +// FileInputStream * input = (FileInputStream*)fs.open(temppath); +// char buff[102400]; +// int64_t total = 0; +// while(true) { +// int rd = input->read(buff, 102400); +// if (rd<=0) { +// break; +// } +// ASSERT_EQ(content.substr(total, rd), string(buff,rd)); +// total+=rd; +// } +// ASSERT_EQ(content.length(), total); +// delete input; +// ASSERT_EQ(fs.getLength(temppath), content.length()); +// ASSERT_TRUE(fs.exists(temppath)); +// fs.remove("temp"); +// ASSERT_FALSE(fs.exists(temppath)); +//} +