=46rom=2093afbeca18f8b73a1c890858a9adfb93ecba1066=20Mon=20Sep=2017=20=
00:00:00=202001=0AFrom:=20Sanjit=20Jhala=20<sjhalaz@gmail.com>=0ADate:=20=
Tue,=2014=20Jul=202009=2016:05:10=20-0700=0ASubject:=20[PATCH]=20Fixed=20=
bug=20in=20CommitLogReader=20caused=20by=20fragment=20queue=20iterator=20=
invalidation=20when=20log=20fragments=20are=20pushed=20to=20the=20back=20=
of=20the=20deque.=0A=0A---=0A=20src/cc/Hypertable/Lib/CommitLogReader.cc=20=
|=20=20=2033=20+++++++++++++++++------------=0A=20=
src/cc/Hypertable/Lib/CommitLogReader.h=20=20|=20=20=20=206=20+---=0A=20=
2=20files=20changed,=2021=20insertions(+),=2018=20deletions(-)=0A=0Adiff=20=
--git=20a/src/cc/Hypertable/Lib/CommitLogReader.cc=20=
b/src/cc/Hypertable/Lib/CommitLogReader.cc=0Aindex=2081dd0aa..194eef4=20=
100644=0A---=20a/src/cc/Hypertable/Lib/CommitLogReader.cc=0A+++=20=
b/src/cc/Hypertable/Lib/CommitLogReader.cc=0A@@=20-65,8=20+65,8=20@@=20=
namespace=20{=0A=20=0A=20=0A=20=
CommitLogReader::CommitLogReader(Filesystem=20*fs,=20const=20String=20=
&log_dir,=20bool=20mark_for_deletion)=0A-=20=20:=20=
CommitLogBase(log_dir),=20m_fs(fs),=20m_block_buffer(256),=20=
m_revision(0),=0A-=20=20=20=20m_compressor(0)=20{=0A+=20=20:=20=
CommitLogBase(log_dir),=20m_fs(fs),=20m_fragment_queue_offset(0),=0A+=20=20=
=20=20m_block_buffer(256),=20m_revision(0),=20m_compressor(0)=20{=0A=20=0A=
=20=20=20if=20(get_bool("Hypertable.CommitLog.SkipErrors"))=0A=20=20=20=20=
=20CommitLogBlockStream::ms_assert_on_error=20=3D=20false;=0A@@=20-83,20=20=
+83,23=20@@=20CommitLogReader::~CommitLogReader()=20{=0A=20bool=0A=20=
CommitLogReader::next_raw_block(CommitLogBlockInfo=20*infop,=0A=20=20=20=20=
=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=
=20=20=20=20BlockCompressionHeaderCommitLog=20*header)=20{=0A-=20=
try_again:=0A+=20=20LogFragmentQueue::iterator=20fragment_queue_iter;=0A=20=
=0A-=20=20if=20(m_iter=20=3D=3D=20m_fragment_queue.end())=0A+=20=20=
try_again:=0A+=20=20fragment_queue_iter=20=3D=20m_fragment_queue.begin()=20=
+=20m_fragment_queue_offset;=0A+=20=20if=20(fragment_queue_iter=20=3D=3D=20=
m_fragment_queue.end())=0A=20=20=20=20=20return=20false;=0A=20=0A-=20=20=
if=20((*m_iter).block_stream=20=3D=3D=200)=0A-=20=20=20=20=
(*m_iter).block_stream=20=3D=20=0A-=20=20=20=20=20=20new=20=
CommitLogBlockStream(m_fs,=20(*m_iter).log_dir,=20format("%u",=20=
(*m_iter).num));=0A+=20=20if=20((*fragment_queue_iter).block_stream=20=3D=3D=
=200)=0A+=20=20=20=20(*fragment_queue_iter).block_stream=20=3D=0A+=20=20=20=
=20=20=20new=20CommitLogBlockStream(m_fs,=20=
(*fragment_queue_iter).log_dir,=0A+=20=20=20=20=20=20=20=20=20=20=20=20=20=
=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20format("%u",=20=
(*fragment_queue_iter).num));=0A=20=0A-=20=20if=20=
(!(*m_iter).block_stream->next(infop,=20header))=20{=0A-=20=20=20=20=
delete=20(*m_iter).block_stream;=0A-=20=20=20=20(*m_iter).block_stream=20=
=3D=200;=0A-=20=20=20=20(*m_iter).revision=20=3D=20m_revision;=0A-=20=20=20=
=20m_iter++;=0A+=20=20if=20=
(!(*fragment_queue_iter).block_stream->next(infop,=20header))=20{=0A+=20=20=
=20=20delete=20(*fragment_queue_iter).block_stream;=0A+=20=20=20=20=
(*fragment_queue_iter).block_stream=20=3D=200;=0A+=20=20=20=20=
(*fragment_queue_iter).revision=20=3D=20m_revision;=0A+=20=20=20=20=
m_fragment_queue_offset++;=0A=20=20=20=20=20m_revision=20=3D=200;=0A=20=20=
=20=20=20goto=20try_again;=0A=20=20=20}=0A@@=20-131,9=20+134,10=20@@=20=
CommitLogReader::next(const=20uint8_t=20**blockp,=20size_t=20*lenp,=0A=20=
=20=20=20=20=20=20=20=20m_compressor->inflate(zblock,=20m_block_buffer,=20=
*header);=0A=20=20=20=20=20=20=20}=0A=20=20=20=20=20=20=20catch=20=
(Exception=20&e)=20{=0A+=20=20=20=20=20=20=20=20=
LogFragmentQueue::iterator=20iter=20=3D=20m_fragment_queue.begin()=20+=20=
m_fragment_queue_offset;=0A=20=20=20=20=20=20=20=20=20HT_ERRORF("Inflate=20=
error=20in=20CommitLog=20fragment=20%s=20starting=20at=20"=0A=20=20=20=20=
=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20"postion=20%lld=20(block=20=
len=20=3D=20%lld)=20-=20%s",=0A-=20=20=20=20=20=20=20=20=20=20=20=20=20=20=
=20=20=20=20(*m_iter).block_stream->get_fname().c_str(),=0A+=20=20=20=20=20=
=20=20=20=20=20=20=20=20=20=20=20=20=20=
(*iter).block_stream->get_fname().c_str(),=0A=20=20=20=20=20=20=20=20=20=20=
=20=20=20=20=20=20=20=20=20(Lld)binfo.start_offset,=20=
(Lld)(binfo.end_offset=0A=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=
=20=20=20-=20binfo.start_offset),=20Error::get_text(e.code()));=0A=20=20=20=
=20=20=20=20=20=20continue;=0A@@=20-150,9=20+154,10=20@@=20=
CommitLogReader::next(const=20uint8_t=20**blockp,=20size_t=20*lenp,=0A=20=
=20=20=20=20=20=20return=20true;=0A=20=20=20=20=20}=0A=20=0A+=20=20=20=20=
LogFragmentQueue::iterator=20iter=20=3D=20m_fragment_queue.begin()=20+=20=
m_fragment_queue_offset;=0A=20=20=20=20=20HT_WARNF("Corruption=20=
detected=20in=20CommitLog=20fragment=20%s=20starting=20at=20"=0A=20=20=20=
=20=20=20=20=20=20=20=20=20=20=20"postion=20%lld=20for=20%lld=20bytes=20=
-=20%s",=0A-=20=20=20=20=20=20=20=20=20=20=20=20=20=
(*m_iter).block_stream->get_fname().c_str(),=0A+=20=20=20=20=20=20=20=20=20=
=20=20=20=20(*iter).block_stream->get_fname().c_str(),=0A=20=20=20=20=20=20=
=20=20=20=20=20=20=20=20(Lld)binfo.start_offset,=20=
(Lld)(binfo.end_offset=0A=20=20=20=20=20=20=20=20=20=20=20=20=20=20-=20=
binfo.start_offset),=20Error::get_text(binfo.error));=0A=20=20=20}=0A=
diff=20--git=20a/src/cc/Hypertable/Lib/CommitLogReader.h=20=
b/src/cc/Hypertable/Lib/CommitLogReader.h=0Aindex=2032d1774..d8b5903=20=
100644=0A---=20a/src/cc/Hypertable/Lib/CommitLogReader.h=0A+++=20=
b/src/cc/Hypertable/Lib/CommitLogReader.h=0A@@=20-54,10=20+54,8=20@@=20=
namespace=20Hypertable=20{=0A=20=20=20=20=20bool=20next(const=20uint8_t=20=
**blockp,=20size_t=20*lenp,=0A=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=
BlockCompressionHeaderCommitLog=20*);=0A=20=0A-=20=20=20=20=
LogFragmentQueue=20&get_fragment_queue()=20{=20return=20=
m_fragment_queue;=20}=0A-=0A=20=20=20=20=20void=20reset()=20{=0A-=20=20=20=
=20=20=20m_iter=20=3D=20m_fragment_queue.begin();=0A+=20=20=20=20=20=20=
m_fragment_queue_offset=20=3D=200;=0A=20=20=20=20=20=20=20=
m_block_buffer.clear();=0A=20=20=20=20=20=20=20m_revision=20=3D=200;=0A=20=
=20=20=20=20=20=20m_latest_revision=20=3D=200;=0A@@=20-69,7=20+67,7=20@@=20=
namespace=20Hypertable=20{=0A=20=20=20=20=20void=20=
load_compressor(uint16_t=20ztype);=0A=20=0A=20=20=20=20=20Filesystem=20=20=
=20=20=20=20=20*m_fs;=0A-=20=20=20=20LogFragmentQueue::iterator=20=
m_iter;=20=20=20=20=0A+=20=20=20=20uint64_t=20=20=20=20=20=20=20=20=20=20=
m_fragment_queue_offset;=0A=20=20=20=20=20DynamicBuffer=20=20=20=20=20=
m_block_buffer;=0A=20=20=20=20=20int64_t=20=20=20=20=20=20=20=20=20=20=20=
m_revision;=0A=20=0A--=20=0A1.6.0.2=0A=0A=
