最終更新日時(UTC):
が更新

履歴 編集

customization point object
<execution>

std::execution::when_all(C++26)

namespace std::execution {
  struct when_all_t { unspecified };
  inline constexpr when_all_t when_all{};
}

概要

when_allは、複数の入力Senderが全て完了するまで待機するSenderアダプタである。

when_allは全ての入力Senderが値完了シグネチャを1個だけ持つことを要求する。 値完了シグネチャが複数存在する場合はwhen_all_with_variantアルゴリズムを利用する。

  • 入力Sender全てが値完了のとき、全ての値完了結果をtupleに結合して値完了操作を行う。
  • いずれかがエラー完了のとき、同エラー値をもってエラー完了操作を行う。このとき停止要求が作成される。
  • いずれかが停止完了のとき、停止完了操作を行う。このとき停止要求が作成される。

効果

説明用のパックsndrsに対してパックSndrsdecltype((sndrs))...としたとき、型CDcommon_type_t<decltype(get-domain-early(sndrs))...>とする。

下記いずれかがtrueとなるとき、呼び出し式when_all(sndrs...)不適格となる。

  • sizeof...(sndrs) == 0、または
  • (sender<Sndrs> && ...) == false、または
  • CD不適格

そうでなければ、呼び出し式when_all(sndrs...)は下記と等価。

transform_sender(CD(), make-sender(when_all, {}, sndrs...))

Senderアルゴリズムタグ when_all

Senderアルゴリズム動作説明用のクラステンプレートimpls-forに対して、下記の特殊化が定義される。

namespace std::execution {
  template<>
  struct impls-for<when_all_t> : default-impls {
    static constexpr auto get-attrs = see below;
    static constexpr auto get-env = see below;
    static constexpr auto get-state = see below;
    static constexpr auto start = see below;
    static constexpr auto complete = see below;
  };
}

impls-for<when_all_t>::get-attrsメンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。

[](auto&&, auto&&... child) noexcept {
  if constexpr (same_as<CD, default_domain>) {
    return env<>();
  } else {
    return MAKE-ENV(get_domain, CD());
  }
}

impls-for<when_all_t>::get-envメンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。

[]<class State, class Rcvr>(auto&&, State& state, const Receiver& rcvr) noexcept {
  return see below;
}

ラムダ式は下記を満たすオブジェクトeを返す。

impls-for<when_all_t>::get-stateメンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。

[]<class Sndr, class Rcvr>(Sndr&& sndr, Rcvr& rcvr) noexcept(e) -> decltype(e) {
  return e;
}

ラムダ式が返す式eは下記の通り。

std::forward<Sndr>(sndr).apply(make-state<Rcvr>())

impls-for<when_all_t>::startメンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。

[]<class State, class Rcvr, class... Ops>(
    State& state, Rcvr& rcvr, Ops&... ops) noexcept -> void {
  state.on_stop.emplace(
    get_stop_token(get_env(rcvr)),
    on-stop-request{state.stop_src});
  if (state.stop_src.stop_requested()) {
    state.on_stop.reset();
    set_stopped(std::move(rcvr));
  } else {
    (start(ops), ...);
  }
}

impls-for<when_all_t>::completeメンバは、下記ラムダ式と等価な関数呼び出し可能なオブジェクトで初期化される。

[]<class Index, class State, class Rcvr, class Set, class... Args>(
    this auto& complete, Index, State& state, Rcvr& rcvr, Set, Args&&... args) noexcept -> void {
  if constexpr (same_as<Set, set_error_t>) {
    if (disposition::error != state.disp.exchange(disposition::error)) {
      state.stop_src.request_stop();
      TRY-EMPLACE-ERROR(state.errors, std::forward<Args>(args)...);
    }
  } else if constexpr (same_as<Set, set_stopped_t>) {
    auto expected = disposition::started;
    if (state.disp.compare_exchange_strong(expected, disposition::stopped)) {
      state.stop_src.request_stop();
    }
  } else if constexpr (!same_as<decltype(State::values), tuple<>>) {
    if (state.disp == disposition::started) {
      auto& opt = get<Index::value>(state.values);
      TRY-EMPLACE-VALUE(complete, opt, std::forward<Args>(args)...);
    }
  }
  state.arrive(rcvr);
}

説明用の式v, eに対して、式decltype(auto(e))(e)が潜在的に例外送出するならば、TRY-EMPLACE-ERROR(v, e)を下記と等価な式とする。 そうでなければ、v.template emplace<decltype(auto(e))>(e)とする。

try {
  v.template emplace<decltype(auto(e))>(e);
} catch (...) {
  v.template emplace<exception_ptr>(current_exception());
}

説明用の式c, oおよびパックasに対して、式decayed-tuple<decltype(as)...>{as...}が潜在的に例外送出するならば、TRY-EMPLACE-VALUE(c, o, as...)を下記と等価な式とする。 そうでなければ、o.emplace(as...)とする。

try {
  o.emplace(as...);
} catch (...) {
  c(Index(), state, rcvr, set_error, current_exception());
  return;
}

説明専用エンティティ

コンセプトmax-1-sender-in

template<class Sndr, class Env>
concept max-1-sender-in = sender_in<Sndr, Env> &&  // exposition only
  (tuple_size_v<value_types_of_t<Sndr, Env, tuple, tuple>> <= 1);

列挙型disposition

enum class disposition { started, error, stopped };  // exposition only

クラステンプレートmake-state

template<class Rcvr>
struct make-state {
  template<max-1-sender-in<env_of_t<Rcvr>>... Sndrs>
  auto operator()(auto, auto, Sndrs&&... sndrs) const {
    using values_tuple = see below;
    using errors_variant = see below;
    using stop_callback = stop_callback_for_t<stop_token_of_t<env_of_t<Rcvr>>, on-stop-request>;

    struct state-type {
      void arrive(Rcvr& rcvr) noexcept {               // exposition only
        if (0 == --count) {
          complete(rcvr);
        }
      }

      void complete(Rcvr& rcvr) noexcept;              // exposition only

      atomic<size_t> count{sizeof...(sndrs)};          // exposition only
      inplace_stop_source stop_src{};                  // exposition only
      atomic<disposition> disp{disposition::started};  // exposition only
      errors_variant errors{};                         // exposition only
      values_tuple values{};                           // exposition only
      optional<stop_callback> on_stop{nullopt};        // exposition only
    };

    return state-type{};
  }
};

説明用の型copy-failを、いずれかの子Senderの値結果データのdecayコピーが潜在的に例外送出するならばexception_ptrとする。そうでなければ、未規定の空のクラス型none-suchとする。

values_tupleは、適格であるならば下記の型とする。そうでなければ、tuple<>とする。

説明用のパックEsを全ての子Senderのエラー結果データのdecayed型としたとき、型errors_variantは下記定義において重複削除した型となる。

variant<none-such, copy-fail, Es...>

メンバ関数void state-type::complete(Rcvr& rcvr) noexceptの動作は下記の通り。

  • disp == disposition::startedのとき、下記を評価する。

    auto tie = []<class... T>(tuple<T...>& t) noexcept { return tuple<T&...>(t); };
    auto set = [&](auto&... t) noexcept { set_value(std::move(rcvr), std::move(t)...); };
    
    on_stop.reset();
    apply(
      [&](auto&... opts) noexcept {
        apply(set, tuple_cat(tie(*opts)...));
      },
      values);
    

  • そうではなく、disp == disposition::errorのとき、下記を評価する。

    on_stop.reset();
    visit(
      [&]<class Error>(Error& error) noexcept {
        if constexpr (!same_as<Error, none-such>) {
          set_error(std::move(rcvr), std::move(error));
        }
      },
      errors);
    

  • それ以外のとき、下記を評価する。

    on_stop.reset();
    set_stopped(std::move(rcvr));
    

カスタマイゼーションポイント

Senderアルゴリズム構築時およびReceiver接続時に、関連付けられた実行ドメインに対してexecution::transform_sender経由でSender変換が行われる。 デフォルト実行ドメインでは無変換。

例1: 基本の使い方

#include <print>
#include <string>
#include <execution>
namespace ex = std::execution;
using namespace std::string_literals;

int main()
{
  // string型の値を送信するSender
  ex::sender auto snd1 = ex::just("C++"s);
  // (int,char)型の値を送信するSender
  ex::sender auto snd2 = ex::just(123, 'X');
  // snd1,snd2両方の完了を待機するSender
  ex::sender auto sndr = ex::when_all(snd1, snd2);

 auto result = std::this_thread::sync_wait(sndr);
  // result := optional<tuple<string,int,char>>型
  std::println("result={}", result.value());
}

出力

result=("C++", 123, 'X')

例2: 停止要求のハンドリング

#include <print>
#include <string>
#include <execution>
namespace ex = std::execution;


// MySenderは下記いずれかの完了操作を行う
//   値完了     set_value(string)
//   エラー完了 set_error(int)
//   停止完了   set_stopped()
struct MySender {
  using sender_concept = ex::sender_t;
  using completion_signatures = ex::completion_signatures<
    ex::set_value_t(std::string),
    ex::set_error_t(int),
    ex::set_stopped_t()
  >;

  template <typename Rcvr>
  struct state {
    using operation_state_concept = ex::operation_state_t;

    state(Rcvr rcvr, int val)
      : rcvr_{std::move(rcvr)}, val_{val} {}

    void start() noexcept {
      auto stok = ex::get_stop_token(ex::get_env(rcvr_));
      if (stok.stop_requested()) {
        // 接続先Receiverにおいて停止要求が行われていれば
        // 非同期操作も停止完了により早期リターンさせる
        std::println("{}: set_stopped", val_);
        ex::set_stopped(std::move(rcvr_));
        return;
      }
      // MySenderの本体処理
      if (0 <= val_) {
        // 成功: 値完了操作
        using namespace std::string_literals;
        std::println("{}: set_value", val_);
        ex::set_value(std::move(rcvr_), "Hello"s);
      } else {
        // 失敗: エラー完了操作
        std::println("{}: set_error", val_);
        ex::set_error(std::move(rcvr_), val_);
      }
    }

    Rcvr rcvr_;
    int val_;
  };

  template <typename Rcvr>
  auto connect(Rcvr rcvr) noexcept {
    return state{std::move(rcvr), val_};
  }

  int val_;
};

int main()
{
  ex::sender auto snd1 = MySender{1};  // 値完了
  ex::sender auto snd2 = MySender{-2}; // エラー完了 → 停止要求
  ex::sender auto snd3 = MySender{3};  // 停止完了
  ex::sender auto sndr = ex::when_all(snd1, snd2, snd3);
  try {
    auto result = std::this_thread::sync_wait(sndr);
    // result := optional<tuple<string,string,string>>型
    std::println("value={}", *result);
  } catch (int err) {
    std::println("error={}", err);
  }
}

出力

1: set_value
-2: set_error
3: set_stopped
error=-2

バージョン

言語

  • C++26

処理系

関連項目

参照