mu

Haskell. Erlang-style process communication.

Посмотрев на реализацию Erlang-style Distributed Haskell захотелось нарисовать что-то своё, простое, для коммуникации процессов внутри одного исходника. Получился достаточно небольшой модуль:

-- Process.hs
module Process
        ( Process (..)
        , Procedure
        , spawn
        , spawnOS
        , send
        , recv
        , recvMaybe
        , kill
        , self
        , proc
        , delay
        , evalProcedure) where

import Control.Monad.State
import Control.Concurrent

type Procedure q a = StateT (Chan q) IO a
        -- процедура, принимающая сообщения типа q
        -- с возвращаемым значением, типа a
       
data Process q = Process !ThreadId !(Chan q)
        -- процесс типа q --
        -- ссылка на запущенную параллеьно процедуру, принимающую сообщения типа q

instance Show (Process q) where
        show (Process thread _) = "Process" ++ drop 8 (show thread)

instance Eq (Process q) where
        Process thread _ == Process thread' _ = thread == thread'

-- запустить процедуру параллельно и вернуть ее процесс
spawn :: Procedure q' () -> Procedure q (Process q')
spawn procedure = do
        channel <- lift newChan
        thread <- lift (forkIO (evalStateT procedure channel))
        return (Process thread channel)

spawnOS :: Procedure q' () -> Procedure q (Process q')
spawnOS procedure = do
        channel <- lift newChan
        thread <- lift (forkOS (evalStateT procedure channel))
        return (Process thread channel)

-- отослать сообщение процессу
send :: Process q' -> q' -> Procedure q ()
send (Process _ channel) message = lift (writeChan channel message)

-- ожидает до тех пор, пока сообщение не придет
-- возвращает его
recv :: Procedure q q
recv = do
        channel <- get
        message <- lift (readChan channel)
        return message

-- ждет сообщения n микросекунд.
-- если сообщение пришло, возвращает (Just сообщение)
-- если сообщение за это время не пришо, возвращает Nothing
recvMaybe :: Int -> Procedure q (Maybe q)
recvMaybe n = do
        channel <- get
        lift (readChanDelay channel n)

-- убивает процесс
kill :: Process q' -> Procedure q ()
kill (Process thread _) = lift (killThread thread)

-- возвращает процесс текущей запущенной параллельно процедуры
self :: Procedure q (Process q)
self = do
        channel <- get
        threadId <- lift myThreadId
        return (Process threadId channel)

-- выполняет действие и возвращает его результат в процедуру
proc :: IO a -> Procedure q a
proc = lift

-- запустить процедуру как действие и получить его результат
evalProcedure :: Procedure q a -> IO a
evalProcedure procedure = do
        channel <- newChan
        evalStateT procedure channel
       
-- задерживает выполнение текущего процесса на указанное количество микросекунд
delay :: Int -> Procedure q ()
delay microseconds = lift (threadDelay microseconds)

-- не экспортируется.
-- ожидает появления в канале значения n микросекунд
-- если за это время сообщение не появилось, то Nothing
-- если значение появилось, то Just значение
readChanDelay :: Chan t -> Int -> IO (Maybe t)
readChanDelay ch n = do
        v <- newEmptyMVar
        r <- forkIO $ do
                x <- readChan ch
                putMVar v (Just x)
        forkIO (threadDelay n >> killThread r >> putMVar v Nothing)
        takeMVar vSyhi-подсветка кода


Как видно, весь механизм базируется на двух типах:
  • Procedure q a — процедура с возвращаемым значением типа a, имеющая возможность принимать сообщения типа q;
  • Process q — процесс типа q — ссылка на запущенную параллельно процедуру, принимающую сообщения типа q.
Таким образом, не удастся откомпилировать программу если вздумается отослать сообщение процессу не того типа.

За эрланговскими конструкциями receive ... end и receive ... after ... end не гнался. Однако ввел им соответственно:
  • recv :: Procedure q q — ожидает до тех пор, пока сообщение не придет, после чего возвращает его;
  • recvMaybe :: Int -> Procedure q (Maybe q) — ожидает сообщения n микросекунд. Если сообщение не пришло за это время, то возвращает Nothing, если пришло, то (Just сообщение).
