feat(mpmc_blocking_q): add blocking dequeue without timeout (#2588)
Use the new blocking dequeue to avoid unnecessarily waking up the thread pool every 10s. Fixes #2587 by replacing std::condition_variable::wait_for with std::condition_variable::wait as a workaroung for gcc 11.3 issue 101978. Co-authored-by: Alok Priyadarshi <alokp@dexterity.ai>
This commit is contained in:
		@@ -49,7 +49,7 @@ public:
 | 
				
			|||||||
        push_cv_.notify_one();
 | 
					        push_cv_.notify_one();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // try to dequeue item. if no item found. wait up to timeout and try again
 | 
					    // dequeue with a timeout.
 | 
				
			||||||
    // Return true, if succeeded dequeue item, false otherwise
 | 
					    // Return true, if succeeded dequeue item, false otherwise
 | 
				
			||||||
    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
 | 
					    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
@@ -66,6 +66,18 @@ public:
 | 
				
			|||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // blocking dequeue without a timeout.
 | 
				
			||||||
 | 
					    void dequeue(T &popped_item)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            std::unique_lock<std::mutex> lock(queue_mutex_);
 | 
				
			||||||
 | 
					            push_cv_.wait(lock, [this] { return !this->q_.empty(); });
 | 
				
			||||||
 | 
					            popped_item = std::move(q_.front());
 | 
				
			||||||
 | 
					            q_.pop_front();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        pop_cv_.notify_one();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#else
 | 
					#else
 | 
				
			||||||
    // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
 | 
					    // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
 | 
				
			||||||
    // so release the mutex at the very end each function.
 | 
					    // so release the mutex at the very end each function.
 | 
				
			||||||
@@ -87,7 +99,7 @@ public:
 | 
				
			|||||||
        push_cv_.notify_one();
 | 
					        push_cv_.notify_one();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // try to dequeue item. if no item found. wait up to timeout and try again
 | 
					    // dequeue with a timeout.
 | 
				
			||||||
    // Return true, if succeeded dequeue item, false otherwise
 | 
					    // Return true, if succeeded dequeue item, false otherwise
 | 
				
			||||||
    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
 | 
					    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
@@ -102,6 +114,16 @@ public:
 | 
				
			|||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // blocking dequeue without a timeout.
 | 
				
			||||||
 | 
					    void dequeue(T &popped_item)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        std::unique_lock<std::mutex> lock(queue_mutex_);
 | 
				
			||||||
 | 
					        push_cv_.wait(lock, [this] { return !this->q_.empty(); });
 | 
				
			||||||
 | 
					        popped_item = std::move(q_.front());
 | 
				
			||||||
 | 
					        q_.pop_front();
 | 
				
			||||||
 | 
					        pop_cv_.notify_one();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    size_t overrun_counter()
 | 
					    size_t overrun_counter()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -108,11 +108,7 @@ void SPDLOG_INLINE thread_pool::worker_loop_()
 | 
				
			|||||||
bool SPDLOG_INLINE thread_pool::process_next_msg_()
 | 
					bool SPDLOG_INLINE thread_pool::process_next_msg_()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    async_msg incoming_async_msg;
 | 
					    async_msg incoming_async_msg;
 | 
				
			||||||
    bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
 | 
					    q_.dequeue(incoming_async_msg);
 | 
				
			||||||
    if (!dequeued)
 | 
					 | 
				
			||||||
    {
 | 
					 | 
				
			||||||
        return true;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    switch (incoming_async_msg.msg_type)
 | 
					    switch (incoming_async_msg.msg_type)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -43,6 +43,26 @@ TEST_CASE("dequeue-empty-wait", "[mpmc_blocking_q]")
 | 
				
			|||||||
    REQUIRE(delta_ms <= wait_ms + tolerance_wait);
 | 
					    REQUIRE(delta_ms <= wait_ms + tolerance_wait);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST_CASE("dequeue-full-nowait", "[mpmc_blocking_q]")
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    spdlog::details::mpmc_blocking_queue<int> q(1);
 | 
				
			||||||
 | 
					    q.enqueue(42);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int item = 0;
 | 
				
			||||||
 | 
					    q.dequeue_for(item, milliseconds::zero());
 | 
				
			||||||
 | 
					    REQUIRE(item == 42);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST_CASE("dequeue-full-wait", "[mpmc_blocking_q]")
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    spdlog::details::mpmc_blocking_queue<int> q(1);
 | 
				
			||||||
 | 
					    q.enqueue(42);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int item = 0;
 | 
				
			||||||
 | 
					    q.dequeue(item);
 | 
				
			||||||
 | 
					    REQUIRE(item == 42);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
TEST_CASE("enqueue_nowait", "[mpmc_blocking_q]")
 | 
					TEST_CASE("enqueue_nowait", "[mpmc_blocking_q]")
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -95,12 +115,12 @@ TEST_CASE("full_queue", "[mpmc_blocking_q]")
 | 
				
			|||||||
    for (int i = 1; i < static_cast<int>(q_size); i++)
 | 
					    for (int i = 1; i < static_cast<int>(q_size); i++)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        int item = -1;
 | 
					        int item = -1;
 | 
				
			||||||
        q.dequeue_for(item, milliseconds(0));
 | 
					        q.dequeue(item);
 | 
				
			||||||
        REQUIRE(item == i);
 | 
					        REQUIRE(item == i);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // last item pushed has overridden the oldest.
 | 
					    // last item pushed has overridden the oldest.
 | 
				
			||||||
    int item = -1;
 | 
					    int item = -1;
 | 
				
			||||||
    q.dequeue_for(item, milliseconds(0));
 | 
					    q.dequeue(item);
 | 
				
			||||||
    REQUIRE(item == 123456);
 | 
					    REQUIRE(item == 123456);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user