Погружение в Futures

Валерий Миронов

Кто я такой?

  • Software Engineer в ArangoDB
  • ex Software Engineer в
    • VK Team (ex Mail.ru Group)
    • Wärtsilä (ex Transas)
  • Tech Lead в open-source проекте YACLib
  • Член open-source сообщества

Писать конкуретный код сложно?

  • Корректность
  • Производительность
  • Алгоритмы
  • Абстракции

Да.

Корректность

Проблемы

  • Data race
  • Race condition
    • Недостаточно строгие memory_order
    • Проблема ABA
    • ...
  • Priority inversion
  • Deadlock и Livelock
  • Время жизни объектов

Корректность

Решения

  • Тесты
  • Динамические проверки
    • Google Sanitizers, Valgrind
  • Статические проверки
    • clang-tidy, cppcheck
  • Fault Injection
  • Fuzzing
  • Формальная верификация

Производительность

Проблемы

  • Конкуренция за hardware ресурсы
    • NUMA nodes, CPU cores
    • Caches, Pages
    • ...
  • Конкуренция за software ресурсы
    • ​Планировщик OC, квант времени
    • Аллокатор
    • Примитивы синхронизации
    • ...

Производительность

Решения

  • Не шарить данные и ресурсы
  • Переиспользование системных ресурсов
    • Thread pool, thread local arena
  • Корутины / user-level потоки
    • Неблокирующее ожидание
  • Асинхронность
  • Кооперативность
  • Цикл: померял, оптимизировал

Алгоритмы

Проблемы

  • Работа плохо разделяется
  • Данные нужно шарить
  • Алгоритм плохо работает на железе
  • Неизвестная или изменчивая нагрузка
    • Readers vs Writers
    • Producers vs Consumers

Алгоритмы

Решения

  • Другое представление данных
  • Новый алгоритм
  • Иная точка параллелизации
  • Смириться и использовать последовательный код (можно заняться его оптимизацией)

Абстракции

Writing concurrent programs has a reputation for being exotic and difficult. I believe it is neither. You need a system that provides you with good primitives and suitable libraries, you need basic caution and carefulness, you need an armory of useful techniques, and you need to know of the common pitfalls.

Andrew Birrell

Абстракции

