(495) 925-0049, ITShop интернет-магазин 229-0436, Учебный Центр 925-0049
  Главная страница Карта сайта Контакты
Поиск
Вход
Регистрация
Рассылки сайта
 
 
 
 
 

Thread-safe структуры данных .NET 4 (ч.2)

Источник: thevista

BlockingCollection<T> - эта потокобезопасная коллекция называется блокирующей, поскольку действует по следующему принципу:

  • Если коллекция пустая, и некоторый код пытается извлечь элемент, то поток, в котором он выполняется, блокируется до появления хотя бы одного элемента;
  • Если коллекция уже содержит максимальное число элементов, то поток, в котором пытаются добавить новый элемент, блокируется до тех пор, пока место не будет освобождено.

Разумеется, это далеко не полное описание её возможностей. Давайте с ними ознакомимся. Начнём с базовых вещей. Изменение содержимого коллекции производится при помощи специальных методов:

  Блокирующие методы Неблокирующие методы

Добавление
Add(T);
Add(T, CancellationToken);
TryAdd(T)
TryAdd(T, Int32)
TryAdd(T, TimeSpan)
TryAdd(T, Int32, CancellationToken)
Извлечение
Take()
Take(CancellationToken)
TryTake(out T)
TryTake(out T, Int32)
TryTake(out T, TimeSpan)
TryTake(out T, Int32, CancellationToken)

Как видно, для добавления и извлечения элементов есть в т.ч. и неблокирующие методы (те, что начинаются с префикса"Try"). Т.е. возможен неблокирующий доступ к коллекции. При этом Try* метод вернет false, если элемент невозможно добавить или извлечь, и true в противном случае. Для обоих типов методов есть перегруженные версии с маркером отмены, с помощью которого можно асинхронно прекратить ожидание.

BlockingCollection<T> имеет механизм "завершения". Коллекция считается "завершенной", когда известно, что данные в неё добавляться больше не будут. После "завершения" все попытки добавить данные приведут к генерации исключения InvalidOperationException. Если попытаться извлечь данные из пустой "завершенной" коллекции, то также будет сгенерировано исключение. Сразу же после создания коллекция является "незавершенной", а её "завершение" выполняется посредством вызова метода CompleteAdding(). Таким образом, изменение состояния коллекции выполняется вручную. Зачем был придуман такой механизм? С его помощью обеспечивается синхронизация работы производителя и потребителя, т.е. участков кода, добавляющих и извлекающих данные из коллекции. Производитель может известить о том, что новые элементы он добавлять больше не будет. При этом потребитель будет знать, что не стоит ожидать пополнения коллекции. Ниже приведен пример, демонстрирующий подобный подход.

Давайте рассмотрим немного кода, работающего с BlockingCollection<T>. Сначала создадим экземпляр коллекции, причем в конструкторе укажем её максимальную ёмкость - в данном случае это 10 элементов. Впрочем, размер можно явно не задавать, вызвав конструктор без параметров - в таком случае коллекция будет расти "неограниченно":

Код:
// создаём коллекцию
BlockingCollection<int> collection = new BlockingCollection<int>(10);

Далее в коллекцию добавим несколько элементов, и на консоль выведем свойства коллекции (о которых чуть позже):

Код:
collection.Add(200);collection.Add(300);collection.Add(400);collection.Add(500); Console.WriteLine("Count: " + collection.Count);Console.WriteLine("BoundedCapacity: " + collection.BoundedCapacity);Console.WriteLine("IsCompleted: " + collection.IsCompleted);Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);Console.WriteLine();

После этого запустим таймер, в делегате callback которого происходит добавление элементов и "завершение" коллекции:

Код:
Timer timer = new Timer(delegate
{
    Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 600");    collection.Add(600);    Thread.SpinWait(300000000);     Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 700");    collection.Add(700);     collection.CompleteAdding();
},
  null, 3000, Timeout.Infinite);

Делегат, указанный при создании таймера, будет вызван в потоке, отличном от того, в котором работает метод Main() - это позволит промоделировать доступ к коллекции со стороны нескольких потоков. Дальше в цикле начинаем перебор элементов (чтобы отличить вывод от разных потоков, используется различный цвет текста):

Код:
foreach (var item in collection.GetConsumingEnumerable())
{
    Console.ForegroundColor = ConsoleColor.Gray;
    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Получение " + item);
}

Затем снова выведем свойства коллекции на консоль. Вот скриншот с результатами работы программы:

