Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147740763 --- Diff: extensions/http-curl/client/HTTPCallback.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ + +#include "concurrentqueue.h" +#include <thread> +#include <mutex> +#include <vector> +#include <condition_variable> + +#include "utils/ByteArrayCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * will stream as items are processed. + */ +class HttpStreamingCallback : public ByteInputCallBack { + public: + HttpStreamingCallback() + : ptr(nullptr), + is_alive_(true) { + previous_pos_ = 0; + rolling_count_ = 0; + } + + virtual ~HttpStreamingCallback() { + + } + + void close() { + is_alive_ = false; + cv.notify_all(); + } + + virtual void seek(size_t pos) { + if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + } + + virtual int64_t process(std::shared_ptr<io::BaseStream> stream) { + + std::vector<char> vec; + + if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize()); + } + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); + + cv.notify_all(); + + return added_size; + + } + + virtual int64_t process(uint8_t *vector, size_t size) { + + std::vector<char> vec; + + if (size > 0) { + vec.resize(size); + + memcpy(vec.data(), vector, size); + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); --- End diff -- byte_arrays_ is a lock free queue. There can be concerns with using lock free queues like moodycamel, particularly with ordering, but I've mitigated this by how I'm using it.
---