Futures: универсальная абстракция

  • Многие крупные библиотеки предоставляют Futures: Folly, Boost, HPX
  • Библиотеки для RPC часто используют Futures (например, Cap'n Proto)
  • ScyllaDB и Ceph используют фреймворк Seastar, основанный на Futures
  • Многие базы данных пользуются Futures: MongoDB, ArangoDB
  • Chromium как и многие другие клиентские приложения использует аналог Future

Future

Что это такоe?

Futures представляют собой результат выполнения асинхронных операций, которые компонуются, чтобы реализовать асинхронное ожидание

Promise<int> p;
Future<int> f = p.MakeFuture();
// Thread #1:
p.Set(10);
// Thread #2:
f.Then([] (int x) { return x * 10; }).Get(); // == 100

Future

#include <future>

  • Отсутствует Then
  • Отсутствует возможность комбинации
  • Неэффективная реализация блокирующая потоки

Что же в STL?

template <typename T>
class Promise {
  void Set(T value) {
    std::lock_guard guard{shared_state->m};
    shared_state->result = value;
    shared_state->cv.notify_all();
  }

  std::shared_ptr<State<T>> shared_state;
};
template <typename T>
class Future {
  T Get() const {
    std::unique_lock lock{shared_state->m};
    shared_state->cv.wait(lock, [this] {
      return shared_state->result.has_value();
    });
    return *shared_state->result;
  }

  std::shared_ptr<State<T>> shared_state;
};
template <typename T>
struct State {
  std::mutex m;
  std::condition_variable cv;
  std::optional<T> result;
};

Future

Что же в STL?

void State<T>::SetResult(T value) {
  result = std::move(value);
  auto old_flag = flag.exchange(HasResult);
  if (old_flag == HasCallback) {
    callback(std::move(*result));
  }
}
void State<T>::SetCallback(Callback functor) {
  callback = std::move(functor);
  auto old_flag = _flag.exchange(HasCallback);
  if (old_flag == HasResult) {
    callback(std::move(*result));
  }
}
template <typename T>
struct State {
  enum Flag {
    Empty,
    HasResult,
    HasCallback,
  };

  std::atomic<Flag> flag;
  Callback callback;
  Result<T> result;
};

Future

Как сделать лучше?

mutex ⇒ wait-free

Future

Как сделать Get с помощью Then?

template <typename T>
struct Future {
  std::shared_ptr<State<T>> shared_state;

  T Get() && {
    Result<T> result;
    Event event;
    shared_state->SetCallback([&] (T value) {
      result = std::move(value);
      event.Notify();
    });
    event.Wait();
    return *result;
  }
};
struct Event {
  std::mutex m;
  std::condition_variable cv;
  bool ready{false};
};
void Event::Notify() {
  std::lock_guard guard{m};
  ready = true;
  cv.notify_all();
}
void Event::Wait() {
  std::unique_lock lock{m};
  while (!ready) { cv.wait(lock); }
}

Future

Future

Комбинаторы

WhenAll — возвращает Future, которая будет заполнена всеми результатами переданных Future, после их заполнения

Future<int> f1 = /* get one key from database */;
Future<double> f2 =  /* get other key from database */;

Future<std::tuple<int, double>> accumulate = WhenAll(f1, f2);
Future<void> result = accumulate.Then([] (int x, double y) {
  /* do something with x and y */
});

Future

Комбинаторы

Future

Комбинаторы

WhenAny — возвращает Future, которая будет заполнена результатом первой заполненной из переданных Future

Future<Response> f1 = /* request to first database shard */;
Future<Response> f2 = /* request to second database shard */;

Future<Response> first_of = WhenAny(f1, f2);
Future<void> result = first_of.Then([] (Response response) {
  return /* make response for user */
});

Future

Комбинаторы

Общий паттерн написания комбинаторов:

template <typename T>
Future<Buffer<T>> WhenSome(Container<Future<T>> futures) {
  auto combinator_ptr = /* make combinator from futures */;
  for (auto& future : futures) {
    future.Then([combintator_ptr] (T value) {
      combintator_ptr->Add(std::move(value));
    });
  }
  return combinator_ptr->promise.MakeFuture();
}
template <typename T>
struct Combinator {
  Buffer<T> buffer; 
  Promise<Buffer<T>> promise;
};
void Combinator<T>::Add(T value) {
  buffer.add(std::move(value));
  if (buffer.size() == n/2 + 1) {
    promise.Set(buffer);
  }
}

Future

Unwrapping

Иногда необходимо вернуть из одной асинхронной функции результат другой асинхронной функции

 

Мы хотим не блокировать executor во время ожидания выполнения внутренней функции

auto tp_output = yaclib::MakeThreadPool(/*threads=*/1);
auto tp_compute = yaclib::MakeThreadPool(/*threads=CPU cores*/);
auto future = yaclib::Run(tp_output, [] {
  std::cout << "Outer IO task" << std::endl;
  return yaclib::Run(tp_compute, [] {
    return 42; // Inner CPU task
  });
});

// type of `future`: Future<Future<int>> vs Future<int>?
// type of `future` is Future<int>
future.Then(/*tp_compute*/ [](int result) {
  return result * 13;
});
// type of `future` is Future<Future<int>>
future.Then(/*tp_compute*/ [](Future<int> other) {
  auto result = other.Get(); // blocking!
  return result * 13;
});

Future

Unwrapping

template <typename U>
Future<U> Future<T>::Then(Functor<Future<U>(T)> func) {
  auto [future, promise] = MakeContract<U>()
  shared_state->SetCallback([promise, func] (T val) {
    Future<U> future = func(val);
    future.Then([promise] (U result) {
      promise.Set(result);    
    });
  });
  return future;
}

Future

Unwrapping

Оптимизации Future

Некоторые относятся к Future с предубеждением, считают, что Future — не zero-cost абстракция


Посмотрим как их можно оптимизировать, чтобы они стали практически бесплатными

Оптимизации Future

Одна аллокация вместо трех

Then(executor, functor) делает до трех аллокаций:

  • Аллокация на shared state Future-Promise
  • Аллокация на type erasure для executor
  • Аллокация на очередь задач в executor

Мы хотим научить functor сетить свой результат в Promise без дополнительных аллокаций

Оптимизации Future

Strand — это...

Strand — это экзекутор, который позволяет сериализовать исполнение коллбеков поверх другого экзекутора, не блокируя его исполнение

Благодаря интрузивности задач можно написать его очень эффективную реализацию

Оптимизации Future

Strand — это...

auto tp = yaclib::MakeThreadPool(/*threads=CPU Cores*/);
auto strand = yaclib::MakeStrand(tp);
LRUCache<K, Future<V>> global_cache;

for (auto user : users) {
  auto future = yaclib::Run(tp, [&] -> K {
    return GetRequest(user);
  });
  future = future.Then(strand, [&] (K key) {
    if (global_cache.has(key)) {
      return global_cache[key];
    }
    return global_cache[key] = yaclib::Run(tp, [] (K key) {
      return ComputeValue(key);
    });
  });
  future.Then(tp, [&] (V val) { MakeResponse(user, val); });
}

Оптимизации Future

Strand — идея реализации

// MPSC lock-free queue
template <typename T>
struct StrandTaskQueue {
  struct TaskNode {
    T data;
    TaskNode* next;
  };
  std::atomic<TaskNode*> head{};
};
void StrandTaskQueue<T>::Push(TaskNode* node) {
  node->next = head.load(std::memory_order_relaxed);
  while (!head.compare_exchange_weak(
                  node->next,
                  node,
                  std::memory_order_release,
                  std::memory_order_relaxed)) {
  }
}
TaskNode* StrandTaskQueue<T>::TakeAll() {
  return Reverse(
           head.exchange(nullptr,
                         std::memory_order_acquire)
         );
}

Оптимизации Future

Не рассмотренные

Алгоритм Рассмотрено YACLib
Wait N Futures
 
N аллокаций
N ожиданий
0 аллокаций
1 ожидание
Комбинатор N Futures
2 + N аллокаций
 
2 аллокации
 
N продолжений (...Then().Then()...)
N аллокаций
N синхронизаций
0-1 аллокация
0-1 синхронизация

Зачем мы написали YACLib?

 

Написать библиотеку Future легко?

— К сожалению, нет:

  • Zero-cost abstraction
  • Easy to use
  • Easy to build
  • Good test coverage

Планы по развитию YACLib

  • Уже сейчас:
    • можно использовать в Production
    • бенчмарки из Folly работают быстрее чем в Folly
  • В марте:
    • окончательная стабилизация интерфейсов
    • deterministic testing framework с fault injections
  • В июне:
    • финальный релиз Fibers

Вопросы

 

Спасибо за внимание!

Контакты

Предупреждение!

 

Во всем докладе речь пойдет исключительно о языке C++

Несмотря на это часть утверждений может быть верна и для других языков

 

На слайдах представлен псевдокод

Реальный код можно посмотреть в библиотеке YACLib

Абстракции

Зависят от того что мы хотим писать

  • Библиотеку
  • Сервер
    • Stateless
    • Stateful
  • Базу данных
  • Клиентское приложение
    • Desktop
    • Mobile
    • Web

Абстракции

Библиотека

Типичные особенности

  • Отсутствие информации о том где и как будет использоваться
  • Минимизация числа зависимостей
  • Кроссплатформенность
  • Возможность настройки

Абстракции

Библиотека

 Сомнительные практики

  • Глобальные состояния
  • Thread Local состояния
  • Внутренняя синхронизация
  • Создание потоков, которые не видны пользователю

Абстракции

Библиотека

Хорошие практики

  • Stateless
  • Thread Safe интерфейсы
  • Внешняя синхронизация
  • Интерфейсы для ThreadPool, которые пользователь может переопределить

Абстракции

Библиотека

Итог

Переложить паралеллизацию кода на пользователя

Предложить пользователю интерфейсы и дефолтную реализацию

Скрывать от пользователя то, что библиотека порождает конкуренцию

Futures — это отличный компромисс!

Абстракции

Сервер

Типичные особенности

  • Изначально проектируем конкурентную систему
  • Большое число клиентов
  • Знаем где и как будем использоваться 
  • Если stateless, отсутствие разделяемых данных
  • Производительность ограничена IO

Абстракции

Сервер

Итог

Из-за малого количества ограничений нам подходит множество абстракций

  • Callbacks / Futures / Tasks
  • Coroutines / Fibers (User-Level Threads)
  • Actor Model

Каждая из них хороша по своему, поэтому выбор подхода зависит от проекта

Абстракции

База данных

Типичные особенности

  • Сложно параллелить
  • Как IO, так и CPU нагрузка
  • Может владеть железкой
  • Является узким местом для других систем

Абстракции

Клиентские приложения

Типичные особенности

  • Сложно параллелить + изначально однопоточны
  • Как IO, так и CPU нагрузка
  • Не владеют железкой
  • Является конечной системой

Абстракции

Базы данных & клиентские приложения

Итог

  • Даже в системе, в которой используются Fibers или Actor Model, сложно обойтись без Futures:
    • При сетевом взаимодействии часто используются RPC, и для их обработки удобнее всего использовать Futures
  • Клиентские приложения изначально однопоточны. Поэтому впоследствии использовать абстракции вроде Fibers или Actor Model — сложно

Оптимизации Future

Wait

  • Ждать не одну Future, а произвольное количество
    • Добавить счетчик к коллбеку, нотифицировать когда он станет нулем и ждать этого нуля

 

  • Положить функтор, который нотифицирует о заполненности Future на стек, и передавать ссылку на него (после нужно не забыть удалить эту ссылку из Future)

Оптимизации Future

Комбинаторы

Комбинатор может является shared функтором для всех вызовов Then, так как эти вызовы последние и могут быть выполнены сразу (dispatch), без перепланирования