mathyingzhou commented on a change in pull request #8648:
URL: https://github.com/apache/arrow/pull/8648#discussion_r614510493



##########
File path: cpp/src/arrow/adapters/orc/adapter_util.cc
##########
@@ -315,13 +344,662 @@ Status AppendBatch(const liborc::Type* type, 
liborc::ColumnVectorBatch* batch,
       return Status::NotImplemented("Not implemented type kind: ", kind);
   }
 }
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
+
+namespace {
+
+using arrow::internal::checked_cast;
+
+arrow::Status WriteBatch(const arrow::Array& parray, int64_t orc_offset,
+                         liborc::ColumnVectorBatch* column_vector_batch,
+                         bool normalized = false);
+
+// Rebuilds `array` so that the nulls of a struct are pushed down into its
+// children.
+//
+// - STRUCT with at least one null: every child receives a validity bitmap that
+//   is either the parent's bitmap (when the child had none) or the bitwise AND
+//   of the parent's and the child's bitmaps; each rebuilt child is then
+//   normalized recursively.
+// - LIST / LARGE_LIST / FIXED_SIZE_LIST / MAP: rebuilt around normalized
+//   children.
+// - Every other type (and a STRUCT without nulls) is returned unchanged.
+//
+// NOTE(review): the bitmaps are combined starting at bit 0 and the rebuilt child
+// ArrayData is created without an explicit offset -- confirm this is intended
+// for sliced (non-zero offset) inputs.
+std::shared_ptr<arrow::Array> NormalizeArray(
+    const std::shared_ptr<arrow::Array>& array) {
+  switch (array->type_id()) {
+    case arrow::Type::type::STRUCT: {
+      if (array->null_count() == 0) {
+        return array;
+      }
+      auto struct_array = std::static_pointer_cast<arrow::StructArray>(array);
+      const std::shared_ptr<arrow::Buffer> parent_validity =
+          struct_array->null_bitmap();
+      std::shared_ptr<arrow::DataType> struct_type = struct_array->type();
+      const std::size_t num_fields = struct_type->fields().size();
+      std::vector<std::shared_ptr<arrow::Array>> normalized_fields(num_fields,
+                                                                   nullptr);
+      for (std::size_t idx = 0; idx < num_fields; idx++) {
+        std::shared_ptr<arrow::Array> field = struct_array->field(idx);
+        const std::shared_ptr<arrow::Buffer> field_validity = field->null_bitmap();
+        // A child without a bitmap simply inherits the parent's validity.
+        std::shared_ptr<arrow::Buffer> merged_validity = parent_validity;
+        if (field_validity != nullptr) {
+          merged_validity =
+              arrow::internal::BitmapAnd(arrow::default_memory_pool(),
+                                         parent_validity->data(), 0,
+                                         field_validity->data(), 0,
+                                         struct_array->length(), 0)
+                  .ValueOrDie();
+        }
+        // Copy the child's buffer list and swap in the merged validity bitmap.
+        std::shared_ptr<arrow::ArrayData> field_data = field->data();
+        std::vector<std::shared_ptr<arrow::Buffer>> buffers = field_data->buffers;
+        buffers[0] = merged_validity;
+        std::shared_ptr<arrow::ArrayData> rebuilt_data =
+            arrow::ArrayData::Make(field->type(), field->length(), buffers,
+                                   field_data->child_data, field_data->dictionary);
+        normalized_fields[idx] = NormalizeArray(arrow::MakeArray(rebuilt_data));
+      }
+      return std::make_shared<arrow::StructArray>(struct_type, struct_array->length(),
+                                                  normalized_fields, parent_validity);
+    }
+    case arrow::Type::type::LIST: {
+      auto lists = std::static_pointer_cast<arrow::ListArray>(array);
+      return std::make_shared<arrow::ListArray>(
+          lists->type(), lists->length(), lists->value_offsets(),
+          NormalizeArray(lists->values()), lists->null_bitmap());
+    }
+    case arrow::Type::type::LARGE_LIST: {
+      auto lists = std::static_pointer_cast<arrow::LargeListArray>(array);
+      return std::make_shared<arrow::LargeListArray>(
+          lists->type(), lists->length(), lists->value_offsets(),
+          NormalizeArray(lists->values()), lists->null_bitmap());
+    }
+    case arrow::Type::type::FIXED_SIZE_LIST: {
+      auto lists = std::static_pointer_cast<arrow::FixedSizeListArray>(array);
+      return std::make_shared<arrow::FixedSizeListArray>(
+          lists->type(), lists->length(), NormalizeArray(lists->values()),
+          lists->null_bitmap());
+    }
+    case arrow::Type::type::MAP: {
+      auto maps = std::static_pointer_cast<arrow::MapArray>(array);
+      return std::make_shared<arrow::MapArray>(
+          maps->type(), maps->length(), maps->value_offsets(),
+          NormalizeArray(maps->keys()), NormalizeArray(maps->items()),
+          maps->null_bitmap());
+    }
+    default: {
+      // Covers BOOL, the integer and floating point types, the string/binary
+      // types (including FIXED_SIZE_BINARY), DATE32, DATE64, TIMESTAMP and
+      // DECIMAL128, as well as any type not listed here: nothing to rebuild.
+      return array;
+    }
+  }
+}
+
+// Primary template: intentionally empty. The specializations that follow supply
+// the VisitNull/VisitValue handlers.
+template <class DataType, class BatchType, typename Enable = void>
+struct Appender {};
+
+// True for the Arrow types served by the number-like Appender: booleans,
+// numeric types and Date32.
+template <typename Type>
+using is_generic_type =
+    std::integral_constant<bool, arrow::is_boolean_type<Type>::value ||
+                                     arrow::is_number_type<Type>::value ||
+                                     std::is_same<arrow::Date32Type, Type>::value>;
+// SFINAE helper: names `Result` only when `Type` satisfies is_generic_type.
+template <typename Type, typename Result = void>
+using enable_if_generic = arrow::enable_if_t<is_generic_type<Type>::value, Result>;
+
+// Number-like
+// Appender for the types accepted by enable_if_generic. WriteGenericBatch hands
+// an instance to arrow::ArrayDataVisitor as the visitor.
+template <class DataType, class BatchType>
+struct Appender<DataType, BatchType, enable_if_generic<DataType>> {
+  using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+  using ValueType = typename arrow::TypeTraits<DataType>::CType;
+
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset++] = false;
+    ++running_arrow_offset;
+    return arrow::Status::OK();
+  }
+
+  // Copies the value at the current Arrow position into the current ORC slot.
+  // The value is re-read from `array`; the `v` argument is not used.
+  arrow::Status VisitValue(ValueType v) {
+    const int64_t orc_slot = running_orc_offset++;
+    const int64_t arrow_slot = running_arrow_offset++;
+    batch->data[orc_slot] = array.Value(arrow_slot);
+    batch->notNull[orc_slot] = true;
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const ArrayType& array;
+  BatchType* batch;
+  int64_t running_orc_offset;
+  int64_t running_arrow_offset;
+};
+
+// Binary
+// Appender for variable-length values written to a liborc::StringVectorBatch.
+// The batch receives the pointer returned by array.GetValue(), not a copy of
+// the bytes.
+template <class DataType>
+struct Appender<DataType, liborc::StringVectorBatch> {
+  using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+  using COffsetType = typename arrow::TypeTraits<DataType>::OffsetType::c_type;
+
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset++] = false;
+    ++running_arrow_offset;
+    return arrow::Status::OK();
+  }
+
+  // Records pointer and length of the current Arrow value in the current ORC
+  // slot. The value is looked up through `array`; `v` is not used.
+  arrow::Status VisitValue(arrow::util::string_view v) {
+    const int64_t orc_slot = running_orc_offset++;
+    const int64_t arrow_slot = running_arrow_offset++;
+    COffsetType value_length = 0;
+    const uint8_t* value = array.GetValue(arrow_slot, &value_length);
+    batch->notNull[orc_slot] = true;
+    // The ORC batch exposes a non-const char* slot, hence the casts.
+    batch->data[orc_slot] = reinterpret_cast<char*>(const_cast<uint8_t*>(value));
+    batch->length[orc_slot] = value_length;
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const ArrayType& array;
+  liborc::StringVectorBatch* batch;
+  int64_t running_orc_offset;
+  int64_t running_arrow_offset;
+};
+
+// Decimal
+// Appender for Decimal128 arrays written to a liborc::Decimal64VectorBatch.
+template <>
+struct Appender<arrow::Decimal128Type, liborc::Decimal64VectorBatch> {
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset++] = false;
+    ++running_arrow_offset;
+    return arrow::Status::OK();
+  }
+
+  // Stores the first 8 bytes of the current value as the ORC int64 value; the
+  // remaining bytes are not read. `v` is not used.
+  // NOTE(review): those bytes are the low half only for a little-endian layout
+  // -- confirm.
+  arrow::Status VisitValue(arrow::util::string_view v) {
+    const int64_t orc_slot = running_orc_offset++;
+    const int64_t arrow_slot = running_arrow_offset++;
+    uint8_t* value_bytes = const_cast<uint8_t*>(array.GetValue(arrow_slot));
+    batch->notNull[orc_slot] = true;
+    batch->values[orc_slot] = *reinterpret_cast<int64_t*>(value_bytes);
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const arrow::Decimal128Array& array;
+  liborc::Decimal64VectorBatch* batch;
+  int64_t running_orc_offset;
+  int64_t running_arrow_offset;
+};
+
+// Appender for Decimal128 arrays written to a liborc::Decimal128VectorBatch.
+template <>
+struct Appender<arrow::Decimal128Type, liborc::Decimal128VectorBatch> {
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset++] = false;
+    ++running_arrow_offset;
+    return arrow::Status::OK();
+  }
+
+  // Builds a liborc::Int128 from the current value: bytes 0-7 are read as the
+  // unsigned low word and bytes 8-15 as the signed high word. `v` is not used.
+  arrow::Status VisitValue(arrow::util::string_view v) {
+    const int64_t orc_slot = running_orc_offset++;
+    const int64_t arrow_slot = running_arrow_offset++;
+    uint8_t* value_bytes = const_cast<uint8_t*>(array.GetValue(arrow_slot));
+    const uint64_t low_word = *reinterpret_cast<uint64_t*>(value_bytes);
+    const int64_t high_word = *reinterpret_cast<int64_t*>(value_bytes + 8);
+    batch->notNull[orc_slot] = true;
+    batch->values[orc_slot] = liborc::Int128(high_word, low_word);
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const arrow::Decimal128Array& array;
+  liborc::Decimal128VectorBatch* batch;
+  int64_t running_orc_offset;
+  int64_t running_arrow_offset;
+};
+
+// Date64 and Timestamp
+// Appender that splits each int64 Arrow value into whole seconds plus a
+// nanosecond remainder for a liborc::TimestampVectorBatch.
+//
+// As implied by the arithmetic in VisitValue, conversion_factor_from_second is
+// the number of Arrow units per second and conversion_factor_to_nano is the
+// number of nanoseconds per Arrow unit.
+template <class DataType>
+struct TimestampAppender {
+  using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset] = false;
+    running_orc_offset++;
+    running_arrow_offset++;
+    return arrow::Status::OK();
+  }
+
+  // Writes the current Arrow value as (seconds, nanoseconds). The value is read
+  // from `array`; `v` is not used.
+  arrow::Status VisitValue(int64_t v) {
+    const int64_t data = array.Value(running_arrow_offset);
+    // Integer `/` truncates toward zero, so applying std::floor to the quotient
+    // had no effect: values before the epoch with a fractional second ended up
+    // with seconds one too large and a negative nanosecond part. Use a real
+    // floor division so that 0 <= subsecond < conversion_factor_from_second.
+    // Assumes conversion_factor_from_second > 0.
+    int64_t seconds = data / conversion_factor_from_second;
+    int64_t subsecond = data % conversion_factor_from_second;
+    if (subsecond < 0) {
+      --seconds;
+      subsecond += conversion_factor_from_second;
+    }
+    batch->notNull[running_orc_offset] = true;
+    batch->data[running_orc_offset] = seconds;
+    batch->nanoseconds[running_orc_offset] = subsecond * conversion_factor_to_nano;
+    running_orc_offset++;
+    running_arrow_offset++;
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const ArrayType& array;
+  liborc::TimestampVectorBatch* batch;
+  int64_t running_orc_offset, running_arrow_offset;
+  int64_t conversion_factor_from_second, conversion_factor_to_nano;
+};
+
+// FSB
+// Appender for FixedSizeBinary arrays written to a liborc::StringVectorBatch.
+// Every non-null slot gets the same length, `data_length`.
+struct FixedSizeBinaryAppender {
+  // Flags the current ORC slot as null and advances both cursors.
+  arrow::Status VisitNull() {
+    batch->notNull[running_orc_offset++] = false;
+    ++running_arrow_offset;
+    return arrow::Status::OK();
+  }
+
+  // Records the pointer returned by array.GetValue() together with the fixed
+  // length in the current ORC slot. `v` is not used.
+  arrow::Status VisitValue(arrow::util::string_view v) {
+    const int64_t orc_slot = running_orc_offset++;
+    const int64_t arrow_slot = running_arrow_offset++;
+    const uint8_t* value = array.GetValue(arrow_slot);
+    batch->notNull[orc_slot] = true;
+    // The ORC batch exposes a non-const char* slot, hence the casts.
+    batch->data[orc_slot] = reinterpret_cast<char*>(const_cast<uint8_t*>(value));
+    batch->length[orc_slot] = data_length;
+    return arrow::Status::OK();
+  }
+
+  // Member order matters: instances are brace-initialized positionally.
+  const arrow::FixedSizeBinaryArray& array;
+  liborc::StringVectorBatch* batch;
+  int64_t running_orc_offset;
+  int64_t running_arrow_offset;
+  const int32_t data_length;
+};
+
+// Writes `array` into `column_vector_batch`, starting at ORC position
+// `orc_offset`, using the Appender specialization selected by DataType and
+// BatchType. Sets `hasNulls` on the batch when the array reports any nulls and
+// returns the first non-OK status produced while visiting.
+template <class DataType, class BatchType>
+arrow::Status WriteGenericBatch(const arrow::Array& array, int64_t orc_offset,
+                                liborc::ColumnVectorBatch* column_vector_batch) {
+  using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+  const auto& typed_array = checked_cast<const ArrayType&>(array);
+  auto* typed_batch = checked_cast<BatchType*>(column_vector_batch);
+  if (array.null_count() != 0) {
+    typed_batch->hasNulls = true;
+  }
+  // The Arrow-side cursor starts at 0; the ORC-side cursor at `orc_offset`.
+  Appender<DataType, BatchType> appender{typed_array, typed_batch, orc_offset, 0};
+  arrow::ArrayDataVisitor<DataType> visitor;
+  RETURN_NOT_OK(visitor.Visit(*typed_array.data(), &appender));
+  return arrow::Status::OK();
+}
+
+// Writes a Date64/Timestamp-like `array` into a liborc::TimestampVectorBatch,
+// starting at ORC position `orc_offset`. The two conversion factors are passed
+// through unchanged to TimestampAppender. Sets `hasNulls` on the batch when the
+// array reports any nulls.
+template <class DataType>
+arrow::Status WriteTimestampBatch(const arrow::Array& array, int64_t orc_offset,
+                                  liborc::ColumnVectorBatch* column_vector_batch,
+                                  const int64_t& conversion_factor_from_second,
+                                  const int64_t& conversion_factor_to_nano) {
+  using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+  const auto& typed_array = checked_cast<const ArrayType&>(array);
+  auto* timestamp_batch =
+      checked_cast<liborc::TimestampVectorBatch*>(column_vector_batch);
+  if (array.null_count() != 0) {
+    timestamp_batch->hasNulls = true;
+  }
+  // The Arrow-side cursor starts at 0; the ORC-side cursor at `orc_offset`.
+  TimestampAppender<DataType> appender{
+      typed_array, timestamp_batch,           orc_offset,
+      0,           conversion_factor_from_second, conversion_factor_to_nano};
+  arrow::ArrayDataVisitor<DataType> visitor;
+  RETURN_NOT_OK(visitor.Visit(*typed_array.data(), &appender));
+  return arrow::Status::OK();
+}
+
+// Writes a FixedSizeBinary `array` into a liborc::StringVectorBatch, starting at
+// ORC position `orc_offset`. The array's byte_width() is used as the length of
+// every value. Sets `hasNulls` on the batch when the array reports any nulls.
+arrow::Status WriteFixedSizeBinaryBatch(const arrow::Array& array, int64_t orc_offset,
+                                        liborc::ColumnVectorBatch* column_vector_batch) {
+  const auto& fsb_array = checked_cast<const arrow::FixedSizeBinaryArray&>(array);
+  auto* string_batch = checked_cast<liborc::StringVectorBatch*>(column_vector_batch);
+  if (array.null_count() != 0) {
+    string_batch->hasNulls = true;
+  }
+  // The Arrow-side cursor starts at 0; the ORC-side cursor at `orc_offset`.
+  FixedSizeBinaryAppender appender{fsb_array, string_batch, orc_offset, 0,
+                                   fsb_array.byte_width()};
+  arrow::ArrayDataVisitor<arrow::FixedSizeBinaryType> visitor;
+  RETURN_NOT_OK(visitor.Visit(*fsb_array.data(), &appender));
+  return arrow::Status::OK();
+}
+
+// Writes a struct `array` into a liborc::StructVectorBatch, starting at ORC
+// position `orc_offset`: first the struct-level validity flags, then every field
+// through WriteBatch into the matching child batch (each child batch is resized
+// to `orc_offset + array.length()` beforehand). `normalized` is accepted but not
+// read here; the children are always written with that flag set to true.
+arrow::Status WriteStructBatch(const arrow::Array& array, int64_t orc_offset,
+                               liborc::ColumnVectorBatch* column_vector_batch,
+                               bool normalized) {
+  // Re-wrap the ArrayData to obtain a StructArray handle.
+  std::shared_ptr<arrow::Array> rewrapped = arrow::MakeArray(array.data());
+  auto struct_array = std::static_pointer_cast<arrow::StructArray>(rewrapped);
+  auto* struct_batch = checked_cast<liborc::StructVectorBatch*>(column_vector_batch);
+  const std::size_t num_fields = array.type()->fields().size();
+  const int64_t arrow_length = array.length();
+
+  // Struct-level null information.
+  if (array.null_count() != 0) {
+    struct_batch->hasNulls = true;
+  }
+  for (int64_t row = 0; row < arrow_length; ++row) {
+    struct_batch->notNull[orc_offset + row] = !array.IsNull(row);
+  }
+
+  // Field data.
+  for (std::size_t idx = 0; idx < num_fields; ++idx) {
+    liborc::ColumnVectorBatch* field_batch = struct_batch->fields[idx];
+    field_batch->resize(orc_offset + arrow_length);
+    RETURN_NOT_OK(
+        WriteBatch(*(struct_array->field(idx)), orc_offset, field_batch, true));
+  }
+  return arrow::Status::OK();
+}
+
+template <class ArrayType>
+arrow::Status WriteListBatch(const arrow::Array& array, int64_t orc_offset,
+                             liborc::ColumnVectorBatch* column_vector_batch) {
+  const ArrayType& list_array(checked_cast<const ArrayType&>(array));
+  auto batch = checked_cast<liborc::ListVectorBatch*>(column_vector_batch);
+  liborc::ColumnVectorBatch* element_batch = (batch->elements).get();
+  int64_t arrow_length = array.length();
+  int64_t running_arrow_offset = 0, running_orc_offset = orc_offset;
+  if (orc_offset == 0) {
+    batch->offsets[0] = 0;
+  }
+  if (array.null_count()) {
+    batch->hasNulls = true;
+  }
+  for (; running_arrow_offset < arrow_length;
+       running_orc_offset++, running_arrow_offset++) {
+    if (array.IsNull(running_arrow_offset)) {
+      batch->notNull[running_orc_offset] = false;
+      batch->offsets[running_orc_offset + 1] = 
batch->offsets[running_orc_offset];

Review comment:
       Please correct me if I'm wrong: I know that they do. Unlike Arrow, 
however, ORC is not supposed to accommodate this behavior, since ORC 
prioritizes saving space over guaranteeing O(1) access. This is why I 
deliberately use 
`batch->offsets[running_orc_offset + 1] = batch->offsets[running_orc_offset];` 
to skip these elements, which shouldn't make their way into the ORC elements 
array. Similarly, we have to normalize struct arrays, since in ORC the children 
(fields) must be marked as null whenever the parent (i.e. the struct) is marked 
as null. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to