Jump to content

[threading][8463] LockedQueue race conditions, useless functions, etc


Recommended Posts

Posted

What does this patch do?:

* Makes LockedQueue 100% thread-safe by killing race conditions in several functions (::size(), ::empty(), ::next(), ::front(), etc).

* Removes ::empty(), ::front() and ::size() as they make no sense in the context of a locked queue.

* Converts existing code to do: SomeObj* obj; while (queue.next(obj)) { obj->dowork(); delete obj; } - instead of using .empty() which may give wrong results.

diff --git a/src/game/World.cpp b/src/game/World.cpp
index d8a5ab6..9ee21a2 100644
--- a/src/game/World.cpp
+++ b/src/game/World.cpp
@@ -112,8 +112,9 @@ World::~World()

    m_weathers.clear();

-    while (!cliCmdQueue.empty())
-        delete cliCmdQueue.next();
+    CliCommandHolder* command;
+    while (cliCmdQueue.next(command))
+        delete command;

    VMAP::VMapFactory::clear();

@@ -1967,11 +1968,9 @@ void World::SendServerMessage(ServerMessageType type, const char *text, Player*
void World::UpdateSessions( uint32 diff )
{
    ///- Add new sessions
-    while(!addSessQueue.empty())
-    {
-        WorldSession* sess = addSessQueue.next ();
+    WorldSession* sess;
+    while(addSessQueue.next(sess))
        AddSession_ (sess);
-    }

    ///- Then send an update signal to remaining ones
    for (SessionMap::iterator itr = m_sessions.begin(), next; itr != m_sessions.end(); itr = next)
@@ -1995,25 +1994,20 @@ void World::UpdateSessions( uint32 diff )
// This handles the issued and queued CLI commands
void World::ProcessCliCommands()
{
-    if (cliCmdQueue.empty())
-        return;
+    CliCommandHolder::Print* zprint = NULL;

-    CliCommandHolder::Print* zprint;
-
-    while (!cliCmdQueue.empty())
+    CliCommandHolder* command;
+    while (cliCmdQueue.next(command))
    {
        sLog.outDebug("CLI command under processing...");
-        CliCommandHolder *command = cliCmdQueue.next();
-
        zprint = command->m_print;
-
        CliHandler(zprint).ParseCommands(command->m_command);
-
        delete command;
    }

    // print the console message here so it looks right
-    zprint("mangos>");
+    if (zprint)
+        zprint("mangos>");
}

void World::InitResultQueue()
diff --git a/src/game/WorldSession.cpp b/src/game/WorldSession.cpp
index 5c8b44b..b4b3076 100644
--- a/src/game/WorldSession.cpp
+++ b/src/game/WorldSession.cpp
@@ -69,11 +69,9 @@ WorldSession::~WorldSession()
    }

    ///- empty incoming packet queue
-    while(!_recvQueue.empty())
-    {
-        WorldPacket *packet = _recvQueue.next ();
+    WorldPacket* packet;
+    while(_recvQueue.next(packet))
        delete packet;
-    }
}

