James Porritt created ARROW-1039:
------------------------------------

             Summary: Python: pyarrow.Filesystem.read_parquet causing error if nthreads>1
                 Key: ARROW-1039
                 URL: https://issues.apache.org/jira/browse/ARROW-1039
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 0.3.0
            Reporter: James Porritt

Currently I have the code:

{code}
client = HdfsClient("hdfshost", 8020, "myuser", driver='libhdfs3')
parquet_file = '/my/parquet/file'
parquet = client.read_parquet(parquet_file, nthreads=1)
df = parquet.to_pandas()
{code}

This works as expected. If I make nthreads=2, however, I get:

{noformat}
2017-05-16 02:59:36.414677, p116977, th140677336123136, ERROR InputStreamImpl: failed to read Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet from Datanode: hdfshost1(1.2.3.4),
RemoteBlockReader.cpp: 304: ChecksumException: RemoteBlockReader: checksum not match for Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673], on Datanode: hdfshost1(1.2.3.4)
    @ Hdfs::Internal::RemoteBlockReader::verifyChecksum(int)
    @ Hdfs::Internal::RemoteBlockReader::readNextPacket()
    @ Hdfs::Internal::RemoteBlockReader::read(char*, int)
    @ Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
    @ Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
    @ Hdfs::Internal::InputStreamImpl::read(char*, int)
    @ hdfsRead
    @ arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
    @ parquet::ArrowInputFile::ReadAt(long, long)
    @ parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
    @ parquet::SerializedRowGroup::GetColumnPageReader(int)
    @ parquet::RowGroupReader::Column(int)
    @ parquet::arrow::AllRowGroupsIterator::Next()
    @ parquet::arrow::ColumnReader::Impl::NextRowGroup()
    @ parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
    @ parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
    @ parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
    @ parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
    @ std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
    @ Unknown
    @ start_thread
    @ clone
, retry read again from another Datanode.
2017-05-16 02:59:36.414858, p116977, th140677336123136, INFO IntputStreamImpl: Add invalid datanode hdfshost1(1.2.3.4) to failed datanodes and try another datanode again for file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
2017-05-16 02:59:36.424118, p116977, th140677702768384, ERROR InputStreamImpl: failed to read Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet from Datanode: hdfshost2(1.2.3.5),
RemoteBlockReader.cpp: 205: HdfsIOException: RemoteBlockReader: failed to read block header for Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] from Datanode: hdfshost1(1.2.3.4).
    @ Hdfs::Internal::RemoteBlockReader::readPacketHeader()
    @ Hdfs::Internal::RemoteBlockReader::readNextPacket()
    @ Hdfs::Internal::RemoteBlockReader::read(char*, int)
    @ Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
    @ Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
    @ Hdfs::Internal::InputStreamImpl::read(char*, int)
    @ hdfsRead
    @ arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
    @ parquet::ArrowInputFile::ReadAt(long, long)
    @ parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
    @ parquet::SerializedRowGroup::GetColumnPageReader(int)
    @ parquet::RowGroupReader::Column(int)
    @ parquet::arrow::AllRowGroupsIterator::Next()
    @ parquet::arrow::ColumnReader::Impl::NextRowGroup()
    @ parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
    @ parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
    @ parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
    @ parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
    @ std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
    @ Unknown
    @ start_thread
    @ clone
Caused by
PacketHeader.cpp: 109: HdfsIOException: Invalid PacketHeader, packetLen is 1228668939, protoLen is 1290, buf size is 31
    @ Hdfs::Internal::PacketHeader::readFields(char const*, unsigned long)
    @ Hdfs::Internal::RemoteBlockReader::readPacketHeader()
    @ Hdfs::Internal::RemoteBlockReader::readNextPacket()
    @ Hdfs::Internal::RemoteBlockReader::read(char*, int)
    @ Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
    @ Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
    @ Hdfs::Internal::InputStreamImpl::read(char*, int)
    @ hdfsRead
    @ arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
    @ parquet::ArrowInputFile::ReadAt(long, long)
    @ parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
    @ parquet::SerializedRowGroup::GetColumnPageReader(int)
    @ parquet::RowGroupReader::Column(int)
    @ parquet::arrow::AllRowGroupsIterator::Next()
    @ parquet::arrow::ColumnReader::Impl::NextRowGroup()
    @ parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
    @ parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
    @ parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
    @ parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
    @ std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
    @ Unknown
    @ start_thread
    @ clone
, retry read again from another Datanode.
2017-05-16 02:59:36.424266, p116977, th140677702768384, INFO IntputStreamImpl: Add invalid datanode hdfshost2(1.2.3.5) to failed datanodes and try another datanode again for file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
terminate called after throwing an instance of 'parquet::ParquetException'
  what():  Unable to read column chunk data
Aborted
{noformat}

If nthreads>2 I get:

{noformat}
Segmentation fault
{noformat}

I'm running this in a conda environment with:

{noformat}
pyarrow                   0.3.0.post          np112py27_1    conda-forge
parquet-cpp               1.1.0pre                      2    conda-forge
arrow-cpp                 0.3.0.post          np112py27_1    conda-forge
libhdfs3                  2.2.31                        1
{noformat}
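
For what it's worth, the frames in the traces above (parquet::arrow::ParallelFor spawning std::thread workers that all end up in arrow::io::HdfsReadableFile::ReadAt on the same file) look like the per-column reads are issued concurrently against a single shared libhdfs3 handle. Below is a hypothetical, narrowed-down sketch of that access pattern using only the plain HdfsClient file API; I have not verified that this exact snippet reproduces the checksum errors, and the open()/seek()/read() calls are simply my assumption of the usual file interface, not anything taken from the failing read_parquet() path:

{code}
import threading

from pyarrow import HdfsClient

client = HdfsClient("hdfshost", 8020, "myuser", driver='libhdfs3')

# One of the part files from the directory read above (same placeholder path).
path = '/my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet'

# A single handle shared between threads, mirroring how the traces show one
# HdfsReadableFile being hit from several worker threads at once.
f = client.open(path)

def read_chunk(i):
    # Competing seek+read calls from different threads; offsets are kept small
    # so they stay inside the file for any realistically sized part file.
    f.seek(i * 65536)
    f.read(65536)

threads = [threading.Thread(target=read_chunk, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
{code}

If a shared handle like this is not safe for concurrent seek+read, that would at least be consistent with the checksum mismatches at nthreads=2 and the crash at higher thread counts.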