namespace std::execution {
struct when_all_t { unspecified };
inline constexpr when_all_t when_all{};
}
概要
when_all
は、複数の入力Senderが全て完了するまで待機するSenderアダプタである。
when_all
は全ての入力Senderが値完了シグネチャを1個だけ持つことを要求する。
値完了シグネチャが複数存在する場合はwhen_all_with_variant
アルゴリズムを利用する。
- 入力Sender全てが値完了のとき、全ての値完了結果を
tuple
に結合して値完了操作を行う。 - いずれかがエラー完了のとき、同エラー値をもってエラー完了操作を行う。このとき停止要求が作成される。
- いずれかが停止完了のとき、停止完了操作を行う。このとき停止要求が作成される。
効果
説明用のパックsndrs
に対してパックSndrs
をdecltype((sndrs))...
としたとき、型CD
をcommon_type_t<decltype(get-domain-early(sndrs))...>
とする。
下記いずれかがtrue
となるとき、呼び出し式when_all(sndrs...)
は不適格となる。
そうでなければ、呼び出し式when_all(sndrs...)
は下記と等価。
transform_sender(CD(), make-sender(when_all, {}, sndrs...))
Senderアルゴリズムタグ when_all
Senderアルゴリズム動作説明用のクラステンプレートimpls-for
に対して、下記の特殊化が定義される。
namespace std::execution {
template<>
struct impls-for<when_all_t> : default-impls {
static constexpr auto get-attrs = see below;
static constexpr auto get-env = see below;
static constexpr auto get-state = see below;
static constexpr auto start = see below;
static constexpr auto complete = see below;
};
}
impls-for<when_all_t>::get-attrs
メンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。
[](auto&&, auto&&... child) noexcept {
if constexpr (same_as<CD, default_domain>) {
return env<>();
} else {
return MAKE-ENV(get_domain, CD());
}
}
impls-for<when_all_t>::get-env
メンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。
[]<class State, class Rcvr>(auto&&, State& state, const Receiver& rcvr) noexcept {
return see below;
}
ラムダ式は下記を満たすオブジェクトe
を返す。
decltype(e)
がqueryable
のモデル、かつ- 式
e.query(get_stop_token)
がstate.stop-src.get_token()
と等価、かつ get_stop_token
以外のクエリオブジェクトq
に対して、式e.query(q)
はget_env(rcvr).query(q)
と等価。
impls-for<when_all_t>::get-state
メンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。
[]<class Sndr, class Rcvr>(Sndr&& sndr, Rcvr& rcvr) noexcept(e) -> decltype(e) {
return e;
}
ラムダ式が返す式e
は下記の通り。
std::forward<Sndr>(sndr).apply(make-state<Rcvr>())
impls-for<when_all_t>::start
メンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。
[]<class State, class Rcvr, class... Ops>(
State& state, Rcvr& rcvr, Ops&... ops) noexcept -> void {
state.on_stop.emplace(
get_stop_token(get_env(rcvr)),
on-stop-request{state.stop_src});
if (state.stop_src.stop_requested()) {
state.on_stop.reset();
set_stopped(std::move(rcvr));
} else {
(start(ops), ...);
}
}
impls-for<when_all_t>::complete
メンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。
[]<class Index, class State, class Rcvr, class Set, class... Args>(
this auto& complete, Index, State& state, Rcvr& rcvr, Set, Args&&... args) noexcept -> void {
if constexpr (same_as<Set, set_error_t>) {
if (disposition::error != state.disp.exchange(disposition::error)) {
state.stop_src.request_stop();
TRY-EMPLACE-ERROR(state.errors, std::forward<Args>(args)...);
}
} else if constexpr (same_as<Set, set_stopped_t>) {
auto expected = disposition::started;
if (state.disp.compare_exchange_strong(expected, disposition::stopped)) {
state.stop_src.request_stop();
}
} else if constexpr (!same_as<decltype(State::values), tuple<>>) {
if (state.disp == disposition::started) {
auto& opt = get<Index::value>(state.values);
TRY-EMPLACE-VALUE(complete, opt, std::forward<Args>(args)...);
}
}
state.arrive(rcvr);
}
説明用の式v
, e
に対して、式decltype(auto(e))(e)
が潜在的に例外送出するならば、TRY-EMPLACE-ERROR(v, e)
を下記と等価な式とする。
そうでなければ、v.template emplace<decltype(auto(e))>(e)
とする。
try {
v.template emplace<decltype(auto(e))>(e);
} catch (...) {
v.template emplace<exception_ptr>(current_exception());
}
説明用の式c
, o
およびパックas
に対して、式decayed-tuple<decltype(as)...>{as...}
が潜在的に例外送出するならば、TRY-EMPLACE-VALUE(c, o, as...)
を下記と等価な式とする。
そうでなければ、o.emplace(as...)
とする。
try {
o.emplace(as...);
} catch (...) {
c(Index(), state, rcvr, set_error, current_exception());
return;
}
説明専用エンティティ
コンセプトmax-1-sender-in
template<class Sndr, class Env>
concept max-1-sender-in = sender_in<Sndr, Env> && // exposition only
(tuple_size_v<value_types_of_t<Sndr, Env, tuple, tuple>> <= 1);
列挙型disposition
enum class disposition { started, error, stopped }; // exposition only
クラステンプレートmake-state
template<class Rcvr>
struct make-state {
template<max-1-sender-in<env_of_t<Rcvr>>... Sndrs>
auto operator()(auto, auto, Sndrs&&... sndrs) const {
using values_tuple = see below;
using errors_variant = see below;
using stop_callback = stop_callback_for_t<stop_token_of_t<env_of_t<Rcvr>>, on-stop-request>;
struct state-type {
void arrive(Rcvr& rcvr) noexcept { // exposition only
if (0 == --count) {
complete(rcvr);
}
}
void complete(Rcvr& rcvr) noexcept; // exposition only
atomic<size_t> count{sizeof...(sndrs)}; // exposition only
inplace_stop_source stop_src{}; // exposition only
atomic<disposition> disp{disposition::started}; // exposition only
errors_variant errors{}; // exposition only
values_tuple values{}; // exposition only
optional<stop_callback> on_stop{nullopt}; // exposition only
};
return state-type{};
}
};
説明用の型copy-fail
を、いずれかの子Senderの値結果データのdecayコピーが潜在的に例外送出するならばexception_ptr
とする。そうでなければ、未規定の空のクラス型none-such
とする。
型values_tuple
は、適格であるならば下記の型とする。そうでなければ、tuple<>
とする。
tuple<value_types_of_t<Sndrs, env_of_t<Rcvr>, decayed-tuple, optional>...>
説明用のパックEs
を全ての子Senderのエラー結果データのdecayed型としたとき、型errors_variant
は下記定義において重複削除した型となる。
variant<none-such, copy-fail, Es...>
メンバ関数void state-type::complete(Rcvr& rcvr) noexcept
の動作は下記の通り。
-
disp == disposition::started
のとき、下記を評価する。 -
そうではなく、
disp == disposition::error
のとき、下記を評価する。 -
それ以外のとき、下記を評価する。
on_stop.reset(); set_stopped(std::move(rcvr));
カスタマイゼーションポイント
Senderアルゴリズム構築時およびReceiver接続時に、関連付けられた実行ドメインに対してexecution::transform_sender
経由でSender変換が行われる。
デフォルト実行ドメインでは無変換。
例
例1: 基本の使い方
#include <print>
#include <string>
#include <execution>
namespace ex = std::execution;
using namespace std::string_literals;
int main()
{
// string型の値を送信するSender
ex::sender auto snd1 = ex::just("C++"s);
// (int,char)型の値を送信するSender
ex::sender auto snd2 = ex::just(123, 'X');
// snd1,snd2両方の完了を待機するSender
ex::sender auto sndr = ex::when_all(snd1, snd2);
auto result = std::this_thread::sync_wait(sndr);
// result := optional<tuple<string,int,char>>型
std::println("result={}", result.value());
}
出力
result=("C++", 123, 'X')
例2: 停止要求のハンドリング
#include <print>
#include <string>
#include <execution>
namespace ex = std::execution;
// MySenderは下記いずれかの完了操作を行う
// 値完了 set_value(string)
// エラー完了 set_error(int)
// 停止完了 set_stopped()
struct MySender {
using sender_concept = ex::sender_t;
using completion_signatures = ex::completion_signatures<
ex::set_value_t(std::string),
ex::set_error_t(int),
ex::set_stopped_t()
>;
template <typename Rcvr>
struct state {
using operation_state_concept = ex::operation_state_t;
state(Rcvr rcvr, int val)
: rcvr_{std::move(rcvr)}, val_{val} {}
void start() noexcept {
auto stok = ex::get_stop_token(ex::get_env(rcvr_));
if (stok.stop_requested()) {
// 接続先Receiverにおいて停止要求が行われていれば
// 非同期操作も停止完了により早期リターンさせる
std::println("{}: set_stopped", val_);
ex::set_stopped(std::move(rcvr_));
return;
}
// MySenderの本体処理
if (0 <= val_) {
// 成功: 値完了操作
using namespace std::string_literals;
std::println("{}: set_value", val_);
ex::set_value(std::move(rcvr_), "Hello"s);
} else {
// 失敗: エラー完了操作
std::println("{}: set_error", val_);
ex::set_error(std::move(rcvr_), val_);
}
}
Rcvr rcvr_;
int val_;
};
template <typename Rcvr>
auto connect(Rcvr rcvr) noexcept {
return state{std::move(rcvr), val_};
}
int val_;
};
int main()
{
ex::sender auto snd1 = MySender{1}; // 値完了
ex::sender auto snd2 = MySender{-2}; // エラー完了 → 停止要求
ex::sender auto snd3 = MySender{3}; // 停止完了
ex::sender auto sndr = ex::when_all(snd1, snd2, snd3);
try {
auto result = std::this_thread::sync_wait(sndr);
// result := optional<tuple<string,string,string>>型
std::println("value={}", *result);
} catch (int err) {
std::println("error={}", err);
}
}
出力
1: set_value
-2: set_error
3: set_stopped
error=-2
バージョン
言語
- C++26
処理系
- Clang: ??
- GCC: ??
- ICC: ??
- Visual C++: ??