[MediaWiki-commits] [Gerrit] Use a Queue object to handle threadsafe access to a queue. - change (openzim)
Kelson has submitted this change and it was merged. Change subject: Use a Queue object to handle threadsafe access to a queue. .. Use a Queue object to handle threadsafe access to a queue. By using a Queue object we avoid the declaration of popFromFilenameQueue function in articlesource.cpp. Change-Id: Ic685f7e22e4ce95f6e0eb280f65809fd0dff1a6a --- M zimwriterfs/articlesource.cpp M zimwriterfs/articlesource.h A zimwriterfs/queue.h M zimwriterfs/zimwriterfs.cpp 4 files changed, 116 insertions(+), 55 deletions(-) Approvals: Kelson: Verified; Looks good to me, approved diff --git a/zimwriterfs/articlesource.cpp b/zimwriterfs/articlesource.cpp index 8b0b34c..6cf5f30 100644 --- a/zimwriterfs/articlesource.cpp +++ b/zimwriterfs/articlesource.cpp @@ -28,7 +28,6 @@ #include #include -bool popFromFilenameQueue(std::string &filename); bool isVerbose(); extern std::string welcome; @@ -44,8 +43,9 @@ unsigned int dataSize = 0; - -ArticleSource::ArticleSource() { +ArticleSource::ArticleSource(Queue& filenameQueue): +filenameQueue(filenameQueue) +{ /* Prepare metadata */ metadataQueue.push("Language"); metadataQueue.push("Publisher"); @@ -88,10 +88,10 @@ std::string line = redirectsQueue.front(); redirectsQueue.pop(); article = new RedirectArticle(line); - } else if (popFromFilenameQueue(path)) { + } else if (filenameQueue.popFromQueue(path)) { do { article = new Article(path); -} while (article && article->isInvalid() && popFromFilenameQueue(path)); +} while (article && article->isInvalid() && filenameQueue.popFromQueue(path)); } else { article = NULL; } diff --git a/zimwriterfs/articlesource.h b/zimwriterfs/articlesource.h index adbdbda..1ad6524 100644 --- a/zimwriterfs/articlesource.h +++ b/zimwriterfs/articlesource.h @@ -24,12 +24,13 @@ #include #include #include +#include "queue.h" #include class ArticleSource : public zim::writer::ArticleSource { public: -explicit ArticleSource(); +explicit ArticleSource(Queue& filenameQueue); virtual const zim::writer::Article* getNextArticle(); virtual zim::Blob getData(const std::string& aid); virtual std::string getMainPage(); @@ -39,6 +40,7 @@ private: std::queue metadataQueue; std::queue redirectsQueue; +Queue& filenameQueue; }; #endif //OPENZIM_ZIMWRITERFS_ARTICLESOURCE_H diff --git a/zimwriterfs/queue.h b/zimwriterfs/queue.h new file mode 100644 index 000..d177568 --- /dev/null +++ b/zimwriterfs/queue.h @@ -0,0 +1,88 @@ +/* + * Copyright 2016 Matthieu Gautier + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ + +#ifndef OPENZIM_ZIMWRITERFS_QUEUE_H +#define OPENZIM_ZIMWRITERFS_QUEUE_H + +#define MAX_QUEUE_SIZE 100 + +#include +#include + +template +class Queue { +public: +Queue() {pthread_mutex_init(&m_queueMutex,NULL);}; +virtual ~Queue() {pthread_mutex_destroy(&m_queueMutex);}; +virtual bool isEmpty(); +virtual void pushToQueue(const T& element); +virtual bool popFromQueue(T &filename); + +protected: +std::queue m_realQueue; +pthread_mutex_t m_queueMutex; + +private: +// Make this queue non copyable +Queue(const Queue&); +Queue& operator=(const Queue&); +}; + +template +bool Queue::isEmpty() { +pthread_mutex_lock(&m_queueMutex); +bool retVal = m_realQueue.empty(); +pthread_mutex_unlock(&m_queueMutex); +return retVal; +} + +template +void Queue::pushToQueue(const T &element) { +unsigned int wait = 0; +unsigned int queueSize = 0; + +do { +usleep(wait); +pthread_mutex_lock(&m_queueMutex); +queueSize = m_realQueue.size(); +pthread_mutex_unlock(&m_queueMutex); +wait += 10; +} while (queueSize > MAX_QUEUE_SIZE); + +pthread_mutex_lock(&m_queueMutex); +m_realQueue.push(element); +pthread_mutex_unlock(&m_queueMutex); +} + +template +bool Queue::popFromQueue(T &element) { +pthread_mutex_lock(&m_queueMutex); +if (m_realQueue.empty()) { +pthread_mutex_unlock(&m_queueMutex); +return false; +} + +element = m_realQueue.front(); +m_realQueue.pop(); +pthread_mutex_unlock(&m_queueMutex); + +
[MediaWiki-commits] [Gerrit] Use a Queue object to handle threadsafe access to a queue. - change (openzim)
Mgautierfr has uploaded a new change for review. https://gerrit.wikimedia.org/r/295518 Change subject: Use a Queue object to handle threadsafe access to a queue. .. Use a Queue object to handle threadsafe access to a queue. By using a Queue object we avoid the declaration of popFromFilenameQueue function in articlesource.cpp. Change-Id: Ic685f7e22e4ce95f6e0eb280f65809fd0dff1a6a --- M zimwriterfs/articlesource.cpp M zimwriterfs/articlesource.h A zimwriterfs/queue.h M zimwriterfs/zimwriterfs.cpp 4 files changed, 116 insertions(+), 55 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/openzim refs/changes/18/295518/1 diff --git a/zimwriterfs/articlesource.cpp b/zimwriterfs/articlesource.cpp index 8b0b34c..6cf5f30 100644 --- a/zimwriterfs/articlesource.cpp +++ b/zimwriterfs/articlesource.cpp @@ -28,7 +28,6 @@ #include #include -bool popFromFilenameQueue(std::string &filename); bool isVerbose(); extern std::string welcome; @@ -44,8 +43,9 @@ unsigned int dataSize = 0; - -ArticleSource::ArticleSource() { +ArticleSource::ArticleSource(Queue& filenameQueue): +filenameQueue(filenameQueue) +{ /* Prepare metadata */ metadataQueue.push("Language"); metadataQueue.push("Publisher"); @@ -88,10 +88,10 @@ std::string line = redirectsQueue.front(); redirectsQueue.pop(); article = new RedirectArticle(line); - } else if (popFromFilenameQueue(path)) { + } else if (filenameQueue.popFromQueue(path)) { do { article = new Article(path); -} while (article && article->isInvalid() && popFromFilenameQueue(path)); +} while (article && article->isInvalid() && filenameQueue.popFromQueue(path)); } else { article = NULL; } diff --git a/zimwriterfs/articlesource.h b/zimwriterfs/articlesource.h index adbdbda..1ad6524 100644 --- a/zimwriterfs/articlesource.h +++ b/zimwriterfs/articlesource.h @@ -24,12 +24,13 @@ #include #include #include +#include "queue.h" #include class ArticleSource : public zim::writer::ArticleSource { public: -explicit ArticleSource(); +explicit ArticleSource(Queue& filenameQueue); virtual const zim::writer::Article* getNextArticle(); virtual zim::Blob getData(const std::string& aid); virtual std::string getMainPage(); @@ -39,6 +40,7 @@ private: std::queue metadataQueue; std::queue redirectsQueue; +Queue& filenameQueue; }; #endif //OPENZIM_ZIMWRITERFS_ARTICLESOURCE_H diff --git a/zimwriterfs/queue.h b/zimwriterfs/queue.h new file mode 100644 index 000..d177568 --- /dev/null +++ b/zimwriterfs/queue.h @@ -0,0 +1,88 @@ +/* + * Copyright 2016 Matthieu Gautier + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ + +#ifndef OPENZIM_ZIMWRITERFS_QUEUE_H +#define OPENZIM_ZIMWRITERFS_QUEUE_H + +#define MAX_QUEUE_SIZE 100 + +#include +#include + +template +class Queue { +public: +Queue() {pthread_mutex_init(&m_queueMutex,NULL);}; +virtual ~Queue() {pthread_mutex_destroy(&m_queueMutex);}; +virtual bool isEmpty(); +virtual void pushToQueue(const T& element); +virtual bool popFromQueue(T &filename); + +protected: +std::queue m_realQueue; +pthread_mutex_t m_queueMutex; + +private: +// Make this queue non copyable +Queue(const Queue&); +Queue& operator=(const Queue&); +}; + +template +bool Queue::isEmpty() { +pthread_mutex_lock(&m_queueMutex); +bool retVal = m_realQueue.empty(); +pthread_mutex_unlock(&m_queueMutex); +return retVal; +} + +template +void Queue::pushToQueue(const T &element) { +unsigned int wait = 0; +unsigned int queueSize = 0; + +do { +usleep(wait); +pthread_mutex_lock(&m_queueMutex); +queueSize = m_realQueue.size(); +pthread_mutex_unlock(&m_queueMutex); +wait += 10; +} while (queueSize > MAX_QUEUE_SIZE); + +pthread_mutex_lock(&m_queueMutex); +m_realQueue.push(element); +pthread_mutex_unlock(&m_queueMutex); +} + +template +bool Queue::popFromQueue(T &element) { +pthread_mutex_lock(&m_queueMutex); +if (m_realQueue.empty()) { +pthread_mutex_unlock(&m_queueMutex); +return false; +} + +element = m_realQueue.front(); +m_realQueu