Проблема производителя и потребителя - Producer–consumer problem

В вычисление, то проблема производитель – потребитель[1][2] (также известный как проблема ограниченного буфера) является классическим примером мульти-процесс синхронизация проблема, первая версия которой была предложена Эдсгер В. Дейкстра в 1965 г. в его неопубликованной рукописи, [3] (в котором буфер был неограниченным) и впоследствии опубликован с ограниченным буфером в 1972 году.[4] В первой версии проблемы есть два циклических процесса, производитель и потребитель, которые имеют общий фиксированный размер. буфер используется как очередь. Производитель многократно генерирует данные и записывает их в буфер. Потребитель многократно считывает данные в буфере, удаляя их в процессе чтения и используя эти данные каким-либо образом. В первой версии проблемы с неограниченным буфером проблема состоит в том, как спроектировать код производителя и потребителя так, чтобы при их обмене данными данные не терялись или не дублировались, данные считывались потребителем в том порядке, в каком они написано производителем, и оба процесса достигают максимального прогресса. В более поздней формулировке проблемы Дейкстра предложил несколько производителей и потребителей, совместно использующих ограниченный набор буферов. Это добавило дополнительную проблему, заключающуюся в предотвращении попыток производителей записывать данные в буферы, когда все они были заполнены, и в попытке предотвратить чтение буфера потребителями, когда все они пусты.

Первый случай, который следует рассмотреть, - это тот, в котором есть один производитель и один потребитель, и есть буфер конечного размера. Решение для производителя - либо перейти в спящий режим, либо отбросить данные, если буфер заполнен. В следующий раз, когда потребитель удаляет элемент из буфера, он уведомляет производителя, который снова начинает заполнять буфер. Таким же образом потребитель может заснуть, если обнаружит, что буфер пуст. В следующий раз, когда производитель помещает данные в буфер, он будит спящего потребителя. Решение может быть достигнуто с помощью межпроцессного взаимодействия, обычно используя семафоры. Неадекватное решение может привести к тупик где оба процесса ждут пробуждения.

Неадекватная реализация

Чтобы решить эту проблему, возникает соблазн предложить "решение", показанное ниже. В решении используются две библиотечные подпрограммы, спать и просыпайся. Когда вызывается спящий режим, вызывающий блокируется до тех пор, пока другой процесс не разбудит его с помощью процедуры пробуждения. Глобальная переменная itemCount содержит количество элементов в буфере.

int itemCount = 0;процедура режиссер() {    пока (истинный)     {        элемент = produItem();        если (itemCount == РАЗМЕР БУФЕРА)         {            спать();        }        putItemIntoBuffer(элемент);        itemCount = itemCount + 1;        если (itemCount == 1)         {            просыпайся(потребитель);        }    }}процедура потребитель() {    пока (истинный)     {        если (itemCount == 0)         {            спать();        }        элемент = removeItemFromBuffer();        itemCount = itemCount - 1;        если (itemCount == РАЗМЕР БУФЕРА - 1)         {            просыпайся(режиссер);        }        потреблять(элемент);    }}

Проблема с этим решением в том, что оно содержит состояние гонки что может привести к тупик. Рассмотрим следующий сценарий:

  1. В потребитель только что прочитал переменную itemCount, заметил, что он равен нулю, и вот-вот переместится внутрь если блокировать.
  2. Непосредственно перед вызовом сна потребитель прерывается, а производитель возобновляет работу.
  3. Производитель создает элемент, помещает его в буфер и увеличивает itemCount.
  4. Поскольку буфер был пуст до последнего добавления, производитель пытается разбудить потребителя.
  5. К сожалению, потребитель еще не спал, и пробуждение было потеряно. Когда потребитель возобновляет работу, он засыпает и больше никогда не проснется. Это связано с тем, что производитель будит потребителя только тогда, когда itemCount равно 1.
  6. Производитель будет зацикливаться, пока буфер не заполнится, после чего он также перейдет в спящий режим.

Поскольку оба процесса будут спать вечно, мы зашли в тупик. Поэтому это решение неудовлетворительно.

Альтернативный анализ заключается в том, что если язык программирования не определяет семантику одновременного доступа к общим переменным (в данном случае itemCount) с использованием синхронизации, то решение неудовлетворительно по этой причине, без необходимости явно демонстрировать состояние гонки.

