http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc new file mode 100644 index 0000000..c2d64c9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "Checksum.h" +#include "Streams.h" + +namespace NativeTask { + +///////////////////////////////////////////////////////////// + +void InputStream::seek(uint64_t position) { + THROW_EXCEPTION(UnsupportException, "seek not support"); +} + +uint64_t InputStream::tell() { + THROW_EXCEPTION(UnsupportException, "tell not support"); +} + +int32_t InputStream::readFully(void * buff, uint32_t length) { + int32_t ret = 0; + while (length > 0) { + int32_t rd = read(buff, length); + if (rd <= 0) { + return ret > 0 ? ret : -1; + } + ret += rd; + buff = ((char *)buff) + rd; + length -= rd; + } + return ret; +} + +void InputStream::readAllTo(OutputStream & out, uint32_t bufferHint) { + char * buffer = new char[bufferHint]; + while (true) { + int32_t rd = read(buffer, bufferHint); + if (rd <= 0) { + break; + } + out.write(buffer, rd); + } + delete buffer; +} + +///////////////////////////////////////////////////////////// + +uint64_t OutputStream::tell() { + THROW_EXCEPTION(UnsupportException, "tell not support"); +} + +/////////////////////////////////////////////////////////// + +ChecksumInputStream::ChecksumInputStream(InputStream * stream, ChecksumType type) + : FilterInputStream(stream), _type(type), _limit(-1) { + resetChecksum(); +} + +void ChecksumInputStream::resetChecksum() { + _checksum = Checksum::init(_type); +} + +uint32_t ChecksumInputStream::getChecksum() { + return Checksum::getValue(_type, _checksum); +} + +int32_t ChecksumInputStream::read(void * buff, uint32_t length) { + if (_limit < 0) { + int32_t ret = _stream->read(buff, length); + if (ret > 0) { + Checksum::update(_type, _checksum, buff, ret); + } + return ret; + } else if (_limit == 0) { + return -1; + } else { + int64_t rd = _limit < length ? 
_limit : length; + int32_t ret = _stream->read(buff, rd); + if (ret > 0) { + _limit -= ret; + Checksum::update(_type, _checksum, buff, ret); + } + return ret; + } +} + +/////////////////////////////////////////////////////////// + +ChecksumOutputStream::ChecksumOutputStream(OutputStream * stream, ChecksumType type) + : FilterOutputStream(stream), _type(type) { + resetChecksum(); +} + +void ChecksumOutputStream::resetChecksum() { + _checksum = Checksum::init(_type); +} + +uint32_t ChecksumOutputStream::getChecksum() { + return Checksum::getValue(_type, _checksum); +} + +void ChecksumOutputStream::write(const void * buff, uint32_t length) { + Checksum::update(_type, _checksum, buff, length); + _stream->write(buff, length); +} + +} // namespace NativeTask
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h new file mode 100644 index 0000000..199762b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef STREAMS_H_ +#define STREAMS_H_ + +#include "util/Checksum.h" + +namespace NativeTask { + +class OutputStream; + +class InputStream { +public: + InputStream() { + } + + virtual ~InputStream() { + } + + virtual void seek(uint64_t position); + + virtual uint64_t tell(); + + virtual int32_t read(void * buff, uint32_t length) { + return -1; + } + + virtual void close() { + } + + virtual int32_t readFully(void * buff, uint32_t length); + + void readAllTo(OutputStream & out, uint32_t bufferHint = 1024 * 4); +}; + +class OutputStream { +public: + OutputStream() { + } + + virtual ~OutputStream() { + } + + virtual uint64_t tell(); + + virtual void write(const void * buff, uint32_t length) { + } + + virtual void flush() { + } + + virtual void close() { + } +}; + +class FilterInputStream : public InputStream { +protected: + InputStream * _stream; +public: + FilterInputStream(InputStream * stream) + : _stream(stream) { + } + + virtual ~FilterInputStream() { + } + + void setStream(InputStream * stream) { + _stream = stream; + } + + InputStream * getStream() { + return _stream; + } + + virtual void seek(uint64_t position) { + _stream->seek(position); + } + + virtual uint64_t tell() { + return _stream->tell(); + } + + virtual int32_t read(void * buff, uint32_t length) { + return _stream->read(buff, length); + } +}; + +class FilterOutputStream : public OutputStream { +protected: + OutputStream * _stream; +public: + FilterOutputStream(OutputStream * stream) + : _stream(stream) { + } + + virtual ~FilterOutputStream() { + } + + void setStream(OutputStream * stream) { + _stream = stream; + } + + OutputStream * getStream() { + return _stream; + } + + virtual uint64_t tell() { + return _stream->tell(); + } + + virtual void write(const void * buff, uint32_t length) { + _stream->write(buff, length); + } + + virtual void flush() { + _stream->flush(); + } + + virtual void close() { + flush(); + } +}; + +class LimitInputStream : public FilterInputStream { +protected: + int64_t 
_limit; +public: + LimitInputStream(InputStream * stream, int64_t limit) + : FilterInputStream(stream), _limit(limit) { + } + + virtual ~LimitInputStream() { + } + + int64_t getLimit() { + return _limit; + } + + void setLimit(int64_t limit) { + _limit = limit; + } + + virtual int32_t read(void * buff, uint32_t length) { + if (_limit < 0) { + return _stream->read(buff, length); + } else if (_limit == 0) { + return -1; + } else { + int64_t rd = _limit < length ? _limit : length; + int32_t ret = _stream->read(buff, rd); + if (ret > 0) { + _limit -= ret; + } + return ret; + } + } +}; + +class ChecksumInputStream : public FilterInputStream { +protected: + ChecksumType _type; + uint32_t _checksum; + int64_t _limit; +public: + ChecksumInputStream(InputStream * stream, ChecksumType type); + + virtual ~ChecksumInputStream() { + } + + int64_t getLimit() { + return _limit; + } + + void setLimit(int64_t limit) { + _limit = limit; + } + + void resetChecksum(); + + uint32_t getChecksum(); + + virtual int32_t read(void * buff, uint32_t length); +}; + +class ChecksumOutputStream : public FilterOutputStream { +protected: + ChecksumType _type; + uint32_t _checksum; +public: + ChecksumOutputStream(OutputStream * stream, ChecksumType type); + + virtual ~ChecksumOutputStream() { + } + + void resetChecksum(); + + uint32_t getChecksum(); + + virtual void write(const void * buff, uint32_t length); + +}; + +} // namespace NativeTask + +#endif /* STREAMS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc new file mode 100644 
index 0000000..7aa7db8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TaskCounters.h" + +namespace NativeTask { + +#define DEFINE_COUNTER(name) const char * TaskCounters::name = #name; + +const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; + +DEFINE_COUNTER(MAP_INPUT_RECORDS) +DEFINE_COUNTER(MAP_OUTPUT_RECORDS) +DEFINE_COUNTER(MAP_SKIPPED_RECORDS) +DEFINE_COUNTER(MAP_INPUT_BYTES) +DEFINE_COUNTER(MAP_OUTPUT_BYTES) +DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES) +DEFINE_COUNTER(COMBINE_INPUT_RECORDS) +DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS) +DEFINE_COUNTER(REDUCE_INPUT_GROUPS) +DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES) +DEFINE_COUNTER(REDUCE_INPUT_RECORDS) +DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS) +DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS) +DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS) +DEFINE_COUNTER(SPILLED_RECORDS) + +const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters"; + +DEFINE_COUNTER(FILE_BYTES_READ) +DEFINE_COUNTER(FILE_BYTES_WRITTEN) +; + +} // namespace NativeTask 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h new file mode 100644 index 0000000..6afc207 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef TASKCOUNTERS_H_ +#define TASKCOUNTERS_H_ + +namespace NativeTask { + +class TaskCounters { +public: + static const char * TASK_COUNTER_GROUP; + + static const char * MAP_INPUT_RECORDS; + static const char * MAP_OUTPUT_RECORDS; + static const char * MAP_SKIPPED_RECORDS; + static const char * MAP_INPUT_BYTES; + static const char * MAP_OUTPUT_BYTES; + static const char * MAP_OUTPUT_MATERIALIZED_BYTES; + static const char * COMBINE_INPUT_RECORDS; + static const char * COMBINE_OUTPUT_RECORDS; + static const char * REDUCE_INPUT_GROUPS; + static const char * REDUCE_SHUFFLE_BYTES; + static const char * REDUCE_INPUT_RECORDS; + static const char * REDUCE_OUTPUT_RECORDS; + static const char * REDUCE_SKIPPED_GROUPS; + static const char * REDUCE_SKIPPED_RECORDS; + static const char * SPILLED_RECORDS; + + static const char * FILESYSTEM_COUNTER_GROUP; + + static const char * FILE_BYTES_READ; + static const char * FILE_BYTES_WRITTEN; +}; + +} // namespace NativeTask + +#endif /* TASKCOUNTERS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h new file mode 100644 index 0000000..3f96faf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TRACKING_COLLECTOR_H +#define TRACKING_COLLECTOR_H + +#include <stdint.h> +#include <string> + +namespace NativeTask { + +class TrackingCollector : public Collector { +protected: + Collector * _collector; + Counter * _counter; +public: + TrackingCollector(Collector * collector, Counter * counter) + : _collector(collector), _counter(counter) { + } + + virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) { + _counter->increase(); + _collector->collect(key, keyLen, value, valueLen); + } + + virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen, + int32_t partition) { + _counter->increase(); + _collector->collect(key, keyLen, value, valueLen, partition); + } +}; + +} //namespace NativeTask + +#endif //TRACKING_COLLECTOR_H http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h new file mode 100644 index 0000000..07b2cf1 --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMONS_H_ +#define COMMONS_H_ + +#include <sys/types.h> +#include <sys/stat.h> +#include <stdint.h> +#include <stdlib.h> +#include <assert.h> +#include <string.h> +#include <stdio.h> +#include <memory.h> +#include <fcntl.h> + +#include <limits> +#include <string> +#include <vector> +#include <list> +#include <set> +#include <map> +#include <algorithm> + +#include "primitives.h" +#include "Log.h" +#include "NativeTask.h" + +#include "Constants.h" + +#include "Iterator.h" +#include "TrackingCollector.h" + +#endif /* COMMONS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc new file mode 100644 index 0000000..01eaa57 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "util/StringUtil.h" +#include "util/SyncUtils.h" +#include "jniutils.h" + +using namespace NativeTask; + +JavaVM * JNU_GetJVM(void) { + static JavaVM * gJVM = NULL; + static Lock GJVMLock; + if (gJVM != NULL) { + return gJVM; + } + { + ScopeLock<Lock> autolock(GJVMLock); + if (gJVM == NULL) { + jint rv = 0; + jint noVMs = 0; + rv = JNI_GetCreatedJavaVMs(&gJVM, 1, &noVMs); + if (rv != 0) { + THROW_EXCEPTION(NativeTask::HadoopException, "JNI_GetCreatedJavaVMs failed"); + } + if (noVMs == 0) { + char *hadoopClassPath = getenv("CLASSPATH"); + if (hadoopClassPath == NULL) { + THROW_EXCEPTION(NativeTask::HadoopException, "Environment variable CLASSPATH not set!"); + return NULL; + } + const char *hadoopClassPathVMArg = "-Djava.class.path="; + size_t optHadoopClassPathLen = strlen(hadoopClassPath) + strlen(hadoopClassPathVMArg) + 1; + char *optHadoopClassPath = (char*)malloc(sizeof(char) * optHadoopClassPathLen); + snprintf(optHadoopClassPath, optHadoopClassPathLen, "%s%s", hadoopClassPathVMArg, + hadoopClassPath); + int noArgs = 1; + JavaVMOption options[noArgs]; + options[0].optionString = optHadoopClassPath; + + //Create the VM + JavaVMInitArgs vm_args; + vm_args.version = JNI_VERSION_1_6; + vm_args.options = options; + vm_args.nOptions = noArgs; + vm_args.ignoreUnrecognized = 1; + JNIEnv * jenv; + rv = JNI_CreateJavaVM(&gJVM, (void**)&jenv, &vm_args); + if (rv != 0) { + THROW_EXCEPTION(NativeTask::HadoopException, "JNI_CreateJavaVM failed"); + return NULL; + } + free(optHadoopClassPath); + } + } + } + return gJVM; +} + +JNIEnv* JNU_GetJNIEnv(void) { + JNIEnv * env; + jint rv = JNU_GetJVM()->AttachCurrentThread((void **)&env, NULL); + if (rv != 0) { + THROW_EXCEPTION(NativeTask::HadoopException, "Call to AttachCurrentThread failed"); + } + return env; +} + +void JNU_AttachCurrentThread() { + JNU_GetJNIEnv(); +} + +void JNU_DetachCurrentThread() { + jint rv = JNU_GetJVM()->DetachCurrentThread(); + if (rv != 0) { + 
THROW_EXCEPTION(NativeTask::HadoopException, "Call to DetachCurrentThread failed"); + } +} + +void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg) { + jclass cls = jenv->FindClass(name); + if (cls != NULL) { + jenv->ThrowNew(cls, msg); + } + jenv->DeleteLocalRef(cls); +} + +std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src) { + if (NULL != src) { + jsize len = jenv->GetArrayLength(src); + std::string ret(len, '\0'); + jenv->GetByteArrayRegion(src, 0, len, (jbyte*)ret.data()); + return ret; + } + return std::string(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h new file mode 100644 index 0000000..45c4fda --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef JNIUTILS_H_ +#define JNIUTILS_H_ + +#include <string> +#include <jni.h> + +/** + * Get current JavaVM, if none then try to create one. + */ +JavaVM * JNU_GetJVM(void); + +/** + * Get JNIEnv for current thread. + */ +JNIEnv* JNU_GetJNIEnv(void); + +/** + * Attach currentThread, same effect as JNU_GetJNIEnv. + */ +void JNU_AttachCurrentThread(); + +/** + * Detach current thread, call it if current thread + * is created in native side and have called + * JNU_AttachCurrentThread before + */ +void JNU_DetachCurrentThread(); + +/** + * Throw a java exception. + */ +void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg); + +/** + * Convert a java byte array to c++ std::string + */ +std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src); + +#endif /* JNIUTILS_H_ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h new file mode 100644 index 0000000..6d780cf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h @@ -0,0 +1,58 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include <jni.h> +/* Header for class org_apache_hadoop_mapred_nativetask_NativeRuntime */ + +#ifndef 
_Included_org_apache_hadoop_mapred_nativetask_NativeRuntime +#define _Included_org_apache_hadoop_mapred_nativetask_NativeRuntime +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNIRelease + * Signature: ()V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(JNIEnv *, + jclass); + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNIConfigure + * Signature: ([[B)V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure( + JNIEnv *, jclass, jobjectArray); +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNICreateNativeObject + * Signature: ([B)J + */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject( + JNIEnv *, jclass, jbyteArray); +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNICreateDefaultNativeObject + * Signature: ([B)J + */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject( + JNIEnv *, jclass, jbyteArray); + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNIReleaseNativeObject + * Signature: (J)V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject( + JNIEnv *, jclass, jlong); +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNIRegisterModule + * Signature: ([B[B)I + */JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule( + JNIEnv *, jclass, jbyteArray, jbyteArray); +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: JNIUpdateStatus + * Signature: ()[B + */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus( + JNIEnv *, jclass); + +#ifdef __cplusplus +} +#endif +#endif 
// Source: http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * High performance primitive functions
 *
 **/

#ifndef PRIMITIVES_H_
#define PRIMITIVES_H_

#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <algorithm>
#include <string>

#ifdef __GNUC__
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif

//#define SIMPLE_MEMCPY

#if !defined(SIMPLE_MEMCPY)
#define simple_memcpy memcpy
#define simple_memcpy2 memcpy
#else

// NOTE(review): simple_memcpy2 has no definition in this branch; confirm
// whether anything uses it before enabling SIMPLE_MEMCPY.

/**
 * This memcpy assumes src & dest are not overlapped,
 * and len is normally very small (<64).
 * Every access is a fixed-size memcpy, which compilers lower to plain
 * loads/stores without requiring any particular alignment.
 *
 * @param dest: dest buffer
 * @param src: src buffer
 * @param len: number of bytes to copy (0 is allowed and copies nothing)
 */
inline void simple_memcpy(void * dest, const void * src, size_t len) {
  const uint8_t * src8 = (const uint8_t*)src;
  uint8_t * dest8 = (uint8_t*)dest;
  switch (len) {
  case 0:
    return;
  case 1:
    dest8[0] = src8[0];
    return;
  case 2:
    ::memcpy(dest8, src8, 2);
    return;
  case 3:
    ::memcpy(dest8, src8, 2);
    dest8[2] = src8[2];
    return;
  case 4:
    ::memcpy(dest8, src8, 4);
    return;
  }
  if (len < 8) {
    // Two 4-byte copies that overlap in the middle cover 5..7 bytes.
    ::memcpy(dest8, src8, 4);
    ::memcpy(dest8 + len - 4, src8 + len - 4, 4);
    return;
  }
  if (len < 128) {
    // 8-byte chunks from the tail towards the head; the final chunk at
    // offset 0 may overlap the previous one.
    int64_t cur = (int64_t)len - 8;
    while (cur > 0) {
      ::memcpy(dest8 + cur, src8 + cur, 8);
      cur -= 8;
    }
    ::memcpy(dest8, src8, 8);
    return;
  }
  ::memcpy(dest, src, len);
}

#endif

/**
 * little-endian to big-endian or vice versa
 */
inline uint32_t bswap(uint32_t val) {
#ifdef __GNUC__
  return __builtin_bswap32(val);
#else
  return ((val & 0x000000ffU) << 24) | ((val & 0x0000ff00U) << 8)
      | ((val & 0x00ff0000U) >> 8) | ((val & 0xff000000U) >> 24);
#endif
}

inline uint64_t bswap64(uint64_t val) {
#ifdef __GNUC__
  return __builtin_bswap64(val);
#else
  const uint32_t lower = bswap((uint32_t)(val & 0xffffffffU));
  const uint32_t higher = bswap((uint32_t)(val >> 32));
  return ((uint64_t)lower << 32) | higher;
#endif
}

/** Read 2 bytes at `p` in host byte order; `p` may be unaligned. */
inline uint16_t primitives_load16(const void * p) {
  uint16_t v;
  ::memcpy(&v, p, sizeof(v));
  return v;
}

/** Read 4 bytes at `p` in host byte order; `p` may be unaligned. */
inline uint32_t primitives_load32(const void * p) {
  uint32_t v;
  ::memcpy(&v, p, sizeof(v));
  return v;
}

/** Read 8 bytes at `p` in host byte order; `p` may be unaligned. */
inline uint64_t primitives_load64(const void * p) {
  uint64_t v;
  ::memcpy(&v, p, sizeof(v));
  return v;
}

/**
 * Read 4 bytes at `p` as a big-endian number, independent of host byte
 * order. Comparing such numbers orders buffers like memcmp does.
 */
inline uint32_t primitives_load_be32(const void * p) {
  const uint8_t * b = (const uint8_t*)p;
  return ((uint32_t)b[0] << 24) | ((uint32_t)b[1] << 16) | ((uint32_t)b[2] << 8)
      | (uint32_t)b[3];
}

/** Read 8 bytes at `p` as a big-endian number, independent of host byte order. */
inline uint64_t primitives_load_be64(const void * p) {
  const uint8_t * b = (const uint8_t*)p;
  return ((uint64_t)primitives_load_be32(b) << 32) | primitives_load_be32(b + 4);
}

/**
 * Fast memcmp
 *
 * @return <0, 0 or >0 with the same meaning as memcmp. For len < 8 the
 *         value is the difference of the first differing byte (len < 4)
 *         or 32-bit big-endian word; for len >= 8 it is -1, 0 or 1.
 */
inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {

#ifdef BUILDIN_MEMCMP
  return memcmp(src, dest, len);
#else

  const uint8_t * src8 = (const uint8_t*)src;
  const uint8_t * dest8 = (const uint8_t*)dest;
  if (len < 4) {
    for (uint32_t i = 0; i < len; i++) {
      const int64_t ret = (int64_t)src8[i] - (int64_t)dest8[i];
      if (ret) {
        return ret;
      }
    }
    return 0;
  }
  if (len < 8) {
    // First 4 bytes, then the (possibly overlapping) last 4 bytes.
    const int64_t ret = (int64_t)primitives_load_be32(src8)
        - (int64_t)primitives_load_be32(dest8);
    if (ret) {
      return ret;
    }
    return (int64_t)primitives_load_be32(src8 + len - 4)
        - (int64_t)primitives_load_be32(dest8 + len - 4);
  }
  const uint32_t end = len & (0xffffffffU << 3);
  for (uint32_t cur = 0; cur < end; cur += 8) {
    // Cheap equality test in host order; only a mismatch pays for the
    // byte-order-independent comparison.
    if (primitives_load64(src8 + cur) != primitives_load64(dest8 + cur)) {
      return primitives_load_be64(src8 + cur) > primitives_load_be64(dest8 + cur) ? 1 : -1;
    }
  }
  // Last 8 bytes; overlaps the loop's range unless len is a multiple of 8.
  if (primitives_load64(src8 + len - 8) != primitives_load64(dest8 + len - 8)) {
    return primitives_load_be64(src8 + len - 8) > primitives_load_be64(dest8 + len - 8)
        ? 1 : -1;
  }
  return 0;
#endif
}

/**
 * Compare two buffers of possibly different length: the common prefix
 * decides first, then the shorter buffer orders before the longer one.
 */
inline int64_t fmemcmp(const char * src, const char * dest, uint32_t srcLen, uint32_t destLen) {
  const uint32_t minlen = srcLen < destLen ? srcLen : destLen;
  const int64_t ret = fmemcmp(src, dest, minlen);
  if (ret) {
    return ret;
  }
  return (int64_t)srcLen - (int64_t)destLen;
}

/**
 * Fast memory equal
 */
inline bool fmemeq(const char * src, const char * dest, uint32_t len) {
#ifdef BUILDIN_MEMCMP
  return 0 == memcmp(src, dest, len);
#else

  const uint8_t * src8 = (const uint8_t*)src;
  const uint8_t * dest8 = (const uint8_t*)dest;
  switch (len) {
  case 0:
    return true;
  case 1:
    return src8[0] == dest8[0];
  case 2:
    return primitives_load16(src8) == primitives_load16(dest8);
  case 3:
    return (primitives_load16(src8) == primitives_load16(dest8)) && (src8[2] == dest8[2]);
  case 4:
    return primitives_load32(src8) == primitives_load32(dest8);
  }
  if (len < 8) {
    return (primitives_load32(src8) == primitives_load32(dest8))
        && (primitives_load32(src8 + len - 4) == primitives_load32(dest8 + len - 4));
  }
  const uint32_t end = len & (0xffffffffU << 3);
  for (uint32_t cur = 0; cur < end; cur += 8) {
    if (primitives_load64(src8 + cur) != primitives_load64(dest8 + cur)) {
      return false;
    }
  }
  // Last 8 bytes; overlaps the loop's range unless len is a multiple of 8.
  return primitives_load64(src8 + len - 8) == primitives_load64(dest8 + len - 8);
#endif
}

/**
 * Equality of two length-tagged buffers. Note the argument order
 * (pointer, length, pointer, length) differs from the four-argument
 * fmemcmp and frmemeq overloads.
 */
inline bool fmemeq(const char * src, uint32_t srcLen, const char * dest, uint32_t destLen) {
  if (srcLen != destLen) {
    return false;
  }
  return fmemeq(src, dest, srcLen);
}

/**
 * Fast memory equal, reverse order
 */
inline bool frmemeq(const char * src, const char * dest, uint32_t len) {
  const uint8_t * src8 = (const uint8_t*)src;
  const uint8_t * dest8 = (const uint8_t*)dest;
  switch (len) {
  case 0:
    return true;
  case 1:
    return src8[0] == dest8[0];
  case 2:
    return primitives_load16(src8) == primitives_load16(dest8);
  case 3:
    return (src8[2] == dest8[2]) && (primitives_load16(src8) == primitives_load16(dest8));
  case 4:
    return primitives_load32(src8) == primitives_load32(dest8);
  }
  if (len < 8) {
    return (primitives_load32(src8 + len - 4) == primitives_load32(dest8 + len - 4))
        && (primitives_load32(src8) == primitives_load32(dest8));
  }
  // 64-bit cursor: a 32-bit signed one would go negative for
  // len >= 2^31 + 8 and skip almost the whole comparison.
  int64_t cur = (int64_t)len - 8;
  while (cur > 0) {
    if (primitives_load64(src8 + cur) != primitives_load64(dest8 + cur)) {
      return false;
    }
    cur -= 8;
  }
  return primitives_load64(src8) == primitives_load64(dest8);
}

inline bool frmemeq(const char * src, const char * dest, uint32_t srcLen, uint32_t destLen) {
  if (srcLen != destLen) {
    return false;
  }
  return frmemeq(src, dest, srcLen);
}

#endif /* PRIMITIVES_H_ */