void WorldSession::SizeError(WorldPacket const& packet, uint32 size) const
@@ -154,10 +152,9 @@ bool WorldSession::Update(uint32 /*diff*/)
{
    ///- Retrieve packets from the receive queue and call the appropriate handlers
    /// not proccess packets if socket already closed
-    while (!_recvQueue.empty() && m_Socket && !m_Socket->IsClosed ())
+    WorldPacket* packet;
+    while (_recvQueue.next(packet) && m_Socket && !m_Socket->IsClosed ())
    {
-        WorldPacket *packet = _recvQueue.next();
-
        /*#if 1
        sLog.outError( "MOEP: %s (0x%.4X)",
                        LookupOpcodeName(packet->GetOpcode()),
diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp
index 2ff8908..813e12d 100644
--- a/src/shared/Database/SqlDelayThread.cpp
+++ b/src/shared/Database/SqlDelayThread.cpp
@@ -26,7 +26,6 @@ SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true)

void SqlDelayThread::run()
{
-    SqlOperation* s;
    #ifndef DO_POSTGRESQL
    mysql_thread_init();
    #endif
@@ -36,9 +35,9 @@ void SqlDelayThread::run()
        // if the running state gets turned off while sleeping
        // empty the queue before exiting
        ACE_Based::Thread::Sleep(10);
-        while (!m_sqlQueue.empty())
+        SqlOperation* s;
+        while (m_sqlQueue.next(s))
        {
-            s = m_sqlQueue.next();
            s->Execute(m_dbEngine);
            delete s;
        }
diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp
index a430998..d47e85d 100644
--- a/src/shared/Database/SqlOperations.cpp
+++ b/src/shared/Database/SqlOperations.cpp
@@ -71,9 +71,9 @@ void SqlQuery::Execute(Database *db)
void SqlResultQueue::Update()
{
    /// execute the callbacks waiting in the synchronization queue
-    while(!empty())
+    MaNGOS::IQueryCallback* callback;
+    while (next(callback))
    {
-        MaNGOS::IQueryCallback * callback = next();
        callback->Execute();
        delete callback;
    }
diff --git a/src/shared/LockedQueue.h b/src/shared/LockedQueue.h
index 4087ebf..72ec9e1 100644
--- a/src/shared/LockedQueue.h
+++ b/src/shared/LockedQueue.h
@@ -30,99 +30,65 @@ namespace ACE_Based
    template <class T, class LockType, typename StorageType=std::Deque<T> >
        class LockedQueue
    {
-        //! Serialize access to the Queue
+        //! Lock access to the queue.
        LockType _lock;

-        //! Storage backing the queue
+        //! Storage backing the queue.
        StorageType _queue;

-        //! Cancellation flag
-        volatile bool _canceled;
+        //! Cancellation flag.
+        /*volatile*/ bool _canceled;

        public:

-            //! Create a LockedQueue
+            //! Create a LockedQueue.
            LockedQueue() : _canceled(false) {}

-            //! Destroy a LockedQueue
+            //! Destroy a LockedQueue.
            virtual ~LockedQueue() { }

-            /**
-             * @see Queue::add(const T& item)
-             */
+            //! Adds an item to the queue.
            void add(const T& item)
            {
                ACE_Guard<LockType> g(this->_lock);

-                ASSERT(!this->_canceled);
+                //ASSERT(!this->_canceled);
                // throw Cancellation_Exception();

-                this->_queue.push_back(item);
+                _queue.push_back(item);
            }

-            /**
-             * @see Queue::next()
-             */
-            T next()
+            //! Gets the next result in the queue, if any.
+            bool next(T& result)
            {
                ACE_Guard<LockType> g(this->_lock);

-                ASSERT (!_queue.empty() || !this->_canceled);
-                // throw Cancellation_Exception();
-
-                T item = this->_queue.front();
-                this->_queue.pop_front();
-
-                return item;
-            }
+                if (_queue.empty())
+                    return false;

-            T front()
-            {
-                ACE_Guard<LockType> g(this->_lock);
+                //ASSERT (!_queue.empty() || !this->_canceled);
+                // throw Cancellation_Exception();

-                ASSERT (!this->_queue.empty());
-                // throw NoSuchElement_Exception();
+                result = _queue.front();
+                _queue.pop_front();

-                return this->_queue.front();
+                return true;
            }

-            /**
-             * @see Queue::cancel()
-             */
+            //! Cancels the queue.
            void cancel()
            {
                ACE_Guard<LockType> g(this->_lock);

-                this->_canceled = true;
-            }
-
-            /**
-             * @see Queue::isCanceled()
-             */
-            bool isCanceled()
-            {
-                // Faster check since the queue will not become un-canceled
-                if(this->_canceled)
-                    return true;
-
-                ACE_Guard<LockType> g(this->_lock);
-
-                return this->_canceled;
+                _canceled = true;
            }

-            /**
-             * @see Queue::size()
-             */
-            size_t size()
+            //! Checks if the queue is cancelled.
+            bool cancelled()
            {
                ACE_Guard<LockType> g(this->_lock);
-                return this->_queue.size();
-            }

-            bool empty()
-            {
-                ACE_Guard<LockType> g(this->_lock);
-                return this->_queue.empty();
+                return _canceled;
            }
    };
}

  • 5 weeks later...
×
×
  • Create New...

Important Information

We have placed cookies on your device to help make this website better. You can adjust your cookie settings, otherwise we'll assume you're okay to continue. Privacy Policy Terms of Use