Использование семафоров

Семафоры решить проблему потерянных звонков для пробуждения. В приведенном ниже решении мы используем два семафора, fillCount и emptyCount, решить проблему. fillCount - это количество элементов, уже находящихся в буфере и доступных для чтения, а emptyCount - количество доступных пространств в буфере, куда можно записывать элементы. fillCount увеличивается и emptyCount уменьшается, когда в буфер помещается новый элемент. Если производитель пытается уменьшить emptyCount когда его значение равно нулю, производитель переводится в спящий режим. В следующий раз, когда предмет будет использован, emptyCount увеличивается, и производитель просыпается. Аналогично работает потребитель.

семафор fillCount = 0; // произведенные товарысемафор emptyCount = РАЗМЕР БУФЕРА; // оставшееся местопроцедура режиссер() {    пока (истинный)     {        элемент = produItem();        вниз(emptyCount);        putItemIntoBuffer(элемент);        вверх(fillCount);    }}процедура потребитель() {    пока (истинный)     {        вниз(fillCount);        элемент = removeItemFromBuffer();        вверх(emptyCount);        потреблять(элемент);    }}

Приведенное выше решение отлично работает, когда есть только один производитель и потребитель. Когда несколько производителей используют одно и то же пространство памяти для буфера элементов, или несколько потребителей используют одно и то же пространство памяти, это решение содержит серьезное состояние гонки, которое может привести к одновременному чтению или записи двух или более процессов в один и тот же слот. Чтобы понять, как это возможно, представьте, как проходит процедура putItemIntoBuffer () могут быть реализованы. Он может содержать два действия: одно определяет следующий доступный слот, а другое записывает в него. Если процедура может выполняться одновременно несколькими производителями, то возможен следующий сценарий:

  1. Два производителя уменьшаются emptyCount
  2. Один из производителей определяет следующий пустой слот в буфере
  3. Второй производитель определяет следующий пустой слот и получает тот же результат, что и первый производитель
  4. Оба производителя пишут в один и тот же слот

Чтобы решить эту проблему, нам нужен способ убедиться, что только один производитель выполняет putItemIntoBuffer () вовремя. Другими словами, нам нужен способ выполнить критическая секция с взаимное исключение. Ниже показано решение для нескольких производителей и потребителей.

мьютекс buffer_mutex; // похоже на "семафор buffer_mutex = 1", но отличается (см. примечания ниже)семафор fillCount = 0;семафор emptyCount = РАЗМЕР БУФЕРА;процедура режиссер() {    пока (истинный)     {        элемент = produItem();        вниз(emptyCount);        вниз(buffer_mutex);        putItemIntoBuffer(элемент);        вверх(buffer_mutex);        вверх(fillCount);    }}процедура потребитель() {    пока (истинный)     {        вниз(fillCount);        вниз(buffer_mutex);        элемент = removeItemFromBuffer();        вверх(buffer_mutex);        вверх(emptyCount);        потреблять(элемент);    }}

Обратите внимание, что порядок, в котором разные семафоры увеличиваются или уменьшаются, важен: изменение порядка может привести к тупиковой ситуации. Здесь важно отметить, что, хотя мьютекс, похоже, работает как семафор со значением 1 (двоичный семафор), но есть разница в том, что мьютекс имеет концепцию владения. Владение означает, что мьютекс может быть «увеличен» обратно (установлен в 1) только тем же процессом, который «уменьшил» его (установлен в 0), а все другие задачи ждут, пока мьютекс не станет доступен для уменьшения (фактически означает, что ресурс доступен). , что обеспечивает взаимную исключительность и позволяет избежать тупиковых ситуаций. Таким образом, неправильное использование мьютексов может остановить многие процессы, когда монопольный доступ не требуется, но мьютекс используется вместо семафора.

Использование мониторов

Следующее псевдокод показывает решение проблемы производителя и потребителя с помощью мониторы. Поскольку для мониторов взаимное исключение подразумевается, не требуется дополнительных усилий для защиты критического раздела. Другими словами, представленное ниже решение работает с любым количеством производителей и потребителей без каких-либо модификаций. Также следует отметить, что для программиста менее вероятно, что он напишет код, который страдает от состояния гонки при использовании мониторов, чем при использовании семафоров.[нужна цитата ]