Примеры с использованием запощу в ru_lambda

Upd: http://community.livejournal.com/ru_lambda/81733.html
Метки: ,
Ня!
Это прекрасно, я считаю. Одно плохо, при общей крутости коньцепции тормознутость ghc и получаемых бинариков ещё никто не отменил.
У меня есть мутка малнькая, в процессе вынашивания. Хочется сделать параллельной одну смешную штуки (Стартап, Вебдваноль, всяхуйня). Дак вот пока я всё же в сторону эрланга смотрю.
Не, ну в целом это логично, у хаскеля тру-процессы, а не "лёгкие потоки", поэтому в общем и целом должно быть быстрее. Но эрланг напильником в этом направлении уж лет 20 шлифуют.
я могу ошибаться, но насколько я понимаю, у Хаскеля именно лёгкие потоки (если они запущены через forkIO). Если они запущены через forkOS, то это "тежелый" поток ОС. Нафига это, спрашивается? Дело в том, что если запустить в легком потоке foreign unsafe-вызов, то он заблокирует выполнение всех других легких потоков до своего завершения. Если foreign unsafe вызов запустить в ОС-потоке, то блокирования не будет. Это достаточно логично и правильно сделано.

А вот в Эрланге ситуация настораживает. У Эрланга имеются легковесные процессы. И вдруг среди них выполняеся какая-то апишка, которая выполняется, не в вирутальной машине Эрланга, а ядром ОС. Естественно неизвестно, когда этот вызов вернет результат. В этот время он блокирует все легковесные процессы виртуальной машины Эрланга. Если такое не происходит, то процессы Эрланга вовсе не легковесные (ОС процессы), либо внутри виртуальной машины Эрланга имеет место такое же разделение на легковесные/нелегковесные как и в Хаскелле (те же яйца, но вид сбоку)
у эрланга несколько диспетчеров легковесных процессов, каждый из которых сидит в своём, тяжеловесном. так что всё там ок.
понятно.
т. е. внутри все-таки тяжеловесные процессы создаются.

а почему два запущенных в разных процессах io:format(...) выводятся по отдельности?
один блокирует другого?
тяжеловесные процессы создаются статически по числу ядер в процессоре(ах).

по поводу двух процессов - io:format()' же просто msg шлёт в единственный процесс, связанный с консолью. вот и сериализуются в message queue.
единственный процесс, связанный с консолью

надо было догадаться :)
тру процессы медленнее чем лёгкие потоки, из-за IPC и анти-эффектов cache locality
Ну в первом тесте хаскелевая реализация использует MVar'ы, а эрланговскую хоть щас разноси по 15 машинам -- это надо иметь ввиду.

Я вот щас например сделал похожую вещь, но в линейку -- то бишь дикое количество потоков создается, а потом передает друг другу сообщение по цепочке (хаскельный вариант через Chan'ы). Так вот разница очень заметна -- хаскель конечно держится молодцом, минуты за 3-4 он даже через 500K потоков мессагу протащил, но эрланг-то в течение нескольких секунд ее через миллион протаскивает ;)
но эрланг-то в течение нескольких секунд ее через миллион протаскивает ;)

Можешь привести код этого на Эрланге?
http://paste.lisp.org/display/58940

Хотя вообще на таких цифрах тестировать -- очень от машины зависит. Как только количество процессов превышает определенный порог, все начинает жутко свопиться :)

Вот хаскель на всякий случай -- http://paste.lisp.org/display/58942
http://paste.lisp.org/display/58967 — переделанный с Эрланга твой вариант
GHCi, version 6.8.2: http://www.haskell.org/ghc/  :? for help
Loading package base ... linking ... done.
Prelude> :set +s
Prelude> :l A
[1 of 2] Compiling Process          ( Process.hs, interpreted )
[2 of 2] Compiling A                ( A.hs, interpreted )
Ok, modules loaded: Process, A.
(0.09 secs, 21479004 bytes)
*A> testSpawn 1000
Loading package mtl-1.1.0.0 ... linking ... done.
sending a message along
(0.02 secs, 3000252 bytes)
*A> Complete

*A> testSpawn 10000
sending a message along
(0.20 secs, 24413468 bytes)
*A> Complete

