{-# Language BlockArguments, ScopedTypeVariables #-}
module Hookup.Concurrent (concurrentAttempts) where
import Control.Concurrent (forkIO, throwTo)
import Control.Concurrent.Async (Async, AsyncCancelled(..), async, asyncThreadId, cancel, waitCatch, waitCatchSTM)
import Control.Concurrent.STM (STM, atomically, check, orElse, readTVar, registerDelay, retry)
import Control.Exception (SomeException, finally, mask_, onException)
import Control.Monad (join)
import Data.Foldable (for_)
-- | Run a list of attempts concurrently, starting them one at a time in
-- list order, and return the first successful result.
--
-- A new attempt is started immediately whenever nothing is running, and
-- otherwise once the delay timer armed by the previous start has fired.
-- When an attempt succeeds, every other running attempt is cancelled in the
-- background and any result it still manages to produce is handed to the
-- release action. When every attempt has failed, the collected exceptions
-- are returned, most recent failure first.
--
-- The whole loop runs under 'mask_'; asynchronous exceptions can therefore
-- only be delivered at interruptible points such as the blocking STM wait.
concurrentAttempts ::
  Int          {- ^ delay between successive starts; passed to 'registerDelay' (microseconds) -} ->
  (a -> IO ()) {- ^ release action for a successful result that is not returned -} ->
  [IO a]       {- ^ attempts, in the order they are to be started -} ->
  IO (Either [SomeException] a)
concurrentAttempts delay release actions = mask_ (loop st)
  where
    -- Initial state: nothing running, nothing failed, all work pending.
    -- 'readySTM' starts out as 'retry' because no timer has been armed yet.
    st = St
      { threads  = []
      , errors   = []
      , work     = actions
      , delay    = delay
      , clean    = release
      , readySTM = retry
      }
-- | State threaded through the attempt loop.
data St a = St
  { threads  :: [Async a]
    -- ^ attempts that have been started and whose outcome has not been consumed
  , errors   :: [SomeException]
    -- ^ exceptions from failed attempts, most recent first
  , work     :: [IO a]
    -- ^ attempts that have not been started yet
  , delay    :: !Int
    -- ^ delay handed to the timer each time an attempt is started
  , clean    :: a -> IO ()
    -- ^ release action applied to successful results that are not returned
  , readySTM :: STM ()
    -- ^ transaction that blocks until the next attempt may be started
  }
-- | An IO action yielding the overall outcome: 'Right' with the winning
-- result, or 'Left' with the exceptions of the failed attempts.
type Answer a = IO (Either [SomeException] a)
-- | One step of the attempt loop: dispatch on whether any attempt is
-- currently running.
loop :: forall a. St a -> Answer a
loop st
  | null (threads st) = nothingRunning st
  | otherwise         = waitForEvent st
-- | Handle the case where no attempt is running: start the next pending
-- attempt right away, or, when no work is left, give up and report the
-- collected errors.
nothingRunning :: St a -> Answer a
nothingRunning st =
  case work st of
    []   -> pure (Left (errors st))
    x:xs -> start x st { work = xs }
-- | Start the given attempt in a new 'Async', arm the timer gating the next
-- start, and continue the loop with the new thread recorded in the state.
--
-- The state passed in is expected to already have this attempt removed from
-- 'work'. When no further work remains, no timer is armed and the ready
-- transaction is 'retry', so it never fires.
start :: IO a -> St a -> Answer a
start x st =
  do thread <- async x
     ready  <- if null (work st)
                 then pure retry
                 else startTimer (delay st)
     loop st { threads = thread : threads st, readySTM = ready }
-- | Block until something happens — a running attempt finishes, or the
-- timer allows the next attempt to start — and then run the continuation
-- chosen for that event.
--
-- The STM transaction returns the continuation as an IO action, which
-- 'join' then executes. If the blocking wait itself is interrupted by an
-- exception, all running attempts are handed to 'cleanup' before the
-- exception propagates; the handler covers only the wait, not the
-- continuation.
waitForEvent :: St a -> Answer a
waitForEvent st =
  join
    ( atomically (finish st [] (threads st))
        `onException` cleanup (clean st) (threads st)
    )
-- | Build a transaction that succeeds with the continuation for the first
-- running attempt found to have finished, trying the attempts in list
-- order. When none has finished, fall back to 'fresh'.
--
-- The second argument accumulates the attempts already inspected (in
-- reverse order); the third holds those still to inspect. Together, minus
-- the attempt under inspection, they form the list of other running
-- attempts passed on to 'finish1'.
finish :: St a -> [Async a] -> [Async a] -> STM (Answer a)
finish st _        []     = fresh st
finish st threads' (t:ts) =
  finish1 st (threads' ++ ts) t
    `orElse` finish st (t : threads') ts
-- | Transaction that waits for one particular attempt to finish and yields
-- the continuation for its outcome.
--
-- On success the continuation hands the remaining attempts to 'cleanup'
-- and returns the result. On failure it records the exception, drops the
-- finished attempt from the running list and resumes the loop.
finish1 :: St a -> [Async a] -> Async a -> STM (Answer a)
finish1 st threads' t =
  do res <- waitCatchSTM t
     pure case res of
       Right s -> Right s <$ cleanup (clean st) threads'
       Left e  -> loop st { errors = e : errors st, threads = threads' }
-- | Transaction for starting the next pending attempt. It blocks until the
-- state's ready transaction succeeds, then yields a continuation that
-- starts the attempt. With no pending work it retries, so that only a
-- finishing attempt can wake the enclosing transaction.
fresh :: St a -> STM (Answer a)
fresh st =
  case work st of
    []   -> retry
    x:xs -> start x st { work = xs } <$ readySTM st
-- | Arm a timer with 'registerDelay' and return a transaction that blocks
-- until that timer has fired.
startTimer ::
  Int {- ^ delay passed to 'registerDelay' (microseconds) -} ->
  IO (STM ())
startTimer n =
  do v <- registerDelay n
     -- 'check' retries the transaction until the TVar has been set to True.
     pure (check =<< readTVar v)
-- | Cancel the given attempts in a freshly forked thread, so the caller
-- does not wait for them to stop.
--
-- The forked thread first throws 'AsyncCancelled' to every attempt, then
-- waits for each one in turn. Any attempt that nevertheless completed
-- successfully has its result passed to the release action; failures are
-- ignored.
cleanup ::
  (a -> IO ()) {- ^ release action for results that are not returned -} ->
  [Async a]    {- ^ attempts to cancel -} ->
  IO ()
cleanup release xs =
  () <$ forkIO do
    for_ xs \x -> throwTo (asyncThreadId x) AsyncCancelled
    for_ xs \x ->
      do res <- waitCatch x
         -- 'for_' over an 'Either' runs the action only for a 'Right'.
         for_ res release