arpadboda commented on a change in pull request #489: MINIFICPP-740: Add ability to run NiFi processors from Java and Python URL: https://github.com/apache/nifi-minifi-cpp/pull/489#discussion_r260265506
########## File path: extensions/jni/jvm/JniReferenceObjects.h ########## @@ -0,0 +1,374 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef EXTENSIONS_JNI_JVM_REFERNCEOBJECTS_H_ +#define EXTENSIONS_JNI_JVM_REFERNCEOBJECTS_H_ + +#include <string> +#include <vector> +#include <sstream> +#include <iterator> +#include <algorithm> +#include <jni.h> +#include "JavaServicer.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/WeakReference.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace jni { + +/** + * Represents a flow file. Exists to provide the ability to remove + * global references amongst the + */ +class JniFlowFile : public core::WeakReference { + public: + JniFlowFile(std::shared_ptr<core::FlowFile> ref, const std::shared_ptr<JavaServicer> &servicer, jobject ff) + : removed(false), + ff_object(ff), + ref_(ref), + servicer_(servicer) { + + } + + virtual ~JniFlowFile() { + + } + + virtual void remove() override; + + std::shared_ptr<core::FlowFile> get() { + return ref_; + } + + jobject getJniReference() { + return ff_object; + } + + bool operator==(const JniFlowFile &other) { + // compare the pointers + return ref_ == other.ref_; + } + + bool empty() { + return removed; + } + + protected: + + bool removed; + + jobject ff_object; + + std::mutex session_mutex_; + + std::shared_ptr<core::FlowFile> ref_; + + std::shared_ptr<JavaServicer> servicer_; + +}; + +/** + * Quick check to determine if a FF is empty. + */ +struct check_empty_ff : public std::unary_function<std::shared_ptr<JniFlowFile>, bool> { + bool operator()(std::shared_ptr<JniFlowFile> session) const { + return session->empty(); + } +}; + +class JniByteOutStream : public minifi::OutputStreamCallback { + public: + JniByteOutStream(jbyte *bytes, size_t length) + : bytes_(bytes), + length_(length) { + + } + + virtual ~JniByteOutStream() { + + } + virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { + return stream->write((uint8_t*) bytes_, length_); + } + private: + jbyte *bytes_; + size_t length_; +}; + +/** + * Jni byte input stream + */ +class JniByteInputStream : public minifi::InputStreamCallback { + public: + JniByteInputStream(uint64_t size) + : read_size_(0), + stream_(nullptr) { + buffer_size_ = size; + buffer_ = new uint8_t[buffer_size_]; + } + ~JniByteInputStream() { + if (buffer_) + delete[] buffer_; + } + int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { + stream_ = stream; + return 0; + } + + int64_t read(JNIEnv *env, jbyteArray arr, int offset, int size) { + if (stream_ == nullptr) { + return -1; + } + + // seek to offset + int remaining = size; + int writtenOffset = 0; + int read = 0; + do { + + int actual = (int) stream_->read(buffer_, remaining <= buffer_size_ ? remaining : buffer_size_); + if (actual <= 0) { + if (read == 0) { + stream_ = nullptr; + return -1; + } + break; + } + + read += actual; + env->SetByteArrayRegion(arr, offset + writtenOffset, actual, (jbyte*) buffer_); + writtenOffset += (int) actual; + + remaining -= actual; + + } while (remaining > 0); + + return read; + } + + int64_t read(char &arr) { + return stream_->read(arr); + } + + std::shared_ptr<minifi::io::BaseStream> stream_; + uint8_t *buffer_; + uint64_t buffer_size_; + uint64_t read_size_; +}; + +class JniInputStream : public core::WeakReference { + public: + JniInputStream(std::unique_ptr<JniByteInputStream> jbi, jobject in_instance, const std::shared_ptr<JavaServicer> &servicer) + : removed_(false), + jbi_(std::move(jbi)), + in_instance_(in_instance), + servicer_(servicer) { + + } + + virtual void remove() override { + std::lock_guard<std::mutex> guard(mutex_); + if (!removed_) { + servicer_->attach()->DeleteGlobalRef(in_instance_); + removed_ = true; + jbi_ = nullptr; + } + + } + + int64_t read(char &arr) { + if (!removed_) { + return jbi_->read(arr); + } + return -1; + } + + int64_t read(JNIEnv *env, jbyteArray arr, int offset, int size) { + if (!removed_) { + return jbi_->read(env, arr, offset, size); + } + return -1; + } + + private: + std::mutex mutex_; + bool removed_; + jobject in_instance_; + std::unique_ptr<JniByteInputStream> jbi_; + std::shared_ptr<JavaServicer> servicer_; +}; + +class JniSession : public core::WeakReference { + public: + JniSession(const std::shared_ptr<core::ProcessSession> &session, jobject session_instance, const std::shared_ptr<JavaServicer> &servicer) + : removed_(false), + session_(session), + servicer_(servicer), + session_instance_(session_instance) { + } + + virtual void remove() override { + std::lock_guard<std::mutex> guard(session_mutex_); + if (!removed_) { + for (auto ff : global_ff_objects_) { + ff->remove(); + } + + for (auto in : input_streams_) { + in->remove(); + } + global_ff_objects_.clear(); + servicer_->attach()->DeleteGlobalRef(session_instance_); + removed_ = true; + } + + } + + std::shared_ptr<core::ProcessSession> &getSession() { + return session_; + } + + JniFlowFile *getFlowFileReference(const std::shared_ptr<core::FlowFile> &ff) { + for (auto &jni_ff : global_ff_objects_) { + if (jni_ff->get().get() == ff.get()) { + return jni_ff.get(); + } + } + return nullptr; + } + + JniFlowFile * addFlowFile(std::shared_ptr<JniFlowFile> ff) { + std::lock_guard<std::mutex> guard(session_mutex_); + global_ff_objects_.push_back(ff); + return ff.get(); + } + + void addInputStream(std::shared_ptr<JniInputStream> in) { + std::lock_guard<std::mutex> guard(session_mutex_); + input_streams_.push_back(in); + } + + std::shared_ptr<JavaServicer> getServicer() { + return servicer_; + } + + bool prune() { + global_ff_objects_.erase(std::remove_if(global_ff_objects_.begin(), global_ff_objects_.end(), check_empty_ff()), global_ff_objects_.end()); + if (global_ff_objects_.empty()) { + remove(); + } + return global_ff_objects_.empty(); + } + bool empty() { + std::lock_guard<std::mutex> guard(session_mutex_); + for (auto ff : global_ff_objects_) { + if (!ff->empty()) + return false; + } + return true; + } + + protected: + bool removed_; + std::mutex session_mutex_; Review comment: Can we make this mutable and make some member funcs const? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