Работает этот пример следующим образом. Создаётся коллекция и в неё помещается 4 элемента. После чего запускается таймер, и начинают одновременно работают 2 потока:

  • основной, где выполняется метод Main(). Здесь элементы изымаются из коллекции в цикле foreach;
  • поток таймера, где выполняется делегат callback таймера. Здесь элементы добавляются;

Те четыре элемента, которые были добавлены в самом начале, извлекаются за время менее 1 секунды. После этого основной поток блокируется, ожидая добавления новых элементов. Таймер запускается с задержкой в 3 секунды - и это видно на скриншоте: элемент 500 был извлечен в момент 06 сек., а элемент 600 был добавлен в 09 сек. В ту же 09 секунду цикл в основном потоке разблокируется, и выбирает только что добавленный элемент 600. В это время в потоке таймера создаётся искусственная задержка за счёт вызова SpinWait(). По её истечении в коллекцию добавляется элемент 700, который тут же извлекается в цикле основного потока. В заключении в потоке таймера происходит вызов collection.CompleteAdding(), который запрещает дальнейшее добавление элементов, т.е. "завершает" коллекцию. Поскольку к тому моменту коллекция пуста, и ожидание новых элементов становится бессмысленным, в основном потоке блокировка снимается и происходит выход из цикла foreach. Если закомментировать строчку с вызовом метода CompleteAdding(), то основной поток программы блокируется навсегда, ведь он будет ожидать поступления новых элементов, а поток таймера тем временем завершит работу. Т.о. выполнение "завершения" позволяет синхронизировать два параллельно работающих потока, и предотвратить подобное развитие событий.

Что касается свойств коллекции, то здесь наиболее интересны следующие:

  • IsAddingCompleted - возвращает true, если коллекция помечена как завершенная, т.е. дальнейшее добавление элементов запрещено, false в противном случае;
  • IsCompleted - возвращает true, если коллекция завершенная и все элементы выбраны, иначе false;
  • BoundedCapacity - указанная при создании емкость. Возвращает -1, если коллекция "безразмерная";

Ещё раз взглянув на скриншот, можно заметить, что в начале, когда в коллекцию добавили несколько элементов, оба логических свойства равны false, что вполне ожидаемо, ведь коллекция не "завершена". В конце работы, напротив, оба они равны true, поскольку в потоке таймера мы объявили коллекцию "завершенной" (т.е. IsAddingCompleted = true), а в основном потоке выбрали из неё все элементы (т.е. IsCompleted = true).

Возникает законный вопрос - а что с порядком элементов? Почему элементы извлекаются по принципу FIFO - это что, принцип работы коллекции? Или можно как-то на это повлиять? Ответ на эти вопросы даст более подробное рассмотрение перегруженного конструктора коллекции. В одной из его модификаций есть возможность указать ссылку на хранилище элементов - экземпляр класса, реализующего IProducerConsumerCollection<T>. Среди имеющихся в арсенале .NET 4 beta 2 средств это следующие структуры данных:

  • ConcurrentStack<T> - потокобезопасный стек;
  • ConcurrentQueue<T> - потокобезопасная очередь;
  • ConcurrentBag<T> - потокобезопасная неупорядоченная коллекция, допускающая дубликаты;

Т.е. структура данных BlockingCollection<T> реализует не хранилище, а только способ доступа с блокировкой и "завершением". Причем по умолчанию для хранения элементов используется очередь ConcurrentQueue<T>, отсюда и соблюдение принципа FIFO при работе с элементами. В рассмотренном выше примере мы можем заменить вызов конструктора следующим:

Код:
BlockingCollection<int> collection =
                 new BlockingCollection<int>(new ConcurrentStack<int>(), 10);

т.е. использовать стек. Тогда получим такой результат:

Очевидно, что порядок извлечения элементов поменялся, и теперь он соответствует LIFO, что характерно для стека. Т.о. есть возможность реализовать хранилище данных, наиболее подходящее в данном конкретном случае, и использовать его в BlockingCollection<T>. Для этого достаточно наследовать интерфейс IProducerConsumerCollection<T>:

Код:
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake(out T item);
}

В заключение хочу отметить несколько любопытных статических методов класса BlockingCollection<T>. С их помощью можно добавлять или извлекать элементы, взаимодействуя с любой из коллекций, переданных в качестве параметра. Традиционно, есть 2 версии этих методов:

  • Блокирующие:
    int BlockingCollection<T>.AddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TakeFromAny(BlockingCollection<T>[], out T);
  • Неблокирующие:
    int BlockingCollection<T>.TryAddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TryTakeFromAny(BlockingCollection<T>[], out T);

