westonpace commented on a change in pull request #9533:
URL: https://github.com/apache/arrow/pull/9533#discussion_r589784685



##########
File path: cpp/src/arrow/vendored/ProducerConsumerQueue.h
##########
@@ -0,0 +1,214 @@
+// Vendored from git tag v2021.02.15.00
+
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// @author Bo Hu (b...@fb.com)
+// @author Jordan DeLong (delon...@fb.com)
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+#include <memory>
+#include <stdexcept>
+#include <type_traits>
+#include <utility>
+
+namespace arrow_vendored {
+namespace folly {
+
+// Vendored from folly/Portability.h: compile-time flags for the target architecture.
+namespace {
+#if defined(__arm__)
+#define FOLLY_ARM 1
+#else
+#define FOLLY_ARM 0
+#endif
+
+#if defined(__s390x__)
+#define FOLLY_S390X 1
+#else
+#define FOLLY_S390X 0
+#endif
+
+constexpr bool kIsArchArm = FOLLY_ARM == 1;  // true only when __arm__ is defined
+constexpr bool kIsArchS390X = FOLLY_S390X == 1;  // true only when __s390x__ is defined
+}  // namespace
+
+// Vendored from folly/lang/Align.h. Presumably a byte count for padding -- TODO confirm at use site.
+namespace {
+
+constexpr std::size_t hardware_destructive_interference_size =
+    (kIsArchArm || kIsArchS390X) ? 64 : 128;  // 64 on ARM / s390x, 128 on every other target
+
+}  // namespace
+
+/*
+ * ProducerConsumerQueue is a one producer and one consumer queue
+ * without locks.
+ */
+template <class T>
+struct ProducerConsumerQueue {
+  typedef T value_type;
+
+  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
+  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
+
+  // Allocates raw (unconstructed) storage for `size` records and starts empty.
+  // size must be >= 2 (assert only); throws std::bad_alloc when malloc fails.
+  // Also, note that the number of usable slots in the queue at any
+  // given time is actually (size-1), so if you start with an empty queue,
+  // isFull() will return true after size-1 insertions.
+  explicit ProducerConsumerQueue(uint32_t size)
+      : size_(size),
+        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),  // no T is constructed here
+        readIndex_(0),
+        writeIndex_(0) {  // equal indices are what read() treats as "empty"
+    assert(size >= 2);  // compiled out when NDEBUG is defined
+    if (!records_) {  // std::malloc returned a null pointer
+      throw std::bad_alloc();
+    }
+  }
+
+  ~ProducerConsumerQueue() {
+    // We need to destruct anything that may still exist in our queue.
+    // (No real synchronization needed at destructor time: only one
+    // thread can be doing this.)
+    if (!std::is_trivially_destructible<T>::value) {  // skip the walk when T needs no destructor call
+      size_t readIndex = readIndex_;
+      size_t endIndex = writeIndex_;
+      while (readIndex != endIndex) {  // records still alive span from the read index up to the write index
+        records_[readIndex].~T();
+        if (++readIndex == size_) {  // wrap around the end of the ring buffer
+          readIndex = 0;
+        }
+      }
+    }
+
+    std::free(records_);  // storage was obtained with std::malloc in the constructor
+  }
+
+  template <class... Args>  // recordArgs are perfect-forwarded to T's constructor
+  bool write(Args&&... recordArgs) {  // returns true if a record was enqueued
+    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);  // slot the new record would occupy
+    auto nextRecord = currentWrite + 1;
+    if (nextRecord == size_) {  // wrap around the end of the ring buffer
+      nextRecord = 0;
+    }
+    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {  // acquire pairs with the release store in read()
+      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);  // placement-new into the raw slot
+      writeIndex_.store(nextRecord, std::memory_order_release);  // publish only after the record is constructed
+      return true;
+    }
+
+    // queue is full: advancing the write index would make it equal the read index
+    return false;
+  }
+
+  // Move (or copy) the value at the front of the queue into `record`; returns false when the queue is empty.
+  bool read(T& record) {
+    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
+    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {  // acquire pairs with the release store in write()
+      // queue is empty
+      return false;
+    }
+
+    auto nextRecord = currentRead + 1;
+    if (nextRecord == size_) {  // wrap around the end of the ring buffer
+      nextRecord = 0;
+    }
+    record = std::move(records_[currentRead]);  // assigns into the caller's existing object
+    records_[currentRead].~T();  // destroy the moved-from record so the slot is raw storage again
+    readIndex_.store(nextRecord, std::memory_order_release);  // hand the slot back only after it is destroyed
+    return true;
+  }
+
+  // pointer to the value at the front of the queue (for use in-place) or
+  // nullptr if empty.
+  T* frontPtr() {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to