Featured image of post C++ std::async 异步编程深入剖析

C++ std::async 异步编程深入剖析

前言

在现代 C++ 编程中,异步编程是一个不可或缺的组成部分,它能够显著提升程序的性能。std::async 作为 C++ 标准库中实现异步操作的重要函数模板,为开发者提供了一种简洁而强大的方式来运行异步任务。本文将深入探讨 std::async 的功能、用法以及不同编译器实现之间的差异,更好地理解和使用这一强大的工具。

std::async 基础用法

std::async 定义在 <future> 头文件中,其基本功能是异步地运行一个函数,并返回一个 std::future 对象,该对象保存函数调用返回的结果。

std::async 的声明:

1
2
3
4
5
6
7
template <class Fn, class... ArgTypes>
future<typename result_of<Fn(ArgTypes...)>::type>
    async(Fn&& fn, ArgTypes&&... args);

template <class Fn, class... ArgTypes>
future<typename result_of<Fn(ArgTypes...)>::type>
    async(launch policy, Fn&& fn, ArgTypes&&... args);

在第二个声明中,可以指定启动策略。std::launch 是一个枚举类。

  • launch::defered:表明函数调用延迟到 wait ()get () 函数调用时才执行。
  • launch::async:表明函数在新的独立线程上执行。(这个新线程可能是从线程池中获取的,也可能是新创建的,具体取决于编译器的实现。)
  • launch::deferred | launch::asyncstd::async 的默认参数,系统会自行决定异步(创建新线程)还是同步(不创建新线程)方式运行。

基本用法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
int foo(int a) {
  return a;
}

int main() {
  // 默认策略
  std::future<int> f = std::async(&foo, 10);

  // 新线程启动
  std::future<int> f1 = std::async(std::launch::async, []() { return 0; });

  // 延迟调用
  std::future<int> f2 = std::async(std::launch::deferred, []() { return 0; });
  
  std::println("result is: {}", f.get());
  std::println("result is: {}", f1.get());
  std::println("result is: {}", f2.get());
  return 0;
}

基本用法不做过多描述,接下来重点分析 std::async 细节部分。

std::async 策略深入分析

C++ 标准并没有明确规定 std::async 的默认策略,但大多数编译器实现(如 GCC、LLVM 和 MSVC)都选择了 std::launch::async | std::launch::deferred 作为默认策略。那么,不同平台的默认策略,最终是执行的什么策略呢?

GCC 平台

在 GCC 中,默认选项是 launch::async|launch::deferred:

1
2
3
4
5
6
7
8
9
/// async, potential overload
template<typename _Fn, typename... _Args>
  _GLIBCXX_NODISCARD inline future<__async_result_of<_Fn, _Args...>>
  async(_Fn&& __fn, _Args&&... __args)
  {
    return std::async(launch::async|launch::deferred,
    std::forward<_Fn>(__fn),
    std::forward<_Args>(__args)...);
  }

实际上,选择的策略将是 launch::async

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 /// async
  template<typename _Fn, typename... _Args>
    _GLIBCXX_NODISCARD future<__async_result_of<_Fn, _Args...>>
    async(launch __policy, _Fn&& __fn, _Args&&... __args)
    {
      std::shared_ptr<__future_base::_State_base> __state;
      if ((__policy & launch::async) == launch::async)
 {
   __try
     {
       __state = __future_base::_S_make_async_state(
    std::thread::__make_invoker(std::forward<_Fn>(__fn),
           std::forward<_Args>(__args)...)
    );
     }
#if __cpp_exceptions
   catch(const system_error& __e)
     {
       if (__e.code() != errc::resource_unavailable_try_again
    || (__policy & launch::deferred) != launch::deferred)
  throw;
     }
#endif
 }
      if (!__state)
 {
   __state = __future_base::_S_make_deferred_state(
       std::thread::__make_invoker(std::forward<_Fn>(__fn),
       std::forward<_Args>(__args)...));
 }
      return future<__async_result_of<_Fn, _Args...>>(__state);
    }

LLVM

LLVM 对于默认选项有一个特别的启动策略 launch::any