Как видим, они принимают массив коллекций, и параметр - добавляемый элемент или out-параметр - извлекаемый. Возвращают эти методы индекс коллекции в массиве, в которую добавили данные, или, соответственно, извлекли. При этом блокирующие методы ждут появления свободного места в коллекции или нового элемента, блокируя при этом поток, в котором выполняются. Неблокирующие, в зависимости от вызванной перегруженной версии, сразу возвращают управление, либо "пытают счастья" указанный таймаут. Разумеется, в списке выше приведены не все сигнатуры методов - есть, например, принимающие маркер отмены, CancellationToken, подробнее лучше посмотреть MSDN.

Комментарии к 1-й части этого обзора побудили меня внимательнее отнестись к рассмотрению возвращаемых методами значений, и не зря. Изучая статические методы, я подумал - когда, например, TakeAnyFrom() вернёт -1? И, честно говоря, так и не смог придумать сценария. Если все коллекции массива-параметра пусты - он будет вечно ждать их пополнения или "завершения". Однако, если хотя бы одну из них завершить, получим ArgumentException. Если же положить в хотя бы одну коллекцию данные, метод вернёт её индекс и в out-параметре те самые данные. Рефлектор подсказал, что на самом деле все статические методы BlockingCollection<T> вызывают внутри один и тот же метод. Но я так и не понял, как при задании бесконечного таймаута неблокирующий метод может вернуть -1, поэтому переадресовал вопрос на форум MSDN. Для тех, кто заинтересовался, топик здесь, а вот тут соответствующая тема на MS Connect. Пока сошлись на том, что это просто ошибка в документации - копипаст с описания метода TryTakeAnyFrom().

Другие примеры использования BlockingCollection<T> можно найти на сайте MSDN Code Gallery. Там имеется страничка с исходными кодами, на которых демонстрируется использование тех или иных средств распараллеливания кода, входящих в .NET 4. На момент публикации примеры есть только для .NET 4 beta 1, но скоро опубликуют комплект и для beta 2. В составе этих примеров есть, например, набор методов-расширений, позволяющий "преобразовать" данную коллекцию в объект типа IProducerConsumerCollection<T>.

Хочу обратить внимание, что наиболее актуальная реализация рассматриваемой коллекции находится в составе .NET 4 beta 2, который стал доступен на днях. Если возникнет желание опробовать её в деле, рекомендую установить Visual Studio 2010 beta 2. По сравнению с прошлогодней июньской CTP-версией, работающей под .NET 3.5, некоторые методы были переименованы. Впрочем, это касается всей библиотеки Parallel Extensions. Полное описание BlockingCollection<T> можно найти в соответствующем разделе MSDN.

Ссылки по теме


 Распечатать »
 Правила публикации »
  Написать редактору 
 Рекомендовать » Дата публикации: 12.01.2010 
 

Магазин программного обеспечения   WWW.ITSHOP.RU
Microsoft Office 365 для Дома 32-bit/x64. 5 ПК/Mac + 5 Планшетов + 5 Телефонов. Подписка на 1 год.
Microsoft Office 365 Персональный 32-bit/x64. 1 ПК/MAC + 1 Планшет + 1 Телефон. Все языки. Подписка на 1 год.
Microsoft Windows Professional 10, Электронный ключ
Microsoft 365 Business Basic (corporate)
Microsoft 365 Business Standard (corporate)
 
Другие предложения...
 
Курсы обучения   WWW.ITSHOP.RU
 
Другие предложения...
 
Магазин сертификационных экзаменов   WWW.ITSHOP.RU
 
Другие предложения...
 
3D Принтеры | 3D Печать   WWW.ITSHOP.RU
 
Другие предложения...
 
Новости по теме
 
Рассылки Subscribe.ru
Информационные технологии: CASE, RAD, ERP, OLAP
Безопасность компьютерных сетей и защита информации
Новости ITShop.ru - ПО, книги, документация, курсы обучения
Программирование на Microsoft Access
CASE-технологии
Один день системного администратора
Новые программы для Windows
 
Статьи по теме
 
Новинки каталога Download
 
Исходники
 
Документация
 
 



    
rambler's top100 Rambler's Top100