монитор ПроизводительПотребитель {    int itemCount = 0;    условие полный;    условие пустой;    процедура Добавить(элемент)     {        если (itemCount == РАЗМЕР БУФЕРА)         {            ждать(полный);        }        putItemIntoBuffer(элемент);        itemCount = itemCount + 1;        если (itemCount == 1)        {            уведомлять(пустой);        }    }    процедура удалять()     {        если (itemCount == 0)         {            ждать(пустой);        }        элемент = removeItemFromBuffer();        itemCount = itemCount - 1;        если (itemCount == РАЗМЕР БУФЕРА - 1)        {            уведомлять(полный);        }        возвращаться элемент;    }}процедура режиссер() {    пока (истинный)     {        элемент = produItem();        ПроизводительПотребитель.Добавить(элемент);    }}процедура потребитель() {    пока (истинный)     {        элемент = ПроизводительПотребитель.удалять();        потреблять(элемент);    }}

Без семафоров и мониторов

Проблема производителя и потребителя, особенно в случае одного производителя и одного потребителя, во многом связана с реализацией ФИФО или канал. Шаблон производитель-потребитель может обеспечить высокоэффективную передачу данных, не полагаясь на семафоры, мьютексы или мониторы. для передачи данных. Использование этих примитивов может значительно увеличить производительность по сравнению с базовой атомарной операцией чтения / записи. Каналы и FIFO популярны только потому, что они избегают потребности в сквозной атомарной синхронизации. Базовый пример, закодированный на C, показан ниже. Обратите внимание, что:

  • Атомный читать-изменять-писать доступ к общим переменным избегается, поскольку каждая из двух Считать переменные обновляются только одним потоком. Кроме того, эти переменные поддерживают неограниченное количество операций приращения; отношение остается правильным, когда их значения возвращаются к целочисленному переполнению.
  • В этом примере потоки не переводятся в спящий режим, что может быть приемлемо в зависимости от системного контекста. В schedulerYield () вставлен как попытка улучшить производительность и может быть опущен. Библиотеки потоков обычно требуют семафоров или условных переменных для управления режимом сна / пробуждением потоков. В многопроцессорной среде спящий режим / пробуждение потока будет происходить гораздо реже, чем передача токенов данных, поэтому полезно избегать атомарных операций при передаче данных.
  • Этот пример не работает для нескольких производителей и / или потребителей, поскольку при проверке состояния возникает состояние гонки. Например, если в буфере хранилища находится только один токен, и два потребителя обнаруживают, что буфер непустой, тогда оба будут использовать один и тот же токен и, возможно, увеличат счетчик потребленных токенов выше счетчика для произведенных токенов.
  • Этот пример, как написано, требует, чтобы UINT_MAX + 1 без остатка делится на РАЗМЕР БУФЕРА; если он не делится без остатка, [Подсчет% BUFFER_SIZE] производит неверный индекс буфера после Считать оборачивается мимо UINT_MAX обратно к нулю. В альтернативном решении, позволяющем избежать этого ограничения, используются два дополнительных Idx переменные для отслеживания текущего индекса буфера для головы (производителя) и хвоста (потребителя). Эти Idx переменные будут использоваться вместо [Подсчет% BUFFER_SIZE], и каждый из них должен быть увеличен одновременно с соответствующим Считать переменная увеличивается следующим образом: Idx = (Idx + 1)% BUFFER_SIZE.
  • Два Считать переменные должны быть достаточно маленькими, чтобы поддерживать атомарные операции чтения и записи. В противном случае возникает состояние гонки, когда другой поток читает частично обновленное и, следовательно, неправильное значение.