1
2
3
4
5
6
7
8
template <class _Fp, class... _Args>
_LIBCPP_NODISCARD_AFTER_CXX17 inline _LIBCPP_INLINE_VISIBILITY
future<typename __invoke_of<typename decay<_Fp>::type, typename decay<_Args>::type...>::type>
async(_Fp&& __f, _Args&&... __args)
{
    return _VSTD::async(launch::any, _VSTD::forward<_Fp>(__f),
                                    _VSTD::forward<_Args>(__args)...);
}

实际上,就是 launch::asynclaunch::deferred 的组合。

1
2
3
4
5
6
enum class launch
{
    async = 1,
    deferred = 2,
    any = async | deferred
};

而 LLVM 实际选择的策略将是 launch::async

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <class _Fp, class... _Args>
_LIBCPP_NODISCARD_AFTER_CXX17
future<typename __invoke_of<typename decay<_Fp>::type, typename decay<_Args>::type...>::type>
async(launch __policy, _Fp&& __f, _Args&&... __args)
{
    typedef __async_func<typename decay<_Fp>::type, typename decay<_Args>::type...> _BF;
    typedef typename _BF::_Rp _Rp;

#ifndef _LIBCPP_NO_EXCEPTIONS
    try
    {
#endif
        if (__does_policy_contain(__policy, launch::async))
        return _VSTD::__make_async_assoc_state<_Rp>(_BF(__decay_copy(_VSTD::forward<_Fp>(__f)),
                                                     __decay_copy(_VSTD::forward<_Args>(__args))...));
#ifndef _LIBCPP_NO_EXCEPTIONS
    }
    catch ( ... ) { if (__policy == launch::async) throw ; }
#endif

    if (__does_policy_contain(__policy, launch::deferred))
        return _VSTD::__make_deferred_assoc_state<_Rp>(_BF(__decay_copy(_VSTD::forward<_Fp>(__f)),
                                                        __decay_copy(_VSTD::forward<_Args>(__args))...));
    return future<_Rp>{};
}

MSVC

对于 MSVC 的默认选项,也是 launch::async | launch::deferred

1
2
3
4
5
6
_EXPORT_STD template <class _Fty, class... _ArgTypes>
_NODISCARD_ASYNC future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>> async(
    _Fty&& _Fnarg, _ArgTypes&&... _Args) {
    // manages a callable object launched with default policy
    return _STD async(launch::async | launch::deferred, _STD forward<_Fty>(_Fnarg), _STD forward<_ArgTypes>(_Args)...);
}

而选择的策略,即 launch::async

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(launch _Psync, _Fty&& _Fnarg) {
    // construct associated asynchronous state object for the launch type
    switch (_Psync) { // select launch type
    case launch::deferred:
        return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    case launch::async: // TRANSITION, fixed in vMajorNext, should create a new thread here
    default:
        return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    }
}

std::launch::async 深入分析

我们知道,std::launch::async 表明函数在新的独立线程上执行。但是,C++ 标准没有指定线程是新线程还是从线程池中重用的线程。

GCC

