Mibori Shante (mibori) wrote,
Mibori Shante
mibori

Haskell. Erlang-style process communication.

Посмотрев на реализацию Erlang-style Distributed Haskell захотелось нарисовать что-то своё, простое, для коммуникации процессов внутри одного исходника. Получился достаточно небольшой модуль:

-- Process.hs
module Process
        ( Process (..)
        , Procedure
        , spawn
        , spawnOS
        , send
        , recv
        , recvMaybe
        , kill
        , self
        , proc
        , delay
        , evalProcedure) where

import Control.Monad.State
import Control.Concurrent

type Procedure q a = StateT (Chan q) IO a
        -- процедура, принимающая сообщения типа q
        -- с возвращаемым значением, типа a
       
data Process q = Process !ThreadId !(Chan q)
        -- процесс типа q --
        -- ссылка на запущенную параллеьно процедуру, принимающую сообщения типа q

instance Show (Process q) where
        show (Process thread _) = "Process" ++ drop 8 (show thread)

instance Eq (Process q) where
        Process thread _ == Process thread' _ = thread == thread'

-- запустить процедуру параллельно и вернуть ее процесс
spawn :: Procedure q' () -> Procedure q (Process q')
spawn procedure = do
        channel <- lift newChan
        thread <- lift (forkIO (evalStateT procedure channel))
        return (Process thread channel)

spawnOS :: Procedure q' () -> Procedure q (Process q')
spawnOS procedure = do
        channel <- lift newChan
        thread <- lift (forkOS (evalStateT procedure channel))
        return (Process thread channel)

-- отослать сообщение процессу
send :: Process q' -> q' -> Procedure q ()
send (Process _ channel) message = lift (writeChan channel message)

-- ожидает до тех пор, пока сообщение не придет
-- возвращает его
recv :: Procedure q q
recv = do
        channel <- get
        message <- lift (readChan channel)
        return message

-- ждет сообщения n микросекунд.
-- если сообщение пришло, возвращает (Just сообщение)
-- если сообщение за это время не пришо, возвращает Nothing
recvMaybe :: Int -> Procedure q (Maybe q)
recvMaybe n = do
        channel <- get
        lift (readChanDelay channel n)

-- убивает процесс
kill :: Process q' -> Procedure q ()
kill (Process thread _) = lift (killThread thread)

-- возвращает процесс текущей запущенной параллельно процедуры
self :: Procedure q (Process q)
self = do
        channel <- get
        threadId <- lift myThreadId
        return (Process threadId channel)

-- выполняет действие и возвращает его результат в процедуру
proc :: IO a -> Procedure q a
proc = lift

-- запустить процедуру как действие и получить его результат
evalProcedure :: Procedure q a -> IO a
evalProcedure procedure = do
        channel <- newChan
        evalStateT procedure channel
       
-- задерживает выполнение текущего процесса на указанное количество микросекунд
delay :: Int -> Procedure q ()
delay microseconds = lift (threadDelay microseconds)

-- не экспортируется.
-- ожидает появления в канале значения n микросекунд
-- если за это время сообщение не появилось, то Nothing
-- если значение появилось, то Just значение
readChanDelay :: Chan t -> Int -> IO (Maybe t)
readChanDelay ch n = do
        v <- newEmptyMVar
        r <- forkIO $ do
                x <- readChan ch
                putMVar v (Just x)
        forkIO (threadDelay n >> killThread r >> putMVar v Nothing)
        takeMVar vSyhi-подсветка кода


Как видно, весь механизм базируется на двух типах:
  • Procedure q a — процедура с возвращаемым значением типа a, имеющая возможность принимать сообщения типа q;
  • Process q — процесс типа q — ссылка на запущенную параллельно процедуру, принимающую сообщения типа q.
Таким образом, не удастся откомпилировать программу если вздумается отослать сообщение процессу не того типа.

За эрланговскими конструкциями receive ... end и receive ... after ... end не гнался. Однако ввел им соответственно:
  • recv :: Procedure q q — ожидает до тех пор, пока сообщение не придет, после чего возвращает его;
  • recvMaybe :: Int -> Procedure q (Maybe q) — ожидает сообщения n микросекунд. Если сообщение не пришло за это время, то возвращает Nothing, если пришло, то (Just сообщение).
Примеры с использованием запощу в ru_lambda

Upd: http://community.livejournal.com/ru_lambda/81733.html
Tags: erlang, haskell
Subscribe
  • Post a new comment

    Error

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

    When you submit the form an invisible reCAPTCHA check will be performed.
    You must follow the Privacy Policy and Google Terms of use.
  • 31 comments