http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc new file mode 100644 index 0000000..b426fc3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Combiner.h" +#include "StringUtil.h" + +namespace NativeTask { + +NativeCombineRunner::NativeCombineRunner(Config * config, ObjectCreatorFunc combinerCreator) + : _config(config), _combinerCreator(combinerCreator), _keyGroupCount(0) { + if (NULL == _combinerCreator) { + THROW_EXCEPTION_EX(UnsupportException, "Create combiner failed"); + } +} + +KeyGroupIterator * NativeCombineRunner::createKeyGroupIterator(KVIterator * iter) { + return new KeyGroupIteratorImpl(iter); +} + +void NativeCombineRunner::combine(CombineContext context, KVIterator * iterator, + IFileWriter * writer) { + Configurable * combiner = (Configurable *)(_combinerCreator()); + if (NULL == combiner) { + THROW_EXCEPTION(UnsupportException, "Combiner creator returned NULL"); + } + combiner->configure(_config); + + NativeObjectType type = combiner->type(); + switch (type) { + case MapperType: { + Mapper * mapper = (Mapper*)combiner; + mapper->setCollector(writer); + + Buffer key; + Buffer value; + while (iterator->next(key, value)) { + mapper->map(key.data(), key.length(), value.data(), value.length()); + } + mapper->close(); + delete mapper; + } + break; + case ReducerType: { + Reducer * reducer = (Reducer*)combiner; + reducer->setCollector(writer); + KeyGroupIterator * kg = createKeyGroupIterator(iterator); + while (kg->nextKey()) { + _keyGroupCount++; + reducer->reduce(*kg); + } + delete kg; + reducer->close(); + delete reducer; + } + break; + default: + THROW_EXCEPTION(UnsupportException, "Combiner type not supported"); + } +} + +} /* namespace NativeTask */
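For reference, a combiner registered through the native object factory must be either a Mapper or a Reducer subclass, as the type dispatch above shows. The sketch below illustrates roughly what a Reducer-style summing combiner could look like against the Reducer/KeyGroupIterator interfaces declared in NativeTask.h; the class name, the fixed-width value encoding, and the inherited collect() call are illustrative assumptions and not part of this patch (registration with NativeObjectFactory, so that NATIVE_COMBINER can find the class by name, is also omitted).

#include "NativeTask.h"

namespace NativeTask {

// Illustrative sketch only -- sums fixed-width uint64_t values for each key group.
class IntSumCombiner : public Reducer {
public:
  virtual void reduce(KeyGroupIterator & input) {
    uint32_t keyLen = 0;
    const char * key = input.getKey(keyLen);
    uint64_t sum = 0;
    uint32_t valueLen = 0;
    const char * value = NULL;
    while (NULL != (value = input.nextValue(valueLen))) {
      if (valueLen == sizeof(uint64_t)) {
        sum += *(const uint64_t *)value;     // assumes native-endian, fixed-width values
      }
    }
    collect(key, keyLen, &sum, sizeof(sum)); // collect() assumed from the processor base class
  }
};

} // namespace NativeTask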
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h new file mode 100644 index 0000000..09f4eb0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef COMBINER_H_ +#define COMBINER_H_ +#include "commons.h" +#include "IFile.h" + +namespace NativeTask { + +class MemoryBufferKVIterator : public KVIterator { +public: + virtual const char * getBase() = 0; + virtual std::vector<uint32_t> * getKVOffsets() = 0; +}; + +enum CombineContextType { + UNKNOWN = 0, + CONTINUOUS_MEMORY_BUFFER = 1, +}; + +class CombineContext { + +private: + CombineContextType _type; + +public: + CombineContext(CombineContextType type) + : _type(type) { + } + +public: + CombineContextType getType() { + return _type; + } +}; + +class CombineInMemory : public CombineContext { + CombineInMemory() + : CombineContext(CONTINUOUS_MEMORY_BUFFER) { + } +}; + +class ICombineRunner { +public: + ICombineRunner() { + } + + virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) = 0; + + virtual ~ICombineRunner() { + } +}; + +class NativeCombineRunner : public ICombineRunner { +private: + Config * _config; + ObjectCreatorFunc _combinerCreator; + uint32_t _keyGroupCount; + +public: + NativeCombineRunner(Config * config, ObjectCreatorFunc objectCreator); + +public: + void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer); + +private: + KeyGroupIterator * createKeyGroupIterator(KVIterator * iter); +}; + +} /* namespace NativeTask */ +#endif /* COMBINER_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc new file mode 100644 index 0000000..e2ce730 --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "SyncUtils.h" +#include "Compressions.h" +#include "codec/GzipCodec.h" +#include "codec/SnappyCodec.h" +#include "codec/Lz4Codec.h" + +namespace NativeTask { + +CompressStream::~CompressStream() { +} + +void CompressStream::writeDirect(const void * buff, uint32_t length) { + THROW_EXCEPTION(UnsupportException, "writeDirect not support"); +} + +/////////////////////////////////////////////////////////// + +DecompressStream::~DecompressStream() { +} + +int32_t DecompressStream::readDirect(void * buff, uint32_t length) { + THROW_EXCEPTION(UnsupportException, "readDirect not support"); +} + +/////////////////////////////////////////////////////////// + +const Compressions::Codec Compressions::GzipCodec = Compressions::Codec( + "org.apache.hadoop.io.compress.GzipCodec", ".gz"); +const Compressions::Codec Compressions::SnappyCodec = Compressions::Codec( + "org.apache.hadoop.io.compress.SnappyCodec", ".snappy"); +const Compressions::Codec Compressions::Lz4Codec = Compressions::Codec( + "org.apache.hadoop.io.compress.Lz4Codec", ".lz4"); + +vector<Compressions::Codec> Compressions::SupportedCodecs = vector<Compressions::Codec>(); + +void Compressions::initCodecs() { + static Lock lock; + ScopeLock<Lock> autolock(lock); + if (SupportedCodecs.size() == 0) { + SupportedCodecs.push_back(GzipCodec); + SupportedCodecs.push_back(SnappyCodec); + SupportedCodecs.push_back(Lz4Codec); + } +} + +bool Compressions::support(const string & codec) { + initCodecs(); + for (size_t i = 0; i < SupportedCodecs.size(); i++) { + if (codec == SupportedCodecs[i].name) { + return true; + } + } + return false; +} + +const string Compressions::getExtension(const string & codec) { + initCodecs(); + for (size_t i = 0; i < SupportedCodecs.size(); i++) { + if (codec == SupportedCodecs[i].name) { + return SupportedCodecs[i].extension; + } + } + return string(); +} + +const string Compressions::getCodec(const string & extension) { + initCodecs(); + for (size_t i = 0; i < SupportedCodecs.size(); i++) { + if (extension == SupportedCodecs[i].extension) { + return SupportedCodecs[i].name; + } + } + return string(); +} + +const string Compressions::getCodecByFile(const string & file) { + initCodecs(); + for (size_t i = 0; i < SupportedCodecs.size(); i++) { + const string & extension = SupportedCodecs[i].extension; + if ((file.length() > extension.length()) + && (file.substr(file.length() - extension.length()) == extension)) { + return SupportedCodecs[i].name; + } + } + return string(); +} + +CompressStream * Compressions::getCompressionStream(const string & 
codec, OutputStream * stream, + uint32_t bufferSizeHint) { + if (codec == GzipCodec.name) { + return new GzipCompressStream(stream, bufferSizeHint); + } + if (codec == SnappyCodec.name) { + return new SnappyCompressStream(stream, bufferSizeHint); + } + if (codec == Lz4Codec.name) { + return new Lz4CompressStream(stream, bufferSizeHint); + } + return NULL; +} + +DecompressStream * Compressions::getDecompressionStream(const string & codec, InputStream * stream, + uint32_t bufferSizeHint) { + if (codec == GzipCodec.name) { + return new GzipDecompressStream(stream, bufferSizeHint); + } + if (codec == SnappyCodec.name) { + return new SnappyDecompressStream(stream, bufferSizeHint); + } + if (codec == Lz4Codec.name) { + return new Lz4DecompressStream(stream, bufferSizeHint); + } + return NULL; +} + +} // namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h new file mode 100644 index 0000000..f70adb6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef COMPRESSIONS_H_ +#define COMPRESSIONS_H_ + +#include <string> +#include <vector> +#include "Streams.h" + +namespace NativeTask { + +using std::vector; +using std::string; + +class CompressStream : public FilterOutputStream { +public: + CompressStream(OutputStream * stream) + : FilterOutputStream(stream) { + } + + virtual ~CompressStream(); + + virtual void writeDirect(const void * buff, uint32_t length); + + virtual void finish() { + flush(); + } + + virtual void resetState() { + + } + + virtual uint64_t compressedBytesWritten() { + return 0; + } +}; + +class DecompressStream : public FilterInputStream { +public: + DecompressStream(InputStream * stream) + : FilterInputStream(stream) { + } + + virtual ~DecompressStream(); + + virtual int32_t readDirect(void * buff, uint32_t length); + + virtual uint64_t compressedBytesRead() { + return 0; + } +}; + +class Compressions { +protected: + class Codec { + public: + string name; + string extension; + + Codec(const string & name, const string & extension) + : name(name), extension(extension) { + } + }; + + static vector<Codec> SupportedCodecs; + + static void initCodecs(); + +public: + static const Codec GzipCodec; + static const Codec SnappyCodec; + static const Codec Lz4Codec; + +public: + static bool support(const string & codec); + + static const string getExtension(const string & codec); + + static const string getCodec(const string & extension); + + static const string getCodecByFile(const string & file); + + static CompressStream * getCompressionStream(const string & codec, OutputStream * stream, + uint32_t bufferSizeHint); + + static DecompressStream * getDecompressionStream(const string & codec, InputStream * stream, + uint32_t bufferSizeHint); +}; + +} // namespace NativeTask + +#endif /* COMPRESSIONS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h new file mode 100644 index 0000000..5a5707b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef CONSTANTS_H_ +#define CONSTANTS_H_ + +const uint32_t SIZE_OF_PARTITION_LENGTH = sizeof(uint32_t); +const uint32_t SIZE_OF_KEY_LENGTH = sizeof(uint32_t); +const uint32_t SIZE_OF_VALUE_LENGTH = sizeof(uint32_t); +const uint32_t SIZE_OF_KV_LENGTH = SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH; + +#endif //CONSTANTS_H_ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc new file mode 100644 index 0000000..7888aa1 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <errno.h> +#include <dirent.h> +#include <sys/stat.h> +#include <jni.h> +#include "commons.h" +#include "util/StringUtil.h" +#include "jniutils.h" +#include "NativeTask.h" +#include "TaskCounters.h" +#include "NativeObjectFactory.h" +#include "Path.h" +#include "FileSystem.h" + +namespace NativeTask { + +///////////////////////////////////////////////////////////// + +FileInputStream::FileInputStream(const string & path) { + _handle = fopen(path.c_str(), "rb"); + if (_handle != NULL) { + _fd = fileno(_handle); + _path = path; + } else { + _fd = -1; + THROW_EXCEPTION_EX(IOException, "Can't open raw file: [%s]", path.c_str()); + } + _bytesRead = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP, + TaskCounters::FILE_BYTES_READ); +} + +FileInputStream::~FileInputStream() { + close(); +} + +void FileInputStream::seek(uint64_t position) { + ::lseek(_fd, position, SEEK_SET); +} + +uint64_t FileInputStream::tell() { + return ::lseek(_fd, 0, SEEK_CUR); +} + +int32_t FileInputStream::read(void * buff, uint32_t length) { + int32_t ret = ::read(_fd, buff, length); + if (ret > 0) { + _bytesRead->increase(ret); + } + return ret; +} + +void FileInputStream::close() { + if (_handle != NULL) { + fclose(_handle); + _handle = NULL; + _fd = -1; + } +} + +///////////////////////////////////////////////////////////// + +FileOutputStream::FileOutputStream(const string & path, bool overwite) { + _handle = fopen(path.c_str(), "wb"); + if (_handle != NULL) { + _fd = fileno(_handle); + _path = path; + } else { + _fd = -1; + THROW_EXCEPTION_EX(IOException, "Open raw file failed: [%s]", path.c_str()); + } + _bytesWrite = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP, + TaskCounters::FILE_BYTES_WRITTEN); +} + +FileOutputStream::~FileOutputStream() { + close(); +} + +uint64_t FileOutputStream::tell() { + return ::lseek(_fd, 0, SEEK_CUR); +} + +void FileOutputStream::write(const void * buff, uint32_t length) { + if (::write(_fd, buff, length) < length) { + THROW_EXCEPTION(IOException, "::write error"); + } + _bytesWrite->increase(length); +} + +void FileOutputStream::flush() { +} + +void FileOutputStream::close() { + if (_handle != NULL) { + fclose(_handle); + _handle = NULL; + _fd = -1; + } +} + +///////////////////////////////////////////////////////////// + +class RawFileSystem : public FileSystem { +protected: + string getRealPath(const string & path) { + if (StringUtil::StartsWith(path, "file:")) { + return path.substr(5); + } + return path; + } +public: + InputStream * open(const string & path) { + return new FileInputStream(getRealPath(path)); + } + + OutputStream * create(const string & path, bool overwrite) { + string np = getRealPath(path); + string parent = Path::GetParent(np); + if (parent.length() > 0) { + if (!exists(parent)) { + mkdirs(parent); + } + } + return new FileOutputStream(np, overwrite); + } + + uint64_t getLength(const string & path) { + struct stat st; + if (::stat(getRealPath(path).c_str(), &st) != 0) { + char buff[256]; + strerror_r(errno, buff, 256); + THROW_EXCEPTION(IOException, + StringUtil::Format("stat path %s failed, %s", path.c_str(), buff)); + } + return st.st_size; + } + + bool list(const string & path, vector<FileEntry> & status) { + DIR * dp; + struct dirent * dirp; + if ((dp = opendir(path.c_str())) == NULL) { + return false; + } + + FileEntry temp; + while ((dirp = readdir(dp)) != NULL) { + temp.name = dirp->d_name; + temp.isDirectory = dirp->d_type & DT_DIR; + if (temp.name == "." 
|| temp.name == "..") { + continue; + } + status.push_back(temp); + } + closedir(dp); + return true; + } + + void remove(const string & path) { + if (!exists(path)) { + LOG("[FileSystem] remove: file %s does not exist, ignored", path.c_str()); + return; + } + if (::remove(getRealPath(path).c_str()) != 0) { + int err = errno; + if (::system(StringUtil::Format("rm -rf %s", path.c_str()).c_str()) == 0) { + return; + } + char buff[256]; + strerror_r(err, buff, 256); + THROW_EXCEPTION(IOException, + StringUtil::Format("FileSystem: remove path %s failed, %s", path.c_str(), buff)); + } + } + + bool exists(const string & path) { + struct stat st; + if (::stat(getRealPath(path).c_str(), &st) != 0) { + return false; + } + return true; + } + + int mkdirs(const string & path, mode_t nmode) { + string np = getRealPath(path); + struct stat sb; + + if (stat(np.c_str(), &sb) == 0) { + if (S_ISDIR (sb.st_mode) == 0) { + return 1; + } + return 0; + } + + string npathstr = np; + char * npath = const_cast<char*>(npathstr.c_str()); + + /* Skip leading slashes. */ + char * p = npath; + while (*p == '/') + p++; + + while (NULL != (p = strchr(p, '/'))) { + *p = '\0'; + if (stat(npath, &sb) != 0) { + if (mkdir(npath, nmode)) { + return 1; + } + } else if (S_ISDIR (sb.st_mode) == 0) { + return 1; + } + *p++ = '/'; /* restore slash */ + while (*p == '/') + p++; + } + + /* Create the final directory component. */ + if (stat(npath, &sb) && mkdir(npath, nmode)) { + return 1; + } + return 0; + } + + void mkdirs(const string & path) { + int ret = mkdirs(path, 0755); + if (ret != 0) { + THROW_EXCEPTION_EX(IOException, "mkdirs [%s] failed", path.c_str()); + } + } +}; + +/////////////////////////////////////////////////////////// + +extern RawFileSystem RawFileSystemInstance; + +RawFileSystem RawFileSystemInstance = RawFileSystem(); + +string FileSystem::getDefaultFsUri(Config * config) { + const char * nm = config->get(FS_DEFAULT_NAME); + if (nm == NULL) { + nm = config->get("fs.defaultFS"); + } + if (nm == NULL) { + return string("file:///"); + } else { + return string(nm); + } +} + +FileSystem & FileSystem::getLocal() { + return RawFileSystemInstance; +} + + +FileSystem & FileSystem::get(Config * config) { + string uri = getDefaultFsUri(config); + if (uri == "file:///") { + return RawFileSystemInstance; + } + THROW_EXCEPTION_EX(UnsupportException, "Only the local file system is supported, got fs uri: %s", uri.c_str()); +} + +} // namespace NativeTask
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h new file mode 100644 index 0000000..4b501fa --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FILESYSTEM_H_ +#define FILESYSTEM_H_ + +#include <string> +#include "NativeTask.h" +#include "Streams.h" + +namespace NativeTask { + +class FileSystem; + +/** + * Local raw filesystem file input stream + * with blocking semantics + */ +class FileInputStream : public InputStream { +private: + string _path; + FILE * _handle; + int _fd; + Counter * _bytesRead; +public: + FileInputStream(const string & path); + virtual ~FileInputStream(); + + virtual void seek(uint64_t position); + + virtual uint64_t tell(); + + virtual int32_t read(void * buff, uint32_t length); + + virtual void close(); +}; + +/** + * Local raw filesystem file output stream + * with blocking semantics + */ +class FileOutputStream : public OutputStream { +private: + string _path; + FILE * _handle; + int _fd; + Counter * _bytesWrite; +public: + FileOutputStream(const string & path, bool overwite = true); + virtual ~FileOutputStream(); + + virtual uint64_t tell(); + + virtual void write(const void * buff, uint32_t length); + + virtual void flush(); + + virtual void close(); +}; + + +class FileEntry { +public: + string name; + bool isDirectory; +}; + +/** + * FileSystem interface + */ +class FileSystem { +protected: + FileSystem() { + } +public: + virtual ~FileSystem() { + } + + virtual InputStream * open(const string & path) { + return NULL; + } + + virtual OutputStream * create(const string & path, bool overwrite = true) { + return NULL; + } + + virtual uint64_t getLength(const string & path) { + return 0; + } + + virtual bool list(const string & path, vector<FileEntry> & status) { + return false; + } + + virtual void remove(const string & path) { + } + + virtual bool exists(const string & path) { + return false; + } + + virtual void mkdirs(const string & path) { + } + + static string getDefaultFsUri(Config * config); + static FileSystem & getLocal(); + static FileSystem & getJava(Config * config); + static FileSystem & get(Config * config); +}; + +} // namespace NativeTask + +#endif /* FILESYSTEM_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc new file mode 100644 index 0000000..ff272ba --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "util/StringUtil.h" +#include "IFile.h" +#include "Compressions.h" +#include "lib/FileSystem.h" + +namespace NativeTask { + +/////////////////////////////////////////////////////////// + +IFileReader::IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteInputStream) + : _deleteSourceStream(deleteInputStream), _stream(stream), _source(NULL), + _checksumType(spill->checkSumType), _kType(spill->keyType), _vType(spill->valueType), + _codec(spill->codec), _segmentIndex(-1), _spillInfo(spill), _valuePos(NULL), _valueLen(0) { + _source = new ChecksumInputStream(_stream, _checksumType); + _source->setLimit(0); + _reader.init(128 * 1024, _source, _codec); +} + +IFileReader::~IFileReader() { + + delete _source; + _source = NULL; + + if (_deleteSourceStream) { + delete _stream; + _stream = NULL; + } +} + +/** + * 0 if success + * 1 if end + */ +bool IFileReader::nextPartition() { + if (0 != _source->getLimit()) { + THROW_EXCEPTION(IOException, "bad ifile segment length"); + } + if (_segmentIndex >= 0) { + // verify checksum + uint32_t chsum = 0; + if (4 != _stream->readFully(&chsum, 4)) { + THROW_EXCEPTION(IOException, "read ifile checksum failed"); + } + uint32_t actual = bswap(chsum); + uint32_t expect = _source->getChecksum(); + if (actual != expect) { + THROW_EXCEPTION_EX(IOException, "read ifile checksum not match, actual %x expect %x", actual, + expect); + } + } + _segmentIndex++; + if (_segmentIndex < (int)(_spillInfo->length)) { + int64_t end_pos = (int64_t)_spillInfo->segments[_segmentIndex].realEndOffset; + if (_segmentIndex > 0) { + end_pos -= (int64_t)_spillInfo->segments[_segmentIndex - 1].realEndOffset; + } + if (end_pos < 0) { + THROW_EXCEPTION(IOException, "bad ifile format"); + } + // exclude checksum + _source->setLimit(end_pos - 4); + _source->resetChecksum(); + return true; + } else { + return false; + } +} + +/////////////////////////////////////////////////////////// + +IFileWriter * IFileWriter::create(const std::string & filepath, const MapOutputSpec & spec, + Counter * spilledRecords) { + OutputStream * fout = FileSystem::getLocal().create(filepath, true); + IFileWriter * writer = new IFileWriter(fout, spec.checksumType, spec.keyType, spec.valueType, + spec.codec, spilledRecords, true); + return writer; +} + +IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype, + KeyValueType vtype, const string & codec, Counter * counter, bool deleteTargetStream) + : _deleteTargetStream(deleteTargetStream), _stream(stream), _dest(NULL), + _checksumType(checksumType), _kType(ktype), _vType(vtype), _codec(codec), + _recordCounter(counter) { + _dest = new ChecksumOutputStream(_stream, _checksumType); + _appendBuffer.init(128 * 1024, _dest, _codec); +} + +IFileWriter::~IFileWriter() { + delete _dest; + _dest = NULL; + + if (_deleteTargetStream) { + delete _stream; + _stream = NULL; + } +} + +void IFileWriter::startPartition() { + _spillFileSegments.push_back(IFileSegment()); + _dest->resetChecksum(); +} + +void IFileWriter::endPartition() { + char EOFMarker[2] = {-1, -1}; + 
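+ // The two bytes written below are a pair of single-byte var-length -1 markers (0xFF 0xFF): on the
+ // read side IFileReader::nextKey() decodes them as a key length of -1 and treats that as the end of
+ // this partition's key/value stream. After the buffered data is flushed, the segment checksum is
+ // appended byte-swapped, presumably so the on-disk layout matches the big-endian Java IFile format;
+ // IFileReader::nextPartition() re-verifies it and excludes these 4 bytes from the segment limit.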
_appendBuffer.write(EOFMarker, 2); + _appendBuffer.flush(); + + CompressStream * compressionStream = _appendBuffer.getCompressionStream(); + if (NULL != compressionStream) { + compressionStream->finish(); + compressionStream->resetState(); + } + + uint32_t chsum = _dest->getChecksum(); + chsum = bswap(chsum); + _stream->write(&chsum, sizeof(chsum)); + _stream->flush(); + IFileSegment * info = &(_spillFileSegments[_spillFileSegments.size() - 1]); + info->uncompressedEndOffset = _appendBuffer.getCounter(); + info->realEndOffset = _stream->tell(); +} + +void IFileWriter::write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) { + // append KeyLength ValueLength KeyBytesLength + uint32_t keyBuffLen = keyLen; + uint32_t valBuffLen = valueLen; + switch (_kType) { + case TextType: + keyBuffLen += WritableUtils::GetVLongSize(keyLen); + break; + case BytesType: + keyBuffLen += 4; + break; + default: + break; + } + + switch (_vType) { + case TextType: + valBuffLen += WritableUtils::GetVLongSize(valueLen); + break; + case BytesType: + valBuffLen += 4; + break; + default: + break; + } + + _appendBuffer.write_vuint2(keyBuffLen, valBuffLen); + + switch (_kType) { + case TextType: + _appendBuffer.write_vuint(keyLen); + break; + case BytesType: + _appendBuffer.write_uint32_be(keyLen); + break; + default: + break; + } + + if (keyLen > 0) { + _appendBuffer.write(key, keyLen); + } + + if (NULL != _recordCounter) { + _recordCounter->increase(); + } + + switch (_vType) { + case TextType: + _appendBuffer.write_vuint(valueLen); + break; + case BytesType: + _appendBuffer.write_uint32_be(valueLen); + break; + default: + break; + } + if (valueLen > 0) { + _appendBuffer.write(value, valueLen); + } +} + +IFileSegment * IFileWriter::toArray(std::vector<IFileSegment> *segments) { + IFileSegment * segs = new IFileSegment[segments->size()]; + for (size_t i = 0; i < segments->size(); i++) { + segs[i] = segments->at(i); + } + return segs; +} + +SingleSpillInfo * IFileWriter::getSpillInfo() { + const uint32_t size = _spillFileSegments.size(); + return new SingleSpillInfo(toArray(&_spillFileSegments), size, "", _checksumType, _kType, _vType, + _codec); +} + +void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) { + if (_spillFileSegments.size() > 0) { + offset = _spillFileSegments[_spillFileSegments.size() - 1].uncompressedEndOffset; + realOffset = _spillFileSegments[_spillFileSegments.size() - 1].realEndOffset; + } else { + offset = 0; + realOffset = 0; + } +} + +} // namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h new file mode 100644 index 0000000..76d6fbc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef IFILE_H_ +#define IFILE_H_ + +#include "Checksum.h" +#include "Buffers.h" +#include "WritableUtils.h" +#include "SpillInfo.h" +#include "MapOutputSpec.h" + +namespace NativeTask { + +/** + * IFileReader + */ +class IFileReader { +private: + InputStream * _stream; + ChecksumInputStream * _source; + ReadBuffer _reader; + ChecksumType _checksumType; + KeyValueType _kType; + KeyValueType _vType; + string _codec; + int32_t _segmentIndex; + SingleSpillInfo * _spillInfo; + const char * _valuePos; + uint32_t _valueLen; + bool _deleteSourceStream; + +public: + IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteSourceStream = false); + + virtual ~IFileReader(); + + /** + * @return 0 if have next partition, none 0 if no more partition + */ + bool nextPartition(); + + /** + * get next key + * NULL if no more, then next_partition() need to be called + * NOTICE: before value() is called, the return pointer value is + * guaranteed to be valid + */ + const char * nextKey(uint32_t & keyLen) { + int64_t t1 = _reader.readVLong(); + int64_t t2 = _reader.readVLong(); + if (t1 == -1) { + return NULL; + } + const char * kvbuff = _reader.get((uint32_t)(t1 + t2)); + uint32_t len; + switch (_kType) { + case TextType: + keyLen = WritableUtils::ReadVInt(kvbuff, len); + break; + case BytesType: + keyLen = bswap(*(uint32_t*)kvbuff); + len = 4; + break; + default: + keyLen = t1; + len = 0; + } + const char * kbuff = kvbuff + len; + const char * vbuff = kvbuff + (uint32_t)t1; + switch (_vType) { + case TextType: + _valueLen = WritableUtils::ReadVInt(vbuff, len); + _valuePos = vbuff + len; + break; + case BytesType: + _valueLen = bswap(*(uint32_t*)vbuff); + _valuePos = vbuff + 4; + break; + default: + _valueLen = t2; + _valuePos = vbuff; + } + return kbuff; + } + + /** + * length of current value part of IFile entry + */ + uint32_t valueLen() { + return _valueLen; + } + + /** + * get current value + */ + const char * value(uint32_t & valueLen) { + valueLen = _valueLen; + return _valuePos; + } +}; + +/** + * IFile Writer + */ +class IFileWriter : public Collector { +protected: + OutputStream * _stream; + ChecksumOutputStream * _dest; + ChecksumType _checksumType; + KeyValueType _kType; + KeyValueType _vType; + string _codec; + AppendBuffer _appendBuffer; + vector<IFileSegment> _spillFileSegments; + Counter * _recordCounter; + + bool _deleteTargetStream; + +private: + IFileSegment * toArray(std::vector<IFileSegment> *segments); + +public: + static IFileWriter * create(const std::string & filepath, const MapOutputSpec & spec, + Counter * spilledRecords); + + IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype, + KeyValueType vtype, const string & codec, Counter * recordCounter, + bool deleteTargetStream = false); + + virtual ~IFileWriter(); + + void startPartition(); + + void endPartition(); + + virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t 
valueLen); + + SingleSpillInfo * getSpillInfo(); + + void getStatistics(uint64_t & offset, uint64_t & realOffset); + + virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) { + write((const char*)key, keyLen, (const char*)value, valueLen); + } +}; + +} // namespace NativeTask + +#endif /* IFILE_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc new file mode 100644 index 0000000..dfde456 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "Iterator.h" +#include "commons.h" + +namespace NativeTask { + +KeyGroupIteratorImpl::KeyGroupIteratorImpl(KVIterator * iterator) + : _keyGroupIterState(NEW_KEY), _iterator(iterator), _first(true) { +} + +bool KeyGroupIteratorImpl::nextKey() { + if (_keyGroupIterState == NO_MORE) { + return false; + } + + uint32_t temp; + while (_keyGroupIterState == SAME_KEY || _keyGroupIterState == NEW_KEY_VALUE) { + nextValue(temp); + } + if (_keyGroupIterState == NEW_KEY) { + if (_first == true) { + _first = false; + if (!next()) { + _keyGroupIterState = NO_MORE; + return false; + } + } + _keyGroupIterState = NEW_KEY_VALUE; + _currentGroupKey.assign(_key.data(), _key.length()); + return true; + } + return false; +} + +const char * KeyGroupIteratorImpl::getKey(uint32_t & len) { + len = (uint32_t)_key.length(); + return _key.data(); +} + +const char * KeyGroupIteratorImpl::nextValue(uint32_t & len) { + switch (_keyGroupIterState) { + case NEW_KEY: { + return NULL; + } + case SAME_KEY: { + if (next()) { + if (_key.length() == _currentGroupKey.length()) { + if (fmemeq(_key.data(), _currentGroupKey.c_str(), _key.length())) { + len = _value.length(); + return _value.data(); + } + } + _keyGroupIterState = NEW_KEY; + return NULL; + } + _keyGroupIterState = NO_MORE; + return NULL; + } + case NEW_KEY_VALUE: { + _keyGroupIterState = SAME_KEY; + len = _value.length(); + return _value.data(); + } + case NO_MORE: + return NULL; + } + return NULL; +} + +bool KeyGroupIteratorImpl::next() { + bool result = _iterator->next(_key, _value); + return result; +} + +} //namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h new file mode 100644 index 0000000..b1b5f03 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ITERATOR_H_ +#define ITERATOR_H_ + +#include "NativeTask.h" + +namespace NativeTask { + +class KeyGroupIteratorImpl : public KeyGroupIterator { +protected: + // for KeyGroupIterator + KeyGroupIterState _keyGroupIterState; + KVIterator * _iterator; + string _currentGroupKey; + Buffer _key; + Buffer _value; + bool _first; + +public: + KeyGroupIteratorImpl(KVIterator * iterator); + bool nextKey(); + const char * getKey(uint32_t & len); + const char * nextValue(uint32_t & len); + +protected: + bool next(); +}; + +} //namespace NativeTask +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc new file mode 100644 index 0000000..2eaa342 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Log.h" + +namespace NativeTask { + +#ifdef PRINT_LOG + +FILE * LOG_DEVICE = stderr; + +#endif + +} //namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h new file mode 100644 index 0000000..a0c17f3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LOG_H_ +#define LOG_H_ + +#include <stdio.h> +#include <time.h> + +namespace NativeTask { + +#define PRINT_LOG + +#ifdef PRINT_LOG + +extern FILE * LOG_DEVICE; +#define LOG(_fmt_, args...) if (LOG_DEVICE) { \ + time_t log_timer; struct tm log_tm; \ + time(&log_timer); localtime_r(&log_timer, &log_tm); \ + fprintf(LOG_DEVICE, "%02d/%02d/%02d %02d:%02d:%02d INFO "_fmt_"\n", \ + log_tm.tm_year%100, log_tm.tm_mon+1, log_tm.tm_mday, \ + log_tm.tm_hour, log_tm.tm_min, log_tm.tm_sec, \ + ##args);} + +#else + +#define LOG(_fmt_, args...) + +#endif + +} // namespace NativeTask + +#endif /* LOG_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc new file mode 100644 index 0000000..34f7072 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "util/Timer.h" +#include "util/StringUtil.h" +#include "FileSystem.h" +#include "NativeObjectFactory.h" +#include "MapOutputCollector.h" +#include "Merge.h" +#include "NativeTask.h" +#include "WritableUtils.h" +#include "util/DualPivotQuickSort.h" +#include "Combiner.h" +#include "TaskCounters.h" +#include "MinHeap.h" + +namespace NativeTask { + +ICombineRunner * CombineRunnerWrapper::createCombiner() { + + ICombineRunner * combineRunner = NULL; + if (NULL != _config->get(NATIVE_COMBINER)) { + const char * combinerClass = _config->get(NATIVE_COMBINER); + ObjectCreatorFunc objectCreater = NativeObjectFactory::GetObjectCreator(combinerClass); + if (NULL == objectCreater) { + THROW_EXCEPTION_EX(UnsupportException, "Combiner not found: %s", combinerClass); + } else { + LOG("[MapOutputCollector::configure] native combiner is enabled: %s", combinerClass); + } + combineRunner = new NativeCombineRunner(_config, objectCreater); + } else { + CombineHandler * javaCombiner = _spillOutput->getJavaCombineHandler(); + if (NULL != javaCombiner) { + _isJavaCombiner = true; + combineRunner = (ICombineRunner *)javaCombiner; + } else { + LOG("[MapOutputCollector::getCombiner] cannot get combine handler from java"); + } + } + return combineRunner; +} + +void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator, + IFileWriter * writer) { + + if (!_combinerInited) { + _combineRunner = createCombiner(); + _combinerInited = true; + } + + if (NULL != _combineRunner) { + _combineRunner->combine(type, kvIterator, writer); + } else { + LOG("[CombineRunnerWrapper::combine] no valid combiner"); + } +} + +///////////////////////////////////////////////////////////////// +// MapOutputCollector +///////////////////////////////////////////////////////////////// + +MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService) + : _config(NULL), _buckets(NULL), _keyComparator(NULL), _defaultBlockSize(0), + _combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService), _pool(NULL), + _numPartitions(numberPartitions) { + _pool = new MemoryPool(); +} + +MapOutputCollector::~MapOutputCollector() { + + if (NULL != _buckets) { + for (uint32_t i = 0; i < _numPartitions; i++) { + if (NULL != _buckets[i]) { + delete _buckets[i]; + _buckets[i] = NULL; + } + } + } + + delete[] _buckets; + _buckets = NULL; + + if (NULL != _pool) { + delete _pool; + _pool = NULL; + } + + if (NULL != _combineRunner) { + delete _combineRunner; + _combineRunner = NULL; + } +} + +void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity, + ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * combiner) { + + this->_combineRunner = combiner; + + this->_defaultBlockSize = defaultBlockSize; + + _pool->init(memoryCapacity); + + //TODO: add support for customized comparator + this->_keyComparator = keyComparator; + + _buckets = new PartitionBucket*[_numPartitions]; + + for (uint32_t partitionId = 0; partitionId < _numPartitions; partitionId++) { + PartitionBucket * pb = new PartitionBucket(_pool, partitionId, keyComparator, _combineRunner, + defaultBlockSize); + + _buckets[partitionId] = pb; + } + + _spilledRecords = spilledRecords; + + _collectTimer.reset(); +} + +void MapOutputCollector::reset() { + for (uint32_t i = 0; i < _numPartitions; i++) { + if (NULL != _buckets[i]) { + _buckets[i]->reset(); + } + } + _pool->reset(); +} + +void MapOutputCollector::configure(Config * config) { + _config = config; + 
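+ // Configuration summary: read the map output spec (key/value types, checksum, codec, sort settings),
+ // derive the per-partition block size from mapreduce's io.sort.mb buffer, resolve the key comparator,
+ // and install a lazily-initialized combiner wrapper when either the native combiner key or one of the
+ // Java combiner class keys (old/new API) is set.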
MapOutputSpec::getSpecFromConfig(config, _spec); + + uint32_t maxBlockSize = config->getInt(NATIVE_SORT_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE); + uint32_t capacity = config->getInt(MAPRED_IO_SORT_MB, 300) * 1024 * 1024; + + uint32_t defaultBlockSize = getDefaultBlockSize(capacity, _numPartitions, maxBlockSize); + LOG("Native Total MemoryBlockPool: num_partitions %u, min_block_size %uK, max_block_size %uK, capacity %uM", _numPartitions, defaultBlockSize / 1024, + maxBlockSize / 1024, capacity / 1024 / 1024); + + ComparatorPtr comparator = getComparator(config, _spec); + + Counter * spilledRecord = NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP, + TaskCounters::SPILLED_RECORDS); + + ICombineRunner * combiner = NULL; + if (NULL != config->get(NATIVE_COMBINER) + // config name for old api and new api + || NULL != config->get(MAPRED_COMBINE_CLASS_OLD) + || NULL != config->get(MAPRED_COMBINE_CLASS_NEW)) { + combiner = new CombineRunnerWrapper(config, _spillOutput); + } + + init(defaultBlockSize, capacity, comparator, spilledRecord, combiner); +} + +KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) { + PartitionBucket * partition = getPartition(partitionId); + if (NULL == partition) { + THROW_EXCEPTION_EX(IOException, "Partition is NULL, partition_id: %d, num_partitions: %d", partitionId, _numPartitions); + } + + KVBuffer * dest = partition->allocateKVBuffer(kvlength); + + if (NULL == dest) { + string * spillpath = _spillOutput->getSpillPath(); + if (NULL == spillpath || spillpath->length() == 0) { + THROW_EXCEPTION(IOException, "Illegal(empty) spill files path"); + } else { + middleSpill(*spillpath, ""); + delete spillpath; + } + + dest = partition->allocateKVBuffer(kvlength); + if (NULL == dest) { + // io.sort.mb too small, can't proceed + // should not get here, since the allocation path can throw an OOM exception first + THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than io.sort.mb"); + } + } + return dest; +} + +/** + * collect one k/v pair + * @return true success; false buffer full, need spill + */ +bool MapOutputCollector::collect(const void * key, uint32_t keylen, const void * value, + uint32_t vallen, uint32_t partitionId) { + uint32_t total_length = keylen + vallen + KVBuffer::headerLength(); + KVBuffer * buff = allocateKVBuffer(partitionId, total_length); + + if (NULL == buff) { + return false; + } + buff->fill(key, keylen, value, vallen); + return true; +} + +ComparatorPtr MapOutputCollector::getComparator(Config * config, MapOutputSpec & spec) { + string nativeComparator = NATIVE_MAPOUT_KEY_COMPARATOR; + const char * key_class = config->get(MAPRED_MAPOUTPUT_KEY_CLASS); + if (NULL == key_class) { + key_class = config->get(MAPRED_OUTPUT_KEY_CLASS); + } + nativeComparator.append(".").append(key_class); + const char * comparatorName = config->get(nativeComparator); + return NativeTask::get_comparator(spec.keyType, comparatorName); +} + +PartitionBucket * MapOutputCollector::getPartition(uint32_t partition) { + if (partition >= _numPartitions) { + return NULL; + } + return _buckets[partition]; +} + +/** + * Sort (if required) each partition bucket and, when a writer is supplied, spill it, + * accumulating sort time and record count into metric + */ +void MapOutputCollector::sortPartitions(SortOrder orderType, SortAlgorithm sortType, + IFileWriter * writer, SortMetrics & metric) { + + uint32_t start_partition = 0; + uint32_t num_partition = _numPartitions; + if (orderType == GROUPBY) { + THROW_EXCEPTION(UnsupportException, "GROUPBY not supported"); + } + + uint64_t sortingTime = 0; + 
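+ // For each partition bucket: sort its records when FULLORDER is requested, then (if a writer was
+ // supplied) stream the bucket between startPartition()/endPartition(); sort time and record counts
+ // are accumulated into the caller's SortMetrics.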
Timer timer; + uint64_t recordNum = 0; + + for (uint32_t i = 0; i < num_partition; i++) { + if (NULL != writer) { + writer->startPartition(); + } + PartitionBucket * pb = _buckets[start_partition + i]; + if (pb != NULL) { + recordNum += pb->getKVCount(); + if (orderType == FULLORDER) { + timer.reset(); + pb->sort(sortType); + sortingTime += timer.now() - timer.last(); + } + if (NULL != writer) { + pb->spill(writer); + } + } + if (NULL != writer) { + writer->endPartition(); + } + } + metric.sortTime = sortingTime; + metric.recordCount = recordNum; +} + +void MapOutputCollector::middleSpill(const std::string & spillOutput, + const std::string & indexFilePath) { + + uint64_t collecttime = _collectTimer.now() - _collectTimer.last(); + const uint64_t M = 1000000; //million + + if (spillOutput.empty()) { + THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty"); + } else { + OutputStream * fout = FileSystem::getLocal().create(spillOutput, true); + + IFileWriter * writer = new IFileWriter(fout, _spec.checksumType, _spec.keyType, _spec.valueType, + _spec.codec, _spilledRecords); + + Timer timer; + SortMetrics metrics; + sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, writer, metrics); + + SingleSpillInfo * info = writer->getSpillInfo(); + info->path = spillOutput; + uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime; + + LOG( + "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %llu ms, sort: %llu ms, spill: %llu ms, records: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu}", + info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M, + metrics.recordCount, info->getEndPosition(), info->getRealEndPosition()); + + if (indexFilePath.length() > 0) { + info->writeSpillInfo(indexFilePath); + delete info; + } else { + _spillInfos.add(info); + } + + delete writer; + delete fout; + + reset(); + _collectTimer.reset(); + } +} + +/** + * final merge and/or spill, use previous spilled + * file & in-memory data + */ +void MapOutputCollector::finalSpill(const std::string & filepath, + const std::string & idx_file_path) { + + const uint64_t M = 1000000; //million + LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s", filepath.c_str()); + + if (_spillInfos.getSpillCount() == 0) { + middleSpill(filepath, idx_file_path); + return; + } + + IFileWriter * writer = IFileWriter::create(filepath, _spec, _spilledRecords); + Merger * merger = new Merger(writer, _config, _keyComparator, _combineRunner); + + for (size_t i = 0; i < _spillInfos.getSpillCount(); i++) { + SingleSpillInfo * spill = _spillInfos.getSingleSpillInfo(i); + MergeEntryPtr pme = IFileMergeEntry::create(spill); + merger->addMergeEntry(pme); + } + + SortMetrics metrics; + sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics); + LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %llu ms, records: %llu}", + metrics.sortTime / M, metrics.recordCount); + + merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions)); + + Timer timer; + merger->merge(); + LOG( + "[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %llu ms}", + _spillInfos.getSpillCount(), (timer.now() - timer.last()) / M); + + delete merger; + + // write index + SingleSpillInfo * spill_range = writer->getSpillInfo(); + spill_range->writeSpillInfo(idx_file_path); + delete spill_range; + _spillInfos.deleteAllSpillFiles(); + delete writer; + 
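+ // All earlier spill files plus the in-memory buckets have now been merged into the single final
+ // output, and the index written above is what the shuffle uses to locate each reduce partition's
+ // segment; reset() below releases the collector's buffered data and memory pool.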
reset(); +} + +void MapOutputCollector::close() { + string * outputpath = _spillOutput->getOutputPath(); + string * indexpath = _spillOutput->getOutputIndexPath(); + + if ((outputpath->length() == 0) || (indexpath->length() == 0)) { + THROW_EXCEPTION(IOException, "Illegal(empty) map output file/index path"); + } + + finalSpill(*outputpath, *indexpath); + + delete outputpath; + delete indexpath; +} +} // namespace NativeTask + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h new file mode 100644 index 0000000..0692a33 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MAP_OUTPUT_COLLECTOR_H_ +#define MAP_OUTPUT_COLLECTOR_H_ + +#include "NativeTask.h" +#include "MemoryPool.h" +#include "Timer.h" +#include "Buffers.h" +#include "MapOutputSpec.h" +#include "IFile.h" +#include "SpillInfo.h" +#include "Combiner.h" +#include "PartitionBucket.h" +#include "lib/SpillOutputService.h" + +namespace NativeTask { +/** + * MapOutputCollector + */ + +struct SortMetrics { + uint64_t recordCount; + uint64_t sortTime; + +public: + SortMetrics() + : recordCount(0), sortTime(0) { + } +}; + +class CombineRunnerWrapper : public ICombineRunner { +private: + Config * _config; + ICombineRunner * _combineRunner; + bool _isJavaCombiner; + bool _combinerInited; + SpillOutputService * _spillOutput; + +public: + CombineRunnerWrapper(Config * config, SpillOutputService * service) + : _spillOutput(service), _config(config), _isJavaCombiner(false), _combineRunner(NULL), + _combinerInited(false) { + } + + ~CombineRunnerWrapper() { + if (!_isJavaCombiner) { + delete _combineRunner; + } + } + + virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer); + +private: + ICombineRunner * createCombiner(); +}; + +class MapOutputCollector { + static const uint32_t DEFAULT_MIN_BLOCK_SIZE = 16 * 1024; + static const uint32_t DEFAULT_MAX_BLOCK_SIZE = 4 * 1024 * 1024; + +private: + Config * _config; + + uint32_t _numPartitions; + PartitionBucket ** _buckets; + + ComparatorPtr _keyComparator; + + ICombineRunner * _combineRunner; + + Counter * _spilledRecords; + SpillOutputService * _spillOutput; + + uint32_t _defaultBlockSize; + + SpillInfos _spillInfos; + + MapOutputSpec _spec; + + Timer _collectTimer; + + MemoryPool * _pool; + +public: + MapOutputCollector(uint32_t num_partition, SpillOutputService * spillService); + + ~MapOutputCollector(); + + void configure(Config * config); + + /** + * collect one k/v pair + * @return true success; false buffer full, need spill + */ + bool collect(const void * key, uint32_t keylen, const void * value, uint32_t vallen, + uint32_t partitionId); + + KVBuffer * allocateKVBuffer(uint32_t partitionId, uint32_t kvlength); + + void close(); + +private: + void init(uint32_t maxBlockSize, uint32_t memory_capacity, ComparatorPtr keyComparator, + Counter * spilledRecord, ICombineRunner * combiner); + + void reset(); + + /** + * spill a range of partition buckets, prepare for future + * Parallel sort & spill, TODO: parallel sort & spill + */ + void sortPartitions(SortOrder orderType, SortAlgorithm sortType, IFileWriter * writer, + SortMetrics & metrics); + + ComparatorPtr getComparator(Config * config, MapOutputSpec & spec); + + inline uint32_t GetCeil(uint32_t v, uint32_t unit) { + return ((v + unit - 1) / unit) * unit; + } + + uint32_t getDefaultBlockSize(uint32_t memoryCapacity, uint32_t partitionNum, + uint32_t maxBlockSize) { + uint32_t defaultBlockSize = memoryCapacity / _numPartitions / 4; + defaultBlockSize = GetCeil(defaultBlockSize, DEFAULT_MIN_BLOCK_SIZE); + defaultBlockSize = std::min(defaultBlockSize, maxBlockSize); + return defaultBlockSize; + } + + PartitionBucket * getPartition(uint32_t partition); + + /** + * normal spill use options in _config + * @param filepaths: spill file path + */ + void middleSpill(const std::string & spillOutput, const std::string & indexFilePath); + + /** + * final merge and/or spill use options in _config, and + * previous spilled file & in-memory data + */ + void finalSpill(const std::string & filepath, const std::string & indexpath); +}; + +} //namespace NativeTask + +#endif /* 
MAP_OUTPUT_COLLECTOR_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc new file mode 100644 index 0000000..5088206 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "MapOutputSpec.h" +#include "NativeTask.h" + +namespace NativeTask { + +void MapOutputSpec::getSpecFromConfig(Config * config, MapOutputSpec & spec) { + if (NULL == config) { + return; + } + spec.checksumType = CHECKSUM_CRC32; + string sortType = config->get(NATIVE_SORT_TYPE, "DUALPIVOTSORT"); + if (sortType == "DUALPIVOTSORT") { + spec.sortAlgorithm = DUALPIVOTSORT; + } else { + spec.sortAlgorithm = CPPSORT; + } + if (config->get(MAPRED_COMPRESS_MAP_OUTPUT, "false") == "true") { + spec.codec = config->get(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC); + } else { + spec.codec = ""; + } + if (config->getBool(MAPRED_SORT_AVOID, false)) { + spec.sortOrder = NOSORT; + } else { + spec.sortOrder = FULLORDER; + } + const char * key_class = config->get(MAPRED_MAPOUTPUT_KEY_CLASS); + if (NULL == key_class) { + key_class = config->get(MAPRED_OUTPUT_KEY_CLASS); + } + if (NULL == key_class) { + THROW_EXCEPTION(IOException, "mapred.mapoutput.key.class not set"); + } + spec.keyType = JavaClassToKeyValueType(key_class); + const char * value_class = config->get(MAPRED_MAPOUTPUT_VALUE_CLASS); + if (NULL == value_class) { + value_class = config->get(MAPRED_OUTPUT_VALUE_CLASS); + } + if (NULL == value_class) { + THROW_EXCEPTION(IOException, "mapred.mapoutput.value.class not set"); + } + spec.valueType = JavaClassToKeyValueType(value_class); +} + +} // namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h new file mode 100644 index 0000000..fac96ab --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MAPOUTPUTSPEC_H_ +#define MAPOUTPUTSPEC_H_ + +#include <string> +#include "Checksum.h" +#include "WritableUtils.h" +#include "NativeTask.h" + +namespace NativeTask { + +using std::string; + +/** + * internal sort algorithm + */ +enum SortAlgorithm { + CQSORT = 0, + CPPSORT = 1, + DUALPIVOTSORT = 2, +}; + +/** + * spill file type + * INTERMEDIATE: a simple key/value sequence file + * IFILE: classic hadoop IFile + */ +enum OutputFileType { + INTERMEDIATE = 0, + IFILE = 1, +}; + +/** + * key/value record ordering requirements + * FULLORDER: standard hadoop total order + * GROUPBY: records with the same key are grouped together, but groups are not ordered + * NOSORT: no ordering at all + */ +enum SortOrder { + FULLORDER = 0, + GROUPBY = 1, + NOSORT = 2, +}; + +enum CompressionType { + PLAIN = 0, + SNAPPY = 1, +}; + +class MapOutputSpec { +public: + KeyValueType keyType; + KeyValueType valueType; + SortOrder sortOrder; + SortAlgorithm sortAlgorithm; + string codec; + ChecksumType checksumType; + + static void getSpecFromConfig(Config * config, MapOutputSpec & spec); +}; + +} // namespace NativeTask + +#endif /* MAPOUTPUTSPEC_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc new file mode 100644 index 0000000..29f59b9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "NativeTask.h" +#include "commons.h" +#include "Timer.h" +#include "Buffers.h" +#include "MapOutputSpec.h" +#include "IFile.h" +#include "SpillInfo.h" +#include "Combiner.h" + +#include "MemoryBlock.h" +#include "MemoryPool.h" +#include "util/DualPivotQuickSort.h" + +namespace NativeTask { + +class MemoryPool; + +MemoryBlock::MemoryBlock(char * pos, uint32_t size) + : _base(pos), _size(size), _position(0), _sorted(false) { +} + +KVBuffer * MemoryBlock::getKVBuffer(int index) { + if (index < 0 || index >= _kvOffsets.size()) { + return NULL; + } + uint32_t offset = _kvOffsets.at(index); + KVBuffer * kvbuffer = (KVBuffer*)(_base + offset); + return kvbuffer; +} + +void MemoryBlock::sort(SortAlgorithm type, ComparatorPtr comparator) { + if ((!_sorted) && (_kvOffsets.size() > 1)) { + switch (type) { + case CPPSORT: + std::sort(_kvOffsets.begin(), _kvOffsets.end(), ComparatorForStdSort(_base, comparator)); + break; + case DUALPIVOTSORT: { + DualPivotQuicksort(_kvOffsets, ComparatorForDualPivotSort(_base, comparator)); + } + break; + default: + THROW_EXCEPTION(UnsupportException, "Sort algorithm not supported"); + } + } + _sorted = true; +} +} //namespace NativeTask http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h new file mode 100644 index 0000000..e97d98a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "commons.h" + +#ifndef MEMORYBLOCK_H_ +#define MEMORYBLOCK_H_ + +namespace NativeTask { + +class MemoryPool; + +class ComparatorForDualPivotSort { +private: + const char * _base; + ComparatorPtr _keyComparator; +public: + ComparatorForDualPivotSort(const char * base, ComparatorPtr comparator) + : _base(base), _keyComparator(comparator) { + } + + inline int operator()(uint32_t lhs, uint32_t rhs) { + KVBuffer * left = (KVBuffer *)(_base + lhs); + KVBuffer * right = (KVBuffer *)(_base + rhs); + return (*_keyComparator)(left->content, left->keyLength, right->content, right->keyLength); + } +}; + +class ComparatorForStdSort { +private: + const char * _base; + ComparatorPtr _keyComparator; +public: + ComparatorForStdSort(const char * base, ComparatorPtr comparator) + : _base(base), _keyComparator(comparator) { + } + +public: + inline bool operator()(uint32_t lhs, uint32_t rhs) { + KVBuffer * left = (KVBuffer *)(_base + lhs); + KVBuffer * right = (KVBuffer *)(_base + rhs); + int ret = (*_keyComparator)(left->getKey(), left->keyLength, right->getKey(), right->keyLength); + return ret < 0; + } +}; + +class MemoryBlock { +private: + char * _base; + uint32_t _size; + uint32_t _position; + std::vector<uint32_t> _kvOffsets; + bool _sorted; + +public: + MemoryBlock(char * pos, uint32_t size); + + bool sorted() { + return _sorted; + } + + KVBuffer * allocateKVBuffer(uint32_t length) { + if (length > remainSpace()) { + LOG("Unable to allocate kv from memory buffer, length: %d, remain: %d", length, remainSpace()); + return NULL; + } + _sorted = false; + _kvOffsets.push_back(_position); + char * space = _base + _position; + _position += length; + return (KVBuffer *)space; + } + + uint32_t remainSpace() const { + return _size - _position; + } + + uint32_t getKVCount() { + return _kvOffsets.size(); + } + + KVBuffer * getKVBuffer(int index); + + void sort(SortAlgorithm type, ComparatorPtr comparator); +}; +//class MemoryBlock + +class MemBlockIterator { +private: + MemoryBlock * _memBlock; + uint32_t _end; + uint32_t _current; + KVBuffer * _kvBuffer; + +public: + + MemBlockIterator(MemoryBlock * memBlock) + : _memBlock(memBlock), _end(0), _current(0), _kvBuffer(NULL) { + _end = memBlock->getKVCount(); + } + + KVBuffer * getKVBuffer() { + return _kvBuffer; + } + + /** + * move to next key/value + * 0 on success + * 1 on no more + */ + bool next() { + if (_current >= _end) { + return false; + } + this->_kvBuffer = _memBlock->getKVBuffer(_current); + ++_current; + return true; + } +}; +//class MemoryBlockIterator + +typedef MemBlockIterator * MemBlockIteratorPtr; + +class MemBlockComparator { +private: + ComparatorPtr _keyComparator; + +public: + MemBlockComparator(ComparatorPtr comparator) + : _keyComparator(comparator) { + } + +public: + bool operator()(const MemBlockIteratorPtr lhs, const MemBlockIteratorPtr rhs) { + + KVBuffer * left = lhs->getKVBuffer(); + KVBuffer * right = rhs->getKVBuffer(); + + //Treat NULL as infinite MAX, so that we can pop out next value + if (NULL == left) { + return false; + } + + if (NULL == right) { + return true; + } + + return (*_keyComparator)(left->content, left->keyLength, right->content, right->keyLength) < 0; + } +}; + +} //namespace NativeTask + +#endif /* MEMORYBLOCK_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h new file mode 100644 index 0000000..0303fb2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MEMORYPOOL_H_ +#define MEMORYPOOL_H_ + +#include "Buffers.h" +#include "MapOutputSpec.h" +#include "NativeTask.h" +#include "util/StringUtil.h" + +namespace NativeTask { + +/** + * Class for allocating memory buffer + */ + +class MemoryPool { +private: + char * _base; + uint32_t _capacity; + uint32_t _used; + +public: + + MemoryPool() + : _base(NULL), _capacity(0), _used(0) { + } + + ~MemoryPool() { + if (NULL != _base) { + free(_base); + _base = NULL; + } + } + + void init(uint32_t capacity) throw (OutOfMemoryException) { + if (capacity > _capacity) { + if (NULL != _base) { + free(_base); + _base = NULL; + } + _base = (char*)malloc(capacity); + if (NULL == _base) { + THROW_EXCEPTION(OutOfMemoryException, "Not enough memory to init MemoryBlockPool"); + } + _capacity = capacity; + } + reset(); + } + + void reset() { + _used = 0; + } + + char * allocate(uint32_t min, uint32_t expect, uint32_t & allocated) { + if (_used + min > _capacity) { + return NULL; + } else if (_used + expect > _capacity) { + char * buff = _base + _used; + allocated = min; + _used += min; + return buff; + } else { + char * buff = _base + _used; + allocated = expect; + _used += expect; + return buff; + } + } +}; + +} // namespace NativeTask + +#endif /* MEMORYPOOL_H_ */
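For readers following the memory management above: the scheme is a bump-pointer pool plus a per-partition block-size heuristic. Below is a minimal, self-contained C++ sketch of the same min/expect allocation contract and rounding logic, written for illustration only; the sketch namespace, the main() driver, and the 300 MB / 10-partition / 8 KB figures are assumptions, not values taken from the patch.

// Minimal sketch (not from the patch) of the MemoryPool::allocate() contract and
// the getDefaultBlockSize() heuristic shown above. Names and constants are assumed.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>

namespace sketch {

// Round v up to the next multiple of unit (mirrors MapOutputCollector::GetCeil).
inline uint32_t GetCeil(uint32_t v, uint32_t unit) {
  return ((v + unit - 1) / unit) * unit;
}

// Per-partition block size: a quarter of each partition's share of the sort buffer,
// rounded up to the minimum block size and capped at the maximum block size.
uint32_t getDefaultBlockSize(uint32_t memoryCapacity, uint32_t partitionNum,
                             uint32_t minBlockSize, uint32_t maxBlockSize) {
  uint32_t blockSize = memoryCapacity / partitionNum / 4;
  blockSize = GetCeil(blockSize, minBlockSize);
  return std::min(blockSize, maxBlockSize);
}

// Bump-pointer pool: hand out `expect` bytes while they still fit, fall back to
// `min` bytes, and return NULL once even `min` bytes would exceed the capacity
// (at which point the collector spills and resets the pool).
class MemoryPool {
private:
  char * _base;
  uint32_t _capacity;
  uint32_t _used;

public:
  explicit MemoryPool(uint32_t capacity)
      : _base((char *)malloc(capacity)), _capacity(capacity), _used(0) {
  }

  ~MemoryPool() {
    free(_base);
  }

  char * allocate(uint32_t min, uint32_t expect, uint32_t & allocated) {
    if (_used + min > _capacity) {
      return NULL; // caller must spill, reset and retry
    }
    allocated = (_used + expect > _capacity) ? min : expect;
    char * buff = _base + _used;
    _used += allocated;
    return buff;
  }
};

} // namespace sketch

int main() {
  // e.g. a 300 MB sort buffer, 10 partitions, 16 KB min block, 4 MB max block:
  // 300 MB / 10 / 4 = 7.5 MB per block, which the 4 MB cap brings down to 4096 KB.
  uint32_t blockSize = sketch::getDefaultBlockSize(300 * 1024 * 1024, 10,
                                                   16 * 1024, 4 * 1024 * 1024);
  printf("default block size: %u KB\n", blockSize / 1024);

  sketch::MemoryPool pool(8 * 1024);
  uint32_t allocated = 0;
  char * first = pool.allocate(1024, 6 * 1024, allocated);     // full 6 KB granted
  char * second = pool.allocate(1024, 6 * 1024, allocated);    // only the 1 KB minimum fits
  char * third = pool.allocate(4 * 1024, 6 * 1024, allocated); // even the minimum fails: NULL
  printf("first=%p second=%p third=%p\n", (void *)first, (void *)second, (void *)third);
  return 0;
}

The min/expect split is the design point worth noting: a nearly full pool can still satisfy small requests at their minimum size before the collector is finally forced to spill and reset, which is how MapOutputCollector::allocateKVBuffer() above decides between continuing to fill buckets and calling middleSpill().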