*A> testSpawn 100000
sending a message along
(10.20 secs, 246965092 bytes)
*A> Complete

*A> testSpawn 500000
sending a message along
(240.08 secs, 1233897268 bytes)
*A> Complete

*A>
Угу, примерно как ты говоришь. Надо и здесь и в Process.hs попробывать ($!), все-таки на это дело память тратится.
На счет своппинга согласен. Если бы без него, то результат с 500000 процессами был бы ориентировочно минуту.

Чуть позже на Эрланге попробую.
С Эрлангом звиздец какой-то:

Free Image Hosting at www.ImageShack.us
Это после
Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1  (abort with ^G)
1> c(testproc).
{ok,testproc}
2> testproc:testSpawn(500000).
Что я неправильно готовлю?
спасибо, получилось.

не знаю как померить точно, но результат на вскидку примерно такой же как у тебя.
однако всё же думаю, что это не повод зарывать Хаскель на таких задачах.
Зарывать не надо, конечно, я просто хотел сказать что, несмотря на результаты language shootout, до полноценной erlang-style concurrency хаскель еще точить и точить :)

А вообще реально интереснее было бы yaws сравнить с happs, но тут посерьезнее тестовый стенд нужен..
давай.
убьёшь, если вдруг стану много не по теме постить.
Неделю медитировал над твоим кодом, хотел взять для одной штуковины как есть - уж очень всё красивое. Сегодня просветлился, сделаю чуть по-другому. Спасибо!
для одной штуковины
а что за штуковина, если не секрет (просто интересно ввиду малого количества показательных примеров)

сделаю чуть по-другому
wow, что я упустил? :)
Штуковина такая: отлаживаю одну хреновину, которая лазает по HTTP, сеть тормозная дико - анлим, что с него взять. Поэтому хочу повесить тред, который будет сетевые запросы гонять, и швырять в него функциями, которые в сеть лазают. А сам параллельно висеть в GHCi.
По-другому сделаю потому что мне не нужен полноценный Эрланг, стандартного Chan должно хватить для всего. Просто пару недель назад я не знал о существовании такой библиотеки. А нормально запустить параллельность ещё и сегодня не получается. :)
А нормально запустить параллельность ещё и сегодня не получается.
А у тебя те треды, которые собственно коннектятся, через forkOS пускаются? (иначе у тебя каждый foreign unsafe-вызов (считай, коннект к серваку) будет блокировать все другие треды).
нет конечно. Но connectTo :: HostName -> PortID -> IO Handle, и то, что ты потом будешь делать с Handle должны работать через forkOS, насколько я понимаю.
Но лучше всего посмотреть на исходники Network и Network.BSD, чтобы знать точно :)
Непосредственно я буду работать (уже начал) с Network.HTTP, но внутри у него, конечно, Network и Network.Socket.
> За эрланговскими конструкциями receive ... end и receive ... after ... end не гнался.

Кстати, да. У Эрланга логика приема сильно отличается от простой очереди сообщений.
Верно.
Логику, как у Эрланга можно реализовать самостоятельно:
1. Принять сообщение через recv, recvDelay, recvMaybe
2. Если сообщение не подошло, то вернуть его обратно в очередь через sendMe, sendMeBack
Если возвращать в очередь сообщение то порядок его в очереди будет другой. А в Эрланге порядок насколько я помню важен.
неа

sendMe плюхнет сообщение в очередь сверху (как обычно)
А sendMeBack -- плюхнет снизу, откуда оно было вытащено recv. sendMeBack для того и задумывалась, чтобы сохранять порядок в очереди, если нужно вытащить только определенное сообщение.
прием в Эрланге не ограничивается анализом одного элемента. Он сканирует всю очередь и вытягивает из любого места.
Ничего не мешает просканировать всю очередь.
Но да, мы упрёмся в производительность такого алгоритма.

Альтернативой этому мы можем создать несколько процессов с копиями одной очереди (через spawnDup). Сообщение посылаемое такому процессу будет появляться и в соседнем процессе тоже. При этом, удаление сообщения из своей очереди одного процесса не приведет к удалению сообщения из очереди другого. Один процесс будет обрабатывать сообщения одного вида и игнорировать сообщения другого, а второй процесс -- наоборот. При этом сообщения можно посылать любому из процессов.