летучий беззнаковый int produCount = 0, takeCount = 0;TokenType sharedBuffer[РАЗМЕР БУФЕРА];пустота режиссер(пустота) {	пока (1) {		пока (produCount - consumerCount == РАЗМЕР БУФЕРА) {			планировщик(); / * sharedBuffer заполнен * /		}		/ * Запись в sharedBuffer _ перед_ увеличением produCount * /		sharedBuffer[produCount % РАЗМЕР БУФЕРА] = produToken();		/ * Здесь требуется барьер памяти, чтобы гарантировать обновление sharedBuffer видны другим потокам до обновления produCount * /		++produCount;	}}пустота потребитель(пустота) {	пока (1) {		пока (produCount - takeCount == 0) {			планировщик(); / * sharedBuffer пуст * /		}		потреблять токен(&sharedBuffer[takeCount % РАЗМЕР БУФЕРА]);		++takeCount;	}}

В приведенном выше решении используются счетчики, которые при частом использовании могут перегружаться и достигать максимального значения. UINT_MAX. Идея, изложенная в четвертом пункте, первоначально предложена Лесли Лэмпорт,[5] объясняет, как счетчики могут быть заменены счетчиками конечного диапазона. В частности, их можно заменить счетчиками конечного диапазона с максимальным значением N - емкостью буфера.

Спустя четыре десятилетия после представления проблемы производитель-потребитель Агилера, Гафни и Лампорт показали, что проблема может быть решена таким образом, чтобы процессы обращались только к счетчикам с фиксированным диапазоном (т. Е. Диапазон, который не зависит от размера буфера) при определении если буфер пуст или полон.[6] Мотивация для этой меры эффективности заключается в ускорении взаимодействия между процессором и устройствами, которые взаимодействуют через каналы FIFO. Они предложили решение, в котором счетчики максимального значения читаются, чтобы определить, безопасен ли доступ к буферу. Однако их решение по-прежнему использует неограниченные счетчики, которые бесконечно растут, только к этим счетчикам не осуществляется доступ во время описанной фазы проверки.

