wgtmac commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r2593028729


##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer.h"
+
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <map>
+#include <type_traits>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+// Maximum number of values hashed per batch by Update() / UpdateSpaced().
+// It also sizes the stack-allocated scratch buffer that receives the hashes.
+constexpr int64_t kHashBatchSize = 256;
+
+// Builds a writer that feeds values of the column described by `descr` into
+// `bloom_filter`. Both pointers are stored as-is; a null `bloom_filter` leaves
+// the writer disabled (see bloom_filter_enabled()).
+template <typename ParquetType>
+BloomFilterWriter<ParquetType>::BloomFilterWriter(const ColumnDescriptor* descr,
+                                                  BloomFilter* bloom_filter)
+    : descr_(descr),
+      bloom_filter_(bloom_filter) {
+}
+
+// Returns true when a bloom filter was supplied at construction time, i.e.
+// when the Update*() methods actually have something to populate.
+template <typename ParquetType>
+bool BloomFilterWriter<ParquetType>::bloom_filter_enabled() const {
+  const bool has_filter = (nullptr != bloom_filter_);
+  return has_filter;
+}
+
+// Hashes `num_values` contiguous values starting at `values` and inserts the
+// hashes into the bloom filter.
+//
+// Does nothing when no bloom filter is attached. Values are processed in
+// batches of at most kHashBatchSize so the hash scratch buffer stays on the
+// stack. For FLBAType the per-value byte length is taken from the column
+// descriptor.
+//
+// Throws ParquetException for BooleanType when a bloom filter is attached.
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const T* values, int64_t num_values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  if constexpr (std::is_same_v<ParquetType, BooleanType>) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - i));
+    // Each batch must hash its own slice of the input. Passing `values`
+    // unchanged would re-hash the first batch on every iteration and never
+    // insert the values beyond it.
+    const T* batch_values = values + i;
+    if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+      bloom_filter_->Hashes(batch_values, descr_->type_length(), batch_size,
+                            hashes.data());
+    } else {
+      bloom_filter_->Hashes(batch_values, batch_size, hashes.data());
+    }
+    bloom_filter_->InsertHashes(hashes.data(), batch_size);
+  }
+}
+
+// Boolean columns cannot carry a bloom filter: this is a no-op when no filter
+// is attached and an error otherwise.
+template <>
+void BloomFilterWriter<BooleanType>::Update(const bool*, int64_t) {
+  if (bloom_filter_enabled()) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+}
+
+// Inserts the non-null entries of a spaced value buffer into the bloom filter.
+//
+// `valid_bits` / `valid_bits_offset` describe a validity bitmap covering
+// `num_values` slots of `values`; only runs of set bits are hashed. Each run is
+// processed in batches of at most kHashBatchSize. Does nothing when no bloom
+// filter is attached.
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, int64_t num_values,
+                                                  const uint8_t* valid_bits,
+                                                  int64_t valid_bits_offset) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  std::array<uint64_t, kHashBatchSize> hash_buffer;
+
+  // Hashes and inserts one run of consecutive valid values.
+  auto insert_run = [&](int64_t run_start, int64_t run_length) {
+    const T* run_values = values + run_start;
+    int64_t done = 0;
+    while (done < run_length) {
+      const int64_t remaining = run_length - done;
+      const auto chunk = static_cast<int>(std::min(kHashBatchSize, remaining));
+      if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+        bloom_filter_->Hashes(run_values + done, descr_->type_length(), chunk,
+                              hash_buffer.data());
+      } else {
+        bloom_filter_->Hashes(run_values + done, chunk, hash_buffer.data());
+      }
+      bloom_filter_->InsertHashes(hash_buffer.data(), chunk);
+      done += chunk;
+    }
+  };
+
+  ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
+                                         insert_run);
+}
+
+// Spaced counterpart of the boolean Update(): a no-op without a bloom filter,
+// an error when one is attached.
+template <>
+void BloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, const uint8_t*,
+                                                  int64_t) {
+  if (bloom_filter_enabled()) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+}
+
+// Generic fallback for the Arrow-array overload of Update().
+// Only the ByteArrayType specialization in this file implements this overload;
+// every other instantiation reports the call through ::arrow::Unreachable.
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) {
+  ::arrow::Unreachable("Update for non-ByteArray type should be unreachable");
+}
+
+namespace {
+
+// Inserts every non-null element of a binary-like Arrow array into
+// `bloom_filter`.
+//
+// Elements are gathered into a small fixed-size buffer of ByteArray and hashed
+// and inserted one buffer at a time; a final partial buffer is flushed after
+// the whole array has been visited.
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& array) {
+  // Keep the batch small: besides the hashes, a ByteArray scratch buffer is needed.
+  constexpr int64_t kBinaryHashBatchSize = 64;
+  std::array<ByteArray, kBinaryHashBatchSize> pending;
+  std::array<uint64_t, kBinaryHashBatchSize> hash_buffer;
+  int num_pending = 0;
+
+  // Hashes and inserts everything buffered so far, then empties the buffer.
+  auto flush = [&]() {
+    if (num_pending <= 0) {
+      return;
+    }
+    bloom_filter.Hashes(pending.data(), num_pending, hash_buffer.data());
+    bloom_filter.InsertHashes(hash_buffer.data(), num_pending);
+    num_pending = 0;
+  };
+
+  ::arrow::internal::VisitSetBitRunsVoid(
+      array.null_bitmap_data(), array.offset(), array.length(),
+      [&](int64_t position, int64_t run_length) {
+        const int64_t run_end = position + run_length;
+        for (int64_t idx = position; idx < run_end; ++idx) {
+          pending[num_pending] = array.GetView(idx);
+          ++num_pending;
+          if (num_pending == kBinaryHashBatchSize) {
+            flush();
+          }
+        }
+      });
+  flush();
+}
+
+}  // namespace
+
+// Inserts the non-null elements of a binary-like Arrow array into the bloom
+// filter of a BYTE_ARRAY column.
+//
+// Dispatches on the array's type id to the matching concrete array class
+// (binary-view, binary, or large-binary family). Does nothing when no bloom
+// filter is attached; any other Arrow type is reported via
+// ParquetException::NYI.
+template <>
+void BloomFilterWriter<ByteArrayType>::Update(const ::arrow::Array& values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  using ::arrow::internal::checked_cast;
+  const auto type_id = values.type_id();
+
+  if (::arrow::is_binary_view_like(type_id)) {
+    UpdateBinaryBloomFilter(*bloom_filter_,
+                            checked_cast<const ::arrow::BinaryViewArray&>(values));
+    return;
+  }
+  if (::arrow::is_binary_like(type_id)) {
+    UpdateBinaryBloomFilter(*bloom_filter_,
+                            checked_cast<const ::arrow::BinaryArray&>(values));
+    return;
+  }
+  if (::arrow::is_large_binary_like(type_id)) {
+    UpdateBinaryBloomFilter(*bloom_filter_,
+                            checked_cast<const ::arrow::LargeBinaryArray&>(values));
+    return;
+  }
+
+  ParquetException::NYI("Bloom filter is not supported for this Arrow type: " +
+                        values.type()->ToString());
+}
+
+// Explicit instantiation for BooleanType. Per review, this is required even
+// though boolean columns never populate a bloom filter: without it, creating a
+// BloomFilterWriter<BooleanType> (via std::make_unique, as seen in
+// TypedColumnWriterImpl's constructor in the reported backtrace) crashed with a
+// jump to address 0x0.
+template class BloomFilterWriter<BooleanType>;

Review Comment:
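   This explicit instantiation is required. Without it, constructing a `BloomFilterWriter<BooleanType>` crashes with a call to a null address, as shown in the backtrace below: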
   
   ```
   (lldb) bt
   * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS 
(code=1, address=0x0)
     * frame #0: 0x0000000000000000
       frame #1: 0x0000000101f580dc 
libparquet.2300.dylib`std::__1::__unique_if<parquet::BloomFilterWriter<parquet::PhysicalType<(parquet::Type::type)0>>>::__unique_single
 
std::__1::make_unique[abi:ne190102]<parquet::BloomFilterWriter<parquet::PhysicalType<(parquet::Type::type)0>>,
 parquet::ColumnDescriptor const*, 
parquet::BloomFilter*&>(__args=0x000000016fdfd0e8, __args=0x000000016fdfd118) 
at unique_ptr.h:635:30
       frame #2: 0x0000000101f576f0 
libparquet.2300.dylib`parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>::TypedColumnWriterImpl(this=0x00000001200086c8,
 metadata=0x000000012000b9a0, pager=nullptr, use_dictionary=false, 
encoding=PLAIN, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) 
at column_writer.cc:1301:9
       frame #3: 0x0000000101f57224 
libparquet.2300.dylib`parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>::TypedColumnWriterImpl(this=0x00000001200086c8,
 metadata=0x000000012000b9a0, pager=nullptr, use_dictionary=false, 
encoding=PLAIN, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) 
at column_writer.cc:1289:38
       frame #4: 0x0000000101f57168 libparquet.2300.dylib`void 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>::construct[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 parquet::ColumnChunkMetaDataBuilder*&, 
std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, 
std::nullptr_t>(this=0x000000016fdfd2a3, __p=0x00000001200086c8, 
__args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, 
__args=0x000000016fdfd5e8, __args=0x000000016fdfd620, 
__args=0x000000016fdfd5c0) at allocator.h:165:24
       frame #5: 0x0000000101f56f54 libparquet.2300.dylib`void 
std::__1::allocator_traits<std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::construct[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 parquet::ColumnChunkMetaDataBuilder*&, 
std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 
0>(__a=0x000000016fdfd2a3, __p=0x00000001200086c8, __args=0x000000016fdfd630, 
__args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, 
__args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at 
allocator_traits.h:319:9
       frame #6: 0x0000000101f56e54 
libparquet.2300.dylib`std::__1::__shared_ptr_emplace<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::__shared_ptr_emplace[abi:ne190102]<parquet::ColumnChunkMetaDataBuilder*&,
 std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>,
 0>(this=0x00000001200086b0, 
__a=allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>
 > > @ 0x000000016fdfd2ef, __args=0x000000016fdfd630, __args=nullptr, 
__args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, 
__args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:266:5
       frame #7: 0x0000000101f56b08 
libparquet.2300.dylib`std::__1::__shared_ptr_emplace<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::__shared_ptr_emplace[abi:ne190102]<parquet::ColumnChunkMetaDataBuilder*&,
 std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>,
 0>(this=0x00000001200086b0, 
__a=allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>
 > > @ 0x000000016fdfd34f, __args=0x000000016fdfd630, __args=nullptr, 
__args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, 
__args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:263:115
       frame #8: 0x0000000101f569b4 
libparquet.2300.dylib`std::__1::shared_ptr<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>
 
std::__1::allocate_shared[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 
std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>,
 parquet::ColumnChunkMetaDataBuilder*&, 
std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 
0>(__a=0x000000016fdfd427, __args=0x000000016fdfd630, __args=nullptr, 
__args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, 
__args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:845:51
       frame #9: 0x0000000101f1ebe8 
libparquet.2300.dylib`std::__1::shared_ptr<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>
 
std::__1::make_shared[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>,
 parquet::ColumnChunkMetaDataBuilder*&, 
std::__1::unique_ptr<parquet::PageWriter, 
std::__1::default_delete<parquet::PageWriter>>, bool const&, 
parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 
0>(__args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, 
__args=0x000000016fdfd5e8, __args=0x000000016fdfd620, 
__args=0x000000016fdfd5c0) at shared_ptr.h:853:10
       frame #10: 0x0000000101f1e8ac 
libparquet.2300.dylib`parquet::ColumnWriter::Make(metadata=0x000000012000b9a0, 
pager=nullptr, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) 
at column_writer.cc:2686:14
       frame #11: 0x0000000102089860 
libparquet.2300.dylib`parquet::RowGroupSerializer::CreateColumnWriterForColumn(this=0x000000012000b900,
 col_meta=0x000000012000b9a0, column_ordinal=0) const at file_writer.cc:304:12
       frame #12: 0x0000000102088c44 
libparquet.2300.dylib`parquet::RowGroupSerializer::NextColumn(this=0x000000012000b900)
 at file_writer.cc:146:26
       frame #13: 0x000000010207fd14 
libparquet.2300.dylib`parquet::RowGroupWriter::NextColumn(this=0x000000012000b990)
 at file_writer.cc:56:64
       frame #14: 0x00000001001d6e90 
parquet-internals-test`parquet::test::TestStatistics<parquet::PhysicalType<(parquet::Type::type)0>>::TestFullRoundtrip(this=0x00000001200091c0,
 num_values=100, null_count=31) at statistics_test.cc:436:69
       frame #15: 0x00000001001d6948 
parquet-internals-test`parquet::test::TestStatistics_FullRoundtrip_Test<parquet::PhysicalType<(parquet::Type::type)0>>::TestBody(this=0x00000001200091c0)
 at statistics_test.cc:607:3
       frame #16: 0x0000000101071f50 libarrow_testing.2300.dylib`void 
testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) + 104
       frame #17: 0x0000000101071e98 
libarrow_testing.2300.dylib`testing::Test::Run() + 208
       frame #18: 0x0000000101072a78 
libarrow_testing.2300.dylib`testing::TestInfo::Run() + 212
       frame #19: 0x00000001010734fc 
libarrow_testing.2300.dylib`testing::TestSuite::Run() + 456
       frame #20: 0x000000010107e924 
libarrow_testing.2300.dylib`testing::internal::UnitTestImpl::RunAllTests() + 
1328
       frame #21: 0x000000010107e2d0 libarrow_testing.2300.dylib`bool 
testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl,
 bool>(testing::internal::UnitTestImpl*, bool 
(testing::internal::UnitTestImpl::*)(), char const*) + 104
       frame #22: 0x000000010107e234 
libarrow_testing.2300.dylib`testing::UnitTest::Run() + 124
       frame #23: 0x0000000100262f30 parquet-internals-test`main + 68
       frame #24: 0x000000019594ab98 dyld`start + 6076
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to