Mibori Shante (mibori ) wrote,

Haskell. Erlang-style process communication.

Посмотрев на реализацию Erlang-style Distributed Haskell захотелось нарисовать что-то своё, простое, для коммуникации процессов внутри одного исходника. Получился достаточно небольшой модуль:

-- Process.hs
module Process
        ( Process (..)
        , Procedure
        , spawn
        , spawnOS
        , send
        , recv
        , recvMaybe
        , kill
        , self
        , proc
        , delay
        , evalProcedure) where

import Control.Monad.State
import Control.Concurrent

type Procedure q a = StateT (Chan q) IO a
        -- процедура, принимающая сообщения типа q
        -- с возвращаемым значением, типа a
       
data Process q = Process !ThreadId !(Chan q)
        -- процесс типа q --
        -- ссылка на запущенную параллеьно процедуру, принимающую сообщения типа q

instance Show (Process q) where
        show (Process thread _) = "Process" ++ drop 8 (show thread)

instance Eq (Process q) where
        Process thread _ == Process thread' _ = thread == thread'

-- запустить процедуру параллельно и вернуть ее процесс
spawn :: Procedure q' () -> Procedure q (Process q')
spawn procedure = do
        channel <- lift newChan
        thread <- lift (forkIO (evalStateT procedure channel))
        return (Process thread channel)

spawnOS :: Procedure q' () -> Procedure q (Process q')
spawnOS procedure = do
        channel <- lift newChan
        thread <- lift (forkOS (evalStateT procedure channel))
        return (Process thread channel)

-- отослать сообщение процессу
send :: Process q' -> q' -> Procedure q ()
send (Process _ channel) message = lift (writeChan channel message)

-- ожидает до тех пор, пока сообщение не придет
-- возвращает его
recv :: Procedure q q
recv = do
        channel <- get
        message <- lift (readChan channel)
        return message

-- ждет сообщения n микросекунд.
-- если сообщение пришло, возвращает (Just сообщение)
-- если сообщение за это время не пришо, возвращает Nothing
recvMaybe :: Int -> Procedure q (Maybe q)
recvMaybe n = do
        channel <- get
        lift (readChanDelay channel n)

-- убивает процесс
kill :: Process q' -> Procedure q ()
kill (Process thread _) = lift (killThread thread)

-- возвращает процесс текущей запущенной параллельно процедуры
self :: Procedure q (Process q)
self = do
        channel <- get
        threadId <- lift myThreadId
        return (Process threadId channel)

-- выполняет действие и возвращает его результат в процедуру
proc :: IO a -> Procedure q a
proc = lift

-- запустить процедуру как действие и получить его результат
evalProcedure :: Procedure q a -> IO a
evalProcedure procedure = do
        channel <- newChan
        evalStateT procedure channel
       
-- задерживает выполнение текущего процесса на указанное количество микросекунд
delay :: Int -> Procedure q ()
delay microseconds = lift (threadDelay microseconds)

-- не экспортируется.
-- ожидает появления в канале значения n микросекунд
-- если за это время сообщение не появилось, то Nothing
-- если значение появилось, то Just значение
readChanDelay :: Chan t -> Int -> IO (Maybe t)
readChanDelay ch n = do
        v <- newEmptyMVar
        r <- forkIO $ do
                x <- readChan ch
                putMVar v (Just x)
        forkIO (threadDelay n >> killThread r >> putMVar v Nothing)
        takeMVar vSyhi-подсветка кода


Как видно, весь механизм базируется на двух типах:
  • Procedure q a — процедура с возвращаемым значением типа a, имеющая возможность принимать сообщения типа q;
  • Process q — процесс типа q — ссылка на запущенную параллельно процедуру, принимающую сообщения типа q.
Таким образом, не удастся откомпилировать программу если вздумается отослать сообщение процессу не того типа.

За эрланговскими конструкциями receive ... end и receive ... after ... end не гнался. Однако ввел им соответственно:
  • recv :: Procedure q q — ожидает до тех пор, пока сообщение не придет, после чего возвращает его;
  • recvMaybe :: Int -> Procedure q (Maybe q) — ожидает сообщения n микросекунд. Если сообщение не пришло за это время, то возвращает Nothing, если пришло, то (Just сообщение).
Примеры с использованием запощу в ru_lambda

Upd: http://community.livejournal.com/ru_lambda/81733.html
Tags: erlang, haskell
  • Post a new comment

    Error

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 35 comments