sc/source/ui/inc/datastreams.hxx | 15 ++- sc/source/ui/miscdlgs/datastreams.cxx | 129 ++++++++++++++++++++++++++----- sc/source/ui/miscdlgs/datastreamsdlg.cxx | 7 + 3 files changed, 129 insertions(+), 22 deletions(-)
New commits: commit ed89a069f462ae106802e0d1376c38723c2c12cb Author: Matúš Kukan <matus.ku...@collabora.com> Date: Wed Nov 20 17:09:41 2013 +0100 datastreams: read data in another thread Change-Id: Iedd4075eadce9ca8fc41b279ea03c2679b01ec71 diff --git a/sc/source/ui/inc/datastreams.hxx b/sc/source/ui/inc/datastreams.hxx index 80f9cd6..93d1574 100644 --- a/sc/source/ui/inc/datastreams.hxx +++ b/sc/source/ui/inc/datastreams.hxx @@ -14,23 +14,30 @@ #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> +#include <vector> -namespace datastreams { class CallerThread; } +namespace datastreams { + class CallerThread; + class ReaderThread; +} class ScDocShell; class ScDocument; class ScRange; class SvStream; class Window; +typedef std::vector<OString> LinesList; + class DataStreams : boost::noncopyable { public: enum MoveEnum { NO_MOVE, RANGE_DOWN, MOVE_DOWN, MOVE_UP }; DataStreams(ScDocShell *pScDocShell); ~DataStreams(); + OString ConsumeLine(); bool ImportData(); void MoveData(); - void Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine, + void Set(SvStream *pStream, bool bValuesInLine, const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove); void ShowDialog(Window *pParent); void Start(); @@ -43,11 +50,13 @@ private: bool mbRunning; bool mbIsUndoEnabled; bool mbValuesInLine; + LinesList *mpLines; + size_t mnLinesCount; boost::scoped_ptr<ScRange> mpRange; boost::scoped_ptr<ScRange> mpStartRange; boost::scoped_ptr<ScRange> mpEndRange; - boost::scoped_ptr<SvStream> mpStream; rtl::Reference<datastreams::CallerThread> mxThread; + rtl::Reference<datastreams::ReaderThread> mxReaderThread; }; /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/sc/source/ui/miscdlgs/datastreams.cxx b/sc/source/ui/miscdlgs/datastreams.cxx index b938513..9d66ee3 100644 --- a/sc/source/ui/miscdlgs/datastreams.cxx +++ b/sc/source/ui/miscdlgs/datastreams.cxx @@ -24,6 +24,8 @@ #include <tabvwsh.hxx> #include <viewdata.hxx> +#include <queue> + namespace datastreams { class CallerThread : public salhelper::Thread @@ -57,6 +59,82 @@ private: } }; +class ReaderThread : public salhelper::Thread +{ + SvStream *mpStream; +public: + bool mbTerminateReading; + osl::Condition maProduceResume; + osl::Condition maConsumeResume; + osl::Mutex maLinesProtector; + std::queue<LinesList* > maPendingLines; + std::queue<LinesList* > maUsedLines; + + ReaderThread(SvStream *pData): + Thread("ReaderThread") + ,mpStream(pData) + ,mbTerminateReading(false) + { + } + + virtual ~ReaderThread() + { + delete mpStream; + while (!maPendingLines.empty()) + { + delete maPendingLines.front(); + maPendingLines.pop(); + } + while (!maUsedLines.empty()) + { + delete maUsedLines.front(); + maUsedLines.pop(); + } + } + + void terminate() + { + mbTerminateReading = true; + maProduceResume.set(); + join(); + } + +private: + virtual void execute() SAL_OVERRIDE + { + while (!mbTerminateReading) + { + LinesList *pLines = 0; + osl::ResettableMutexGuard aGuard(maLinesProtector); + if (!maUsedLines.empty()) + { + pLines = maUsedLines.front(); + maUsedLines.pop(); + aGuard.clear(); // unlock + } + else + { + aGuard.clear(); // unlock + pLines = new LinesList(10); + } + for (size_t i = 0; i < pLines->size(); ++i) + mpStream->ReadLine( pLines->at(i) ); + aGuard.reset(); // lock + while (!mbTerminateReading && maPendingLines.size() >= 8) + { // pause reading for a bit + aGuard.clear(); // unlock + maProduceResume.wait(); + maProduceResume.reset(); + aGuard.reset(); // lock + } + maPendingLines.push(pLines); + maConsumeResume.set(); + if (!mpStream->good()) + mbTerminateReading = true; + } + } +}; + } DataStreams::DataStreams(ScDocShell *pScDocShell): @@ -64,6 +142,8 @@ DataStreams::DataStreams(ScDocShell *pScDocShell): , mpScDocument(mpScDocShell->GetDocument()) , meMove(NO_MOVE) , mbRunning(false) + , mpLines(0) + , mnLinesCount(0) { mxThread = new datastreams::CallerThread( this ); mxThread->launch(); @@ -76,6 +156,31 @@ DataStreams::~DataStreams() mxThread->mbTerminate = true; mxThread->maStart.set(); mxThread->join(); + if (mxReaderThread.is()) + mxReaderThread->terminate(); +} + +OString DataStreams::ConsumeLine() +{ + if (!mpLines || mnLinesCount >= mpLines->size()) + { + mnLinesCount = 0; + osl::ResettableMutexGuard aGuard(mxReaderThread->maLinesProtector); + if (mpLines) + mxReaderThread->maUsedLines.push(mpLines); + while (mxReaderThread->maPendingLines.empty()) + { + aGuard.clear(); // unlock + mxReaderThread->maConsumeResume.wait(); + mxReaderThread->maConsumeResume.reset(); + aGuard.reset(); // lock + } + mpLines = mxReaderThread->maPendingLines.front(); + mxReaderThread->maPendingLines.pop(); + if (mxReaderThread->maPendingLines.size() <= 4) + mxReaderThread->maProduceResume.set(); // start producer again + } + return mpLines->at(mnLinesCount++); } void DataStreams::Start() @@ -117,13 +222,13 @@ void DataStreams::Stop() mpScDocument->EnableUndo(mbIsUndoEnabled); } -void DataStreams::Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine, +void DataStreams::Set(SvStream *pStream, bool bValuesInLine, const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove) { - if (bIsScript) - mpStream.reset( new SvScriptStream(rUrl) ); - else - mpStream.reset( new SvFileStream(rUrl, STREAM_READ) ); + if (mxReaderThread.is()) + mxReaderThread->terminate(); + mxReaderThread = new datastreams::ReaderThread( pStream ); + mxReaderThread->launch(); mpEndRange.reset( NULL ); mpRange.reset ( new ScRange() ); @@ -170,14 +275,6 @@ void DataStreams::MoveData() bool DataStreams::ImportData() { - if (!mpStream->good()) - { - // if there is a problem with SvStream, stop running - mbRunning = false; - return mbRunning; - } - - OString sTmp; SolarMutexGuard aGuard; MoveData(); if (mbValuesInLine) @@ -186,8 +283,7 @@ bool DataStreams::ImportData() OStringBuffer aBuf; while (nHeight--) { - mpStream->ReadLine(sTmp); - aBuf.append(sTmp); + aBuf.append(ConsumeLine()); aBuf.append('\n'); } SvMemoryStream aMemoryStream((void *)aBuf.getStr(), aBuf.getLength(), STREAM_READ); @@ -202,8 +298,7 @@ bool DataStreams::ImportData() // read more lines at once but not too much for (int i = 0; i < 10; ++i) { - mpStream->ReadLine(sTmp); - OUString sLine(OStringToOUString(sTmp, RTL_TEXTENCODING_UTF8)); + OUString sLine( OStringToOUString(ConsumeLine(), RTL_TEXTENCODING_UTF8) ); if (sLine.indexOf(',') <= 0) continue; diff --git a/sc/source/ui/miscdlgs/datastreamsdlg.cxx b/sc/source/ui/miscdlgs/datastreamsdlg.cxx index cbe19a3..bacb67a 100644 --- a/sc/source/ui/miscdlgs/datastreamsdlg.cxx +++ b/sc/source/ui/miscdlgs/datastreamsdlg.cxx @@ -74,11 +74,14 @@ DataStreamsDlg::DataStreamsDlg(DataStreams *pDataStreams, Window* pParent) void DataStreamsDlg::Start() { - bool bIsScript = m_pRBScriptData->IsChecked(); sal_Int32 nLimit = 0; if (m_pRBMaxLimit->IsChecked()) nLimit = m_pEdLimit->GetText().toInt32(); - mpDataStreams->Set( m_pCbUrl->GetText(), bIsScript, m_pRBValuesInLine->IsChecked(), + mpDataStreams->Set( + (m_pRBScriptData->IsChecked() ? + dynamic_cast<SvStream*>( new SvScriptStream(m_pCbUrl->GetText()) ) : + dynamic_cast<SvStream*>( new SvFileStream(m_pCbUrl->GetText(), STREAM_READ) )), + m_pRBValuesInLine->IsChecked(), m_pEdRange->GetText(), nLimit, (m_pRBNoMove->IsChecked() ? DataStreams::NO_MOVE : m_pRBRangeDown->IsChecked() ? DataStreams::RANGE_DOWN : DataStreams::MOVE_DOWN) ); mpDataStreams->Start();
_______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits