Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147740763
  
    --- Diff: extensions/http-curl/client/HTTPCallback.h ---
    @@ -0,0 +1,187 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
    +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
    +
    +#include "concurrentqueue.h"
    +#include <thread>
    +#include <mutex>
    +#include <vector>
    +#include <condition_variable>
    +
    +#include "utils/ByteArrayCallback.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace utils {
    +
    +/**
    + * will stream as items are processed.
    + */
    +class HttpStreamingCallback : public ByteInputCallBack {
    + public:
    +  HttpStreamingCallback()
    +      : ptr(nullptr),
    +        is_alive_(true) {
    +    previous_pos_ = 0;
    +    rolling_count_ = 0;
    +  }
    +
    +  virtual ~HttpStreamingCallback() {
    +
    +  }
    +
    +  void close() {
    +    is_alive_ = false;
    +    cv.notify_all();
    +  }
    +
    +  virtual void seek(size_t pos) {
    +    if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
    +      load_buffer();
    +  }
    +
    +  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +
    +    std::vector<char> vec;
    +
    +    if (stream->getSize() > 0) {
    +      vec.resize(stream->getSize());
    +
    +      stream->readData(reinterpret_cast<uint8_t*>(vec.data()), 
stream->getSize());
    +    }
    +
    +    size_t added_size = vec.size();
    +
    +    byte_arrays_.enqueue(std::move(vec));
    +
    +    cv.notify_all();
    +
    +    return added_size;
    +
    +  }
    +
    +  virtual int64_t process(uint8_t *vector, size_t size) {
    +
    +    std::vector<char> vec;
    +
    +    if (size > 0) {
    +      vec.resize(size);
    +
    +      memcpy(vec.data(), vector, size);
    +
    +      size_t added_size = vec.size();
    +
    +      byte_arrays_.enqueue(std::move(vec));
    --- End diff --
    
    byte_arrays_ is a lock free queue. There can be concerns with using lock 
free queues like moodycamel, particularly with ordering, but I've mitigated 
this by how I'm using it. 


---

Reply via email to