Позже Авраам и Амрам [7] предложил более простое решение, представленное ниже в псевдокоде, которое обладает обсуждаемым свойством фиксированного диапазона. В решении используются счетчики максимального значения N. Однако для определения того, пуст или заполнен буфер, процессы обращаются только к конечному диапазону. регистры единственного писателя. Каждому из процессов принадлежит одно записывающее устройство с 12 значениями. Процесс производителя записывает в Flag_p, а потребительский процесс записывает в Flap_c, оба являются массивами с 3 полями. Flag_p [2] и Flag_c [2] может хранить `полный’, `пустой’Или`безопасный’, Которые, соответственно, указывают, является ли буфер полным, пустым или ни полным, ни пустым.

Идея алгоритма заключается в следующем. Процессы подсчитывают количество доставленных и удаленных товаров. по модулю N + 1 через регистры CountDelivered и CountRemoved. Когда процесс доставляет или удаляет элемент, он сравнивает эти счетчики и, таким образом, успешно определяет статус буфера и сохраняет эти данные в Flag_p [2], или же Flag_c [2]. На этапе проверки выполняющийся процесс читает Flag_p и Flag_c, и пытается оценить, какое значение среди Flag_p [2] и Flag_c [2] отражает текущий статус буфера. Достичь этой цели помогают два метода синхронизации.

  1. После доставки товара производитель пишет Flag_p [0] значение, которое он считал из Flag_c [0], а после удаления товара потребитель пишет в Flag_c [1] Значение: 1-Flag_p [0]. Следовательно, условие Flag_p [0] == Flag_c [0] предполагает, что производитель недавно проверил состояние буфера, а Flag_p [0]! = Flag_c [0] предполагает обратное.
  2. Операция доставки (удаления) завершается написанием Flag_p [1](Flag_c [1]) значение, хранящееся в Flag_p [0](Flag_c [0]). Следовательно, условие Flag_p [0] == Flag_p [1] предполагает, что производитель завершил свою последнюю операцию доставки. Таким же образом Condition Flag_c [0] = Flag_c [1] предполагает, что последнее удаление потребителя уже было прекращено.

Поэтому на этапе проверки, если производитель обнаружит, что Flag_c [0]! = Flag_p [0] & Flag_c [0] == Flag_c [1], он действует согласно значению Flag_c [2], а в противном случае - в соответствии со значением, хранящимся в Flag_p [2]. Аналогично, если потребитель обнаружит, что Flag_p [0] == Flag_c [0] и Flag_p [0] == Flag_p [1], он действует согласно значению Flag_p [2], а в противном случае - в соответствии со значением, хранящимся в Flag_c [2]В приведенном ниже коде переменные с заглавной буквы обозначают общие регистры, записанные одним из процессов и прочитанные обоими процессами. Переменные без заглавной буквы - это локальные переменные, в которые процессы копируют значения, считанные из общих регистров.

countDelivered = 0; countRemoved=0;Flag_p[0] = 0; Flag_p[1] = 0; Flag_p[2] = `пустой;Flag_c[0] = 0; Flag_c[1] = 0; Flag_c[2] = `пустой; процедура режиссер() {    пока (истинный) {    элемент = produItem();        / * фаза проверки: занято ждать, пока буфер не заполнится * /       	    повторение{        flag_c = Flag_c;	если (flag_c[0] != Flag_p[0] & flag_c[0] == flag_c[1]) ответ = flag_c[2];	еще ответ = Flag_p[2];}     до того как(ответ != `полный)     / * этап доставки товара * /     putItemIntoBuffer(элемент);     CountDeliverd = countDelivered+1 % N+1;     flag_c = Flag_c;     Flag_p[0] = flag_c[0];     удаленный = CountRemoved;     если (CountDelivered  удаленный == N) { Flag_p[1] = flag_c[0]; Flag_p[2] = `полный;}     если (CountDelivered  удаленный == 0) { Flag_p[1] = flag_c[0]; Flag_p[2] = `пустой;}     если (0 < CountDelivered  удаленный < N) { Flag_p[1] = flag_c[0]; Flag_p[2] = `безопасный;}     }}процедура потребитель() {    пока (истинный) {        / * фаза проверки: занято ждать, пока буфер не станет пустым * /       	    повторение{        flag_p = Flag_p;	если (flag_p[0] == Flag_c[0] & flag_p[1] == flag_p[0]) ответ = flag_p[2]);	еще ответ = Flag_c[2];}     до того как(ответ != `пустой)     / * этап удаления предметов * /     Элемент = removeItemFromBuffer();     countRemoved = countRemoved+1 % N+1;     flag_p = Flag_p;     Flag_c[0] = 1-flag_p[0];     доставлен = CountDelivered;     если (доставлен  CountRemoved == N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = `полный;}     если (доставлен  CountRemoved == 0) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = `пустой;}     если (0 < доставлен  CountRemoved < N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] =`безопасный;}     }}

Правильность кода зависит от предположения, что процессы могут читать весь массив или записывать в несколько полей массива за одно атомарное действие. Поскольку это предположение нереально, на практике следует заменить Flag_p и Flag_c с целыми числами (log (12) -bit), которые кодируют значения этих массивов. Flag_p и Flag_c представлены здесь в виде массивов только для удобства чтения кода.

Смотрите также

Рекомендации

  1. ^ Arpaci-Dusseau, Remzi H .; Арпачи-Дюссо, Андреа К. (2014), Операционные системы: три простых элемента [Глава: переменные состояния] (PDF), Книги Арпачи-Дюссо
  2. ^ Arpaci-Dusseau, Remzi H .; Арпачи-Дюссо, Андреа К. (2014), Операционные системы: три простых элемента [Глава: Семафоры] (PDF), Книги Арпачи-Дюссо
  3. ^ Дейкстра, Э. У. «Взаимодействующие последовательные процессы», неопубликованная рукопись EWD123 (1965), http://www.cs.utexas.edu/users/EWD/ewd01xx/EWD123.PDF.
  4. ^ Дейкстра, Э. У. "Информационные потоки, разделяющие конечный буфер". Письма об обработке информации 1.5 (1972): 179-180.
  5. ^ Лэмпорт, Лесли. «Доказательство правильности многопроцессорных программ». Транзакции IEEE по разработке программного обеспечения 2 (1977): 125-143.
  6. ^ Агилера, Маркос К., Эли Гафни и Лесли Лэмпорт. «Проблема с почтовым ящиком». Распределенные вычисления 23.2 (2010): 113-134.
  7. ^ Авраам, Ури и Гал Амрам. «Двухпроцессная синхронизация». Теоретическая информатика 688 (2017): 2-23.

дальнейшее чтение