GCC 调用 __future_base::_S_make_async_state ,这会创建一个 _Async_state_impl 的实例。它的构造函数启动一个新的 std::thread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Shared state created by std::async().
// Starts a new thread that runs a function and makes the shared state ready.
template<typename _BoundFn, typename _Res>
  class __future_base::_Async_state_impl final
  : public __future_base::_Async_state_commonV2
  {
  public:
    explicit
    _Async_state_impl(_BoundFn&& __fn)
    : _M_result(new _Result<_Res>()), _M_fn(std::move(__fn))
    {
  _M_thread = std::thread{ [this] {
      __try
        {
   _M_set_result(_S_task_setter(_M_result, _M_fn));
        }
      __catch (const __cxxabiv1::__forced_unwind&)
        {
   // make the shared state ready on thread cancellation
   if (static_cast<bool>(_M_result))
     this->_M_break_promise(std::move(_M_result));
   __throw_exception_again;
        }
      } };
    }

LLVM

LLVM 调用 _VSTD::__make_async_assoc_state ,同样的,也是启动一个新的 std::thread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
template <class _Rp, class _Fp>
future<_Rp>
#ifndef _LIBCPP_HAS_NO_RVALUE_REFERENCES
__make_async_assoc_state(_Fp&& __f)
#else
__make_async_assoc_state(_Fp __f)
#endif
{
    unique_ptr<__async_assoc_state<_Rp, _Fp>, __release_shared_count>
        __h(new __async_assoc_state<_Rp, _Fp>(_VSTD::forward<_Fp>(__f)));
    _VSTD::thread(&__async_assoc_state<_Rp, _Fp>::__execute, __h.get()).detach();
    return future<_Rp>(__h.get());
}

MSVC

最有趣的地方来了!MSVC 创建了一个 _Task_async_state 的实例,它创建了一个并发任务并传递了一个可调用函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// CLASS TEMPLATE _Task_async_state
template <class _Rx>
class _Task_async_state : public _Packaged_state<_Rx()> {
    // class for managing associated synchronous state for asynchronous execution from async
public:
    using _Mybase     = _Packaged_state<_Rx()>;
    using _State_type = typename _Mybase::_State_type;

    template <class _Fty2>
    _Task_async_state(_Fty2&& _Fnarg) : _Mybase(_STD forward<_Fty2>(_Fnarg)) {
        _Task = ::Concurrency::create_task([this]() { // do it now
            this->_Call_immediate();
        });

        this->_Running = true;
    }

::Concurrency::create_task 是微软并行模式库的一部分。根据 MSDN 文档, task 类从 Windows ThreadPool 获取线程,而不是创建一个新线程。

所以这里需要注意一点,基于 ThreadPool 的实现,不能保证线程完成时会销毁 thread_local 变量。因为从线程池获取的线程不会销毁。所以,你会发现,使用了std::async后,线程并未销毁释放。这里相当于是从系统线程池借入了一个线程,会计算到用户线程数量中,而这个线程并未释放,导致的现象是使用std::async越多,线程越多。

std::async 执行的并发线程数限制为 Windows 线程池默认值,即 500 个线程。

std::async 返回的 std::future 深入分析

在 cppreference 中:

如果从 std::async 获得的 std::future 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的析构函数将阻塞到异步计算完成,实质上令如下代码同步:

1
2
std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, []{ g(); }); // f() 完成前不开始

注意:以调用 std::async 以外的方式获得的 std::future 的析构函数不会阻塞。

std::async 返回的 std::future 在调用其析构函数时的行为与从 std::promise 获得的 std::future 不同。当这些 std::future 销毁时,会调用 std::future 的析构函数,会执行 wait() 函数,使得创建时生成的线程汇入主线程。

这里以 MSVC 代码为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
~_Task_async_state() noexcept override {
    _Wait();
}

void _Wait() override { // wait for completion
    _Task.wait();
}

void WaitUntilStateChangedTo(_TaskCollectionState _State)
{
    ::std::unique_lock<::std::mutex> _Lock(_M_Cs);

    while(_M_State < _State)
    {
        _M_StateChanged.wait(_Lock);
    }
}

_Task_async_state 析构时,会调用 wait(),最终堆栈到 _M_StateChanged.wait(_Lock);,这是条件变量的wait()

不同平台实现方式不同,GCC 和 LLVM 中:

1
2
3
4
5
  ~_Async_state_impl()
  {
if (_M_thread.joinable())
  _M_thread.join();
  }

析构时,在等待线程join()执行结束。

总结

std::async 是 C++ 标准库中一个线程高级抽象工具,它简化了异步操作的实现,并使代码更加简洁。然而,由于不同编译器实现之间的差异,开发者在使用时需要谨慎考虑这些因素,以避免潜在的问题。特别需要注意 thread_local 和返回的std::future

Implementations of std::async and how they might Affect Applications | Dmitry Danilov

functions | Microsoft Learn

std::async - cppreference.com

《Asynchronous Programming with C++》

Licensed under CC BY-NC-SA 4.0