Да.
Проблемы
Решения
Проблемы
Решения
Проблемы
Решения
Writing concurrent programs has a reputation for being exotic and difficult. I believe it is neither. You need a system that provides you with good primitives and suitable libraries, you need basic caution and carefulness, you need an armory of useful techniques, and you need to know of the common pitfalls.
Andrew Birrell
Futures: универсальная абстракция
Что это такоe?
Futures представляют собой результат выполнения асинхронных операций, которые компонуются, чтобы реализовать асинхронное ожидание
Promise<int> p;
Future<int> f = p.MakeFuture();
// Thread #1:
p.Set(10);
// Thread #2:
f.Then([] (int x) { return x * 10; }).Get(); // == 100
Что же в STL?
template <typename T>
class Promise {
void Set(T value) {
std::lock_guard guard{shared_state->m};
shared_state->result = value;
shared_state->cv.notify_all();
}
std::shared_ptr<State<T>> shared_state;
};
template <typename T>
class Future {
T Get() const {
std::unique_lock lock{shared_state->m};
shared_state->cv.wait(lock, [this] {
return shared_state->result.has_value();
});
return *shared_state->result;
}
std::shared_ptr<State<T>> shared_state;
};
template <typename T>
struct State {
std::mutex m;
std::condition_variable cv;
std::optional<T> result;
};
Что же в STL?
void State<T>::SetResult(T value) {
result = std::move(value);
auto old_flag = flag.exchange(HasResult);
if (old_flag == HasCallback) {
callback(std::move(*result));
}
}
void State<T>::SetCallback(Callback functor) {
callback = std::move(functor);
auto old_flag = _flag.exchange(HasCallback);
if (old_flag == HasResult) {
callback(std::move(*result));
}
}
template <typename T>
struct State {
enum Flag {
Empty,
HasResult,
HasCallback,
};
std::atomic<Flag> flag;
Callback callback;
Result<T> result;
};
Как сделать лучше?
mutex ⇒ wait-free
Как сделать Get с помощью Then?
template <typename T>
struct Future {
std::shared_ptr<State<T>> shared_state;
T Get() && {
Result<T> result;
Event event;
shared_state->SetCallback([&] (T value) {
result = std::move(value);
event.Notify();
});
event.Wait();
return *result;
}
};
struct Event {
std::mutex m;
std::condition_variable cv;
bool ready{false};
};
void Event::Notify() {
std::lock_guard guard{m};
ready = true;
cv.notify_all();
}
void Event::Wait() {
std::unique_lock lock{m};
while (!ready) { cv.wait(lock); }
}
Комбинаторы
WhenAll — возвращает Future, которая будет заполнена всеми результатами переданных Future, после их заполнения
Future<int> f1 = /* get one key from database */;
Future<double> f2 = /* get other key from database */;
Future<std::tuple<int, double>> accumulate = WhenAll(f1, f2);
Future<void> result = accumulate.Then([] (int x, double y) {
/* do something with x and y */
});
Комбинаторы
Комбинаторы
WhenAny — возвращает Future, которая будет заполнена результатом первой заполненной из переданных Future
Future<Response> f1 = /* request to first database shard */;
Future<Response> f2 = /* request to second database shard */;
Future<Response> first_of = WhenAny(f1, f2);
Future<void> result = first_of.Then([] (Response response) {
return /* make response for user */
});
Комбинаторы
Общий паттерн написания комбинаторов:
template <typename T>
Future<Buffer<T>> WhenSome(Container<Future<T>> futures) {
auto combinator_ptr = /* make combinator from futures */;
for (auto& future : futures) {
future.Then([combintator_ptr] (T value) {
combintator_ptr->Add(std::move(value));
});
}
return combinator_ptr->promise.MakeFuture();
}
template <typename T>
struct Combinator {
Buffer<T> buffer;
Promise<Buffer<T>> promise;
};
void Combinator<T>::Add(T value) {
buffer.add(std::move(value));
if (buffer.size() == n/2 + 1) {
promise.Set(buffer);
}
}
Unwrapping
Иногда необходимо вернуть из одной асинхронной функции результат другой асинхронной функции
Мы хотим не блокировать executor во время ожидания выполнения внутренней функции
auto tp_output = yaclib::MakeThreadPool(/*threads=*/1);
auto tp_compute = yaclib::MakeThreadPool(/*threads=CPU cores*/);
auto future = yaclib::Run(tp_output, [] {
std::cout << "Outer IO task" << std::endl;
return yaclib::Run(tp_compute, [] {
return 42; // Inner CPU task
});
});
// type of `future`: Future<Future<int>> vs Future<int>?
// type of `future` is Future<int>
future.Then(/*tp_compute*/ [](int result) {
return result * 13;
});
// type of `future` is Future<Future<int>>
future.Then(/*tp_compute*/ [](Future<int> other) {
auto result = other.Get(); // blocking!
return result * 13;
});
Unwrapping
template <typename U>
Future<U> Future<T>::Then(Functor<Future<U>(T)> func) {
auto [future, promise] = MakeContract<U>()
shared_state->SetCallback([promise, func] (T val) {
Future<U> future = func(val);
future.Then([promise] (U result) {
promise.Set(result);
});
});
return future;
}
Unwrapping
Некоторые относятся к Future с предубеждением, считают, что Future — не zero-cost абстракция
Посмотрим как их можно оптимизировать, чтобы они стали практически бесплатными
Одна аллокация вместо трех
Then(executor, functor) — делает до трех аллокаций:
Мы хотим научить functor сетить свой результат в Promise без дополнительных аллокаций
Strand — это...
Strand — это экзекутор, который позволяет сериализовать исполнение коллбеков поверх другого экзекутора, не блокируя его исполнение
Благодаря интрузивности задач можно написать его очень эффективную реализацию
Strand — это...
auto tp = yaclib::MakeThreadPool(/*threads=CPU Cores*/);
auto strand = yaclib::MakeStrand(tp);
LRUCache<K, Future<V>> global_cache;
for (auto user : users) {
auto future = yaclib::Run(tp, [&] -> K {
return GetRequest(user);
});
future = future.Then(strand, [&] (K key) {
if (global_cache.has(key)) {
return global_cache[key];
}
return global_cache[key] = yaclib::Run(tp, [] (K key) {
return ComputeValue(key);
});
});
future.Then(tp, [&] (V val) { MakeResponse(user, val); });
}
Strand — идея реализации
// MPSC lock-free queue
template <typename T>
struct StrandTaskQueue {
struct TaskNode {
T data;
TaskNode* next;
};
std::atomic<TaskNode*> head{};
};
void StrandTaskQueue<T>::Push(TaskNode* node) {
node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(
node->next,
node,
std::memory_order_release,
std::memory_order_relaxed)) {
}
}
TaskNode* StrandTaskQueue<T>::TakeAll() {
return Reverse(
head.exchange(nullptr,
std::memory_order_acquire)
);
}
Не рассмотренные
Алгоритм | Рассмотрено | YACLib |
---|
Wait N Futures |
N аллокаций N ожиданий |
0 аллокаций 1 ожидание |
Комбинатор N Futures |
2 + N аллокаций |
2 аллокации |
N продолжений (...Then().Then()...) |
N аллокаций N синхронизаций |
0-1 аллокация 0-1 синхронизация |
— Написать библиотеку Future легко?
— К сожалению, нет:
Контакты
Ссылки
Во всем докладе речь пойдет исключительно о языке C++
Несмотря на это часть утверждений может быть верна и для других языков
На слайдах представлен псевдокод
Реальный код можно посмотреть в библиотеке YACLib
Зависят от того что мы хотим писать
Библиотека
Типичные особенности
Библиотека
Сомнительные практики
Библиотека
Хорошие практики
Библиотека
Итог
Переложить паралеллизацию кода на пользователя
Предложить пользователю интерфейсы и дефолтную реализацию
Скрывать от пользователя то, что библиотека порождает конкуренцию
Futures — это отличный компромисс!
Сервер
Типичные особенности
Сервер
Итог
Из-за малого количества ограничений нам подходит множество абстракций
Каждая из них хороша по своему, поэтому выбор подхода зависит от проекта
База данных
Типичные особенности
Клиентские приложения
Типичные особенности
Базы данных & клиентские приложения
Итог
Wait
Комбинаторы
Комбинатор может является shared функтором для всех вызовов Then, так как эти вызовы последние и могут быть выполнены сразу (dispatch), без перепланирования