gitmodimo opened a new issue, #40419:
URL: https://github.com/apache/arrow/issues/40419

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   Arrow version: 15.0.1
   Platform: x64-windows
   
   My Acero use case:
   
   dataset `scan` node
   \\/
   filter/projection (not relevant)
   \\/
   `consuming_sink`
   \\/
   Custom `SinkNodeConsumer` writing batches to a `RecordBatchWriter`
   The `RecordBatchWriter` is created with `ipc::MakeStreamWriter` on top of a `BufferOutputStream`
   
   In this use case multiple threads write to the same `BufferOutputStream` concurrently, which causes a race in `BufferOutputStream::Write`.
   
   SinkNodeConsumer:
   
   ```
   struct RecordBatchWriterNodeConsumer : public ac::SinkNodeConsumer {
       
       RecordBatchWriterNodeConsumer(std::shared_ptr<ar::ipc::RecordBatchWriter> _writer)
           : writer(_writer){}
       arrow::Status Init(const std::shared_ptr<arrow::Schema>& _schema,
           ac::BackpressureControl* backpressure_control,
           ac::ExecPlan* plan) override {
           schema = _schema;
           return arrow::Status::OK();
       }
   
       arrow::Status Consume(cp::ExecBatch batch) override {
           ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
           //const std::lock_guard<std::mutex> lock(consume_mutex);
           return writer->WriteRecordBatch(*rb);
       }
   
       arrow::Future<> Finish() override {
           return writer->Close();
       }
   
       std::shared_ptr<ar::ipc::RecordBatchWriter> writer;
       std::shared_ptr<arrow::Schema> schema;
       //std::mutex consume_mutex;
   };
   ```
   
   AddressSanitizer dump:
   
   ```
   =================================================================
   ==22728==ERROR: AddressSanitizer: attempting double-free on 0x038f001c4800 
in thread T48:
       #0 0x7ffc9ae00798 in _asan_wrap_RtlValidateHeap+0x288 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040798)
       #1 0x7ffc99f31a7b in arrow::BaseMemoryPoolImpl<arrow::`anonymous 
namespace'::SystemAllocator>::Reallocate 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
       #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
       #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
       #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
       #5 0x7ffc99ff0770 in arrow::io::Writable::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
       #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
       #7 0x7ffc9a9d82ab in 
arrow::ipc::internal::PayloadStreamWriter::WritePayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
       #8 0x7ffc9a9d8756 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
       #9 0x7ffc9a9d8375 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
       #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume 
C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
       #11 0x7ffcac3365e5 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::Process 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
       #12 0x7ffcac334d67 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
       #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
       #14 0x7ffcac344069 in 
<lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
       #15 0x7ffcac349121 in 
std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call
 C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
       #16 0x7ffcac331b8f in 
std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status
 __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
       #17 0x7ffcac33309d in arrow::internal::FnOnce<void 
__cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty>
 &,std::function<arrow::Status __cdecl(void)> > >::invoke 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
       #18 0x7ffc9a056b64 in 
std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> 
C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
       #19 0x7ffd30571bb1 in configthreadlocale+0x91 
(C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
       #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
       #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 
(C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
       #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 
(C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)
   
   0x038f001c4800 is located 0 bytes inside of 65536-byte region 
[0x038f001c4800,0x038f001d4800)
   freed by thread T52 here:
       #0 0x7ffc9ae00798 in _asan_wrap_RtlValidateHeap+0x288 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040798)
       #1 0x7ffc99f31a7b in arrow::BaseMemoryPoolImpl<arrow::`anonymous 
namespace'::SystemAllocator>::Reallocate 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
       #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
       #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
       #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
       #5 0x7ffc99ff0770 in arrow::io::Writable::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
       #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
       #7 0x7ffc9a9d82ab in 
arrow::ipc::internal::PayloadStreamWriter::WritePayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
       #8 0x7ffc9a9d8756 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
       #9 0x7ffc9a9d8375 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
       #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume 
C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
       #11 0x7ffcac3365e5 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::Process 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
       #12 0x7ffcac334d67 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
       #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
       #14 0x7ffcac344069 in 
<lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
       #15 0x7ffcac349121 in 
std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call
 C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
       #16 0x7ffcac331b8f in 
std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status
 __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
       #17 0x7ffcac33309d in arrow::internal::FnOnce<void 
__cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty>
 &,std::function<arrow::Status __cdecl(void)> > >::invoke 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
       #18 0x7ffc9a056b64 in 
std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> 
C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
       #19 0x7ffd30571bb1 in configthreadlocale+0x91 
(C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
       #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
       #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 
(C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
       #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 
(C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)
   
   previously allocated by thread T52 here:
       #0 0x7ffc9ae00a1a in _asan_wrap_RtlValidateHeap+0x50a 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040a1a)
       #1 0x7ffc99f3197c in arrow::BaseMemoryPoolImpl<arrow::`anonymous 
namespace'::SystemAllocator>::Reallocate 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
       #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
       #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
       #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
       #5 0x7ffc99ff0770 in arrow::io::Writable::Write 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
       #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
       #7 0x7ffc9a9d82ab in 
arrow::ipc::internal::PayloadStreamWriter::WritePayload 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
       #8 0x7ffc9a9d8756 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
       #9 0x7ffc9a9d8375 in 
arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
       #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume 
C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
       #11 0x7ffcac3365e5 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::Process 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
       #12 0x7ffcac334d67 in arrow::acero::`anonymous 
namespace'::ConsumingSinkNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
       #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
       #14 0x7ffcac344069 in 
<lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
       #15 0x7ffcac349121 in 
std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call
 C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
       #16 0x7ffcac331b8f in 
std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status
 __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
       #17 0x7ffcac33309d in arrow::internal::FnOnce<void 
__cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty>
 &,std::function<arrow::Status __cdecl(void)> > >::invoke 
C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
       #18 0x7ffc9a056b64 in 
std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> 
C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
       #19 0x7ffd30571bb1 in configthreadlocale+0x91 
(C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
       #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e 
(\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
       #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 
(C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
       #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 
(C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)
   ```
   
   This can be fixed by adding a lock in `RecordBatchWriterNodeConsumer::Consume`; however, this does not seem like a good solution. I think either:
   `arrow::io::BufferOutputStream::Write` should take a write lock
   or:
   StreamWriter should take a write lock (as it writes to the stream)
   or maybe both?
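
To make the locking idea concrete, here is a minimal standard-library sketch of serializing writes to a growable buffer. It does not use Arrow: `GrowableSink`, `LockedSink`, and `RunConcurrentWrites` are hypothetical names invented for illustration, and `GrowableSink` only mimics the "reserve, then copy" pattern visible in the stack trace, not the real `BufferOutputStream` implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a growable output buffer that is NOT thread-safe.
class GrowableSink {
 public:
  void Write(const void* data, std::size_t nbytes) {
    if (nbytes == 0) return;
    assert(data != nullptr);
    if (size_ + nbytes > storage_.size()) {
      // Growing may reallocate the storage. Two unsynchronized callers
      // reaching this point together would race on the same allocation.
      storage_.resize((size_ + nbytes) * 2);
    }
    std::memcpy(storage_.data() + size_, data, nbytes);
    size_ += nbytes;
  }

  std::size_t size() const { return size_; }

 private:
  std::vector<std::uint8_t> storage_;
  std::size_t size_ = 0;
};

// Hypothetical wrapper: every write takes the same mutex, so the
// grow-and-copy sequence in GrowableSink::Write runs one caller at a time.
class LockedSink {
 public:
  void Write(const void* data, std::size_t nbytes) {
    std::lock_guard<std::mutex> lock(mutex_);
    sink_.Write(data, nbytes);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return sink_.size();
  }

 private:
  std::mutex mutex_;
  GrowableSink sink_;
};

// Starts num_threads writers, each appending writes_per_thread 8-byte values,
// and returns the number of bytes held by the sink once all have finished.
std::size_t RunConcurrentWrites(int num_threads, int writes_per_thread) {
  LockedSink sink;
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&sink, writes_per_thread] {
      const std::uint64_t value = 42;
      for (int i = 0; i < writes_per_thread; ++i) {
        sink.Write(&value, sizeof(value));
      }
    });
  }
  for (auto& worker : workers) worker.join();
  return sink.size();
}
```

Whether the lock belongs in the stream, in the writer, or in the consumer is a design question this sketch does not settle. It only shows that a single mutex around the grow-and-copy step is enough to keep concurrent writers from reallocating the same buffer at once.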
   
   
   
   ### Component(s)
   
   C++

