Нахрена вам разделяемая память и костыли с трубами?

Куча async. В конце всем ->join. Если надо по частям передавать данные то 
Coro::Channel к каждому



On 5 Jun 2014, at 19:47, Харпалёв Иван <[email protected]> wrote:

> Спасибо!
> На  Go выглядит заманчиво, хотя и совершенно не понятно, как происходит 
> распределение входа между воркерами.
> 
> Классный пример с ForkEngine.
> Вот только в доке IO::Pipe не описаны  функции autoflush и blocking. Зачем 
> они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
> 
> Передача через Pipe -- подходящее решение (прогон через paip почти не 
> замедляет построчное копирование из файла в файл на Perl).
> 
> А как же быть c получением из воркеров??
> как-то так?
> open my $input, "<", $in_file or die "Can not open file for read";
> open my $output, ">", $out_file or die "Can not open file for write";
> for (@workers) {
> ⇥   my $wait_for_input  = AnyEvent->io (
> ⇥   ⇥   fh => $_->{fromchild},
> ⇥   ⇥   poll => 'r',
> ⇥   ⇥   cb => sub {
> ⇥   ⇥   ⇥   say $output readline ($_->{fromchild});
> ⇥   ⇥   ⇥   $_->{c}--; #счётчик очереди в worker'е
> ⇥   ⇥   }
> ⇥   )
> }
> my $number=0;
> while (<$input>) {
> ⇥   while(1){
> ⇥   ⇥   $number = ++$number  % $#workers;
> ⇥   ⇥   my $worker = $workers[$number];
> ⇥   ⇥   if ($worker->{c} < 10) {
> ⇥   ⇥   ⇥   my $to = $worker->{tochild};
> ⇥   ⇥   ⇥   say $to $_;
> ⇥   ⇥   ⇥   $worker->{c}++;
> ⇥   ⇥   }
> ⇥   }
> }
> 
> Gearman -- штука крутая, но всё-таки хочется на Perl.
> А как решить на Coro по-прежнему непонятно(
> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и 
> следовательно должно быть возможно изменять оду и ту же переменную из разных 
> потоков...   Возможно ли вообще такое в Perl.  Хотя пайпы вроде норм решение 
> для передачи и главная проблема в неблокирующем ожидании.
> 
> Спасибо!
> 
> 
> 
> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <[email protected]> 
> написал:
> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :)
> 
> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <[email protected]> wrote:
> 
>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если 
>> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то 
>> становится ням-ням как вкусно.
>> 
>> 
>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <[email protected]> 
>> написал:
>> Интересно, это уже готовое было или запилил за полчаса?
>> 
>> Евгений
>> 
>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <[email protected]> wrote:
>> 
>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его 
>>> обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть 
>>> очень компактно и работать производительно.
>>> 
>>> package
>>>  main
>>> 
>>> 
>>> import
>>>  (
>>>     
>>> "fmt"
>>> 
>>>     
>>> "math/rand"
>>> 
>>>     
>>> "time"
>>> 
>>> )
>>> 
>>> 
>>> const
>>>  (
>>>     numWorkers = 
>>> 10
>>> 
>>> )
>>> 
>>> 
>>> // task - это задание для воркера.
>>> type task struct
>>>  {
>>>     value  
>>> int
>>> 
>>>     output 
>>> chan
>>>  result
>>> }
>>> 
>>> 
>>> // result - это результат обработки задания воркером.
>>> type result struct
>>>  {
>>>     value  
>>> int
>>> 
>>>     worker 
>>> int
>>> 
>>> }
>>> 
>>> 
>>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и 
>>> кладёт в канал ответа,
>>> // который прислан вместе с заданием.
>>> func worker(workerNumber int, input chan
>>>  task) {
>>>     
>>> // Пока входной канал не закроют, читаем из него задание.
>>> 
>>>     
>>> for task := range
>>>  input {
>>>             
>>> // Работаем в поте лица.
>>> 
>>>             time.Sleep(time.Duration(rand.Intn(
>>> 100
>>> )) * time.Millisecond)
>>>             task.output <- result{task.value * 
>>> 100
>>> , workerNumber}
>>>     }
>>> }
>>> 
>>> 
>>> // prepareInput готовит входные задания и кладёт их в два канала: в одну 
>>> очередь
>>> // задания для воркеров, в другую - каналы ответа.
>>> func prepareInput(input chan task, output chan chan
>>>  result) {
>>>     
>>> for i := 0; i < 100
>>> ; i++ {
>>>             
>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ 
>>> считают,
>>> 
>>>             
>>> // а сразу брался за следующее задание.
>>> 
>>>             outputChan := 
>>> make(chan result, 1
>>> )
>>>             
>>> // Тот факт, что задания кладутся в input и output в одном и том же порядке,
>>> 
>>>             
>>> // гарантирует, что ответы будут упорядочены в том же порядке.
>>> 
>>>             input <- task{i, outputChan}
>>>             output <- outputChan
>>>     }
>>>     
>>> close
>>> (input)
>>>     
>>> close
>>> (output)
>>> }
>>> 
>>> 
>>> func
>>>  main() {
>>>     
>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>>> 
>>>     input := 
>>> make(chan
>>>  task, numWorkers)
>>>     output := 
>>> make(chan chan
>>>  result, numWorkers)
>>> 
>>>     
>>> // Запускаем готовилку входных данных.
>>> 
>>>     
>>> go
>>>  prepareInput(input, output)
>>> 
>>>     
>>> // Запускаем воркеры.
>>> 
>>>     
>>> for i := 0
>>> ; i < numWorkers; i++ {
>>>             
>>> go
>>>  worker(i, input)
>>>     }
>>> 
>>>     
>>> // Читаем ответы в порядке, в каком нам нужно.
>>> 
>>>     
>>> for res := range
>>>  output {
>>>             fmt.Printf(
>>> "%+v\n"
>>> , <-res)
>>>     }
>>> }
>>> 
>>> 
>>> 
>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <[email protected]> 
>>> написал:
>>> Добрый день, могучий MoscowPM
>>> 
>>> Опять про параллельную обработку.
>>> 
>>> Хочется написать вот такую схему обработки ввода: 
>>> master создаёт work'ов, 
>>> читает порции из файла, раздаёт порции worker'ам
>>> ждёт, пока worker обработает, получает ответ worker'a 
>>> пишет результат в файл.
>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном порядке.
>>> 
>>> Самое туманное:
>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>> Как ждать готовности?!!!
>>> Должна ли такая схема (работа с диском из одного места) дать ускорение по 
>>> сравнению с чтением/записью файла в каждом worker'е?
>>> 
>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>>   как сделать разделяемую память, (как быстро передавать данные между 
>>> мастером и worker'ом внутри Perl)?
>>>   как сделать неблокирующее ожидание готовности одного из worker'ов (при 
>>> котором можно заниматься вводом-выводом)?
>>> 
>>> Подскажите, на чём и как такое писать!!
>>> Спасибо!
>>> 
>>> Уважение
>>> Иван Харпалев
>>> 

-- 
Moscow.pm mailing list
[email protected] | http://moscow.pm.org

Ответить