## Two implementations of Seers

Last time, we implemented a bowling game scorer by using a Tardis. If you aren’t yet familiar with the Tardis’s interface, then I recommend you check out the explanation on Hackage. (tl;dr it’s a State monad with get and put, except there are two streams of state, one forwards and one backwards, so there are four operations: getPast, getFuture, sendPast, and sendFuture.)

Today, we’ll take another large step in the esoteric direction, and implement a Seer by using a Tardis. Why, you ask? My response: why not? There may be some deep motivating reasons for you to study this, but I don’t pretend to know what those might be.

> {-# LANGUAGE MultiParamTypeClasses #-}
> {-# LANGUAGE FunctionalDependencies #-}
> {-# LANGUAGE FlexibleInstances #-}
> {-# LANGUAGE FlexibleContexts #-}
> {-# LANGUAGE GeneralizedNewtypeDeriving #-}
> {-# LANGUAGE DoRec #-}
> {-# OPTIONS_GHC -Wall #-}

> import Control.Applicative (Applicative)
> import Control.Monad (liftM)
> import Control.Monad.Fix (MonadFix, mfix)
> import Control.Monad.Tardis
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
> import Control.Monad.Trans.Writer (WriterT, tell, runWriterT)
> import Data.Monoid


# What is a Seer?

A seer is someone who foretells the future. But how do seers know the future? Suppose you are writing a novel, and you want to devise a semi-believable “system” for how seers work. What would the rules be?

Well, rule number one for me is that in a legitimate system, all seers must agree about the future. If different seers predict different outcomes for the same future period, then there is reason to doubt such a system. I decided that in my seer system, all seers see “the whole universe”. All seers see the same thing, regardless of when or where in space and time they decide to “see” it.

Now, where does this information come from? Are there separate people that send information to these seers? My first idea was that the seer system could be a network of seers, and all information comes from within the network itself. All seers are therefore required to provide accurate information about their “present” in order to tap into the reservoir of mystical information about their past and future.

We therefore come to the main operation that I have devised for seers.

contact :: Monoid w => w -> Seer w

A seer provides their worldview in exchange for the grand worldview. The “whole” world should be of the form past <> present <> future, where present is whatever value is provided as the argument to contact.

Remember when I wondered about whether those that “see” the universe and those that “send” information about the universe might be different people? It turns out that we can easily write operations see and send in terms of contact. Or, alternatively, given see and send, we can easily write contact in terms of those.

> class (Monad m, Monoid w) => MonadSeer w m | m -> w where
>   see :: m w
>   send :: w -> m ()
>   contact :: w -> m w
>
>   see = contact mempty
>   send w = contact w >> return ()
>   contact w = send w >> see


I’ve created a typeclass for the Seer interface, because we are going to implement a seer in two different ways.

# Seer in terms of a Tardis

The Tardis allows us to both get and send messages to both the past and future. Given the timey-wimey nature of seers, a tardis seems like the perfect candidate for implementing them.

> newtype SeerT w m a = SeerT { unSeerT :: TardisT w w m a }
>   deriving (Functor, Applicative, Monad, MonadFix)


A single contact consists of a seer getting in touch with both the past and the future. It seems only fair that this seer should share with the future his newfound knowledge of the past, and with the past his knowledge of the future. The past is inquiring the present about its (the past’s) future, which includes both the present and the future, or in other words present <> future. The future is inquiring the present about its (the future’s) past, which includes both the present and the past, or in other words, past <> present. The result of the contact is the whole universe, spanning all of time, in other words, past <> present <> future. In all cases, we want to make sure to keep things in “chronological” order.

Did you follow all of that? In short, information from the past should be sent forwards to the future, and information from the future should be sent backwards to the past. We can encode this flow of information easily using the Tardis operations:

> instance (Monoid w, MonadFix m) => MonadSeer w (SeerT w m) where
>   contact present = SeerT $ do
>     rec past <- getPast
>         sendPast (present <> future)
>         sendFuture (past <> present)
>         future <- getFuture
>     return (past <> present <> future)


Now, in order to “run” a seer operation, all we have to do is provide mempty at both ends of the time continuum, and run the tardis as usual.

> runSeerT :: (MonadFix m, Monoid w) => SeerT w m a -> m a
> runSeerT = flip evalTardisT (mempty, mempty) . unSeerT


Here is a dumb example demonstrating how it works.

> dumbExample :: MonadSeer [Int] m => m [Int]
> dumbExample = do
>   world1 <- see
>   send [world1 !! 2]
>   send [1]
>   world2 <- see
>   send [world2 !! 1]
>   world3 <- see
>   return world3


ghci> runSeerT dumbExample
[1,1,1]


It is actually unnecessary to see more than once, since it is always the unchanging truth of past <> present <> future. The following is equivalent:

dumbExample = do
  world <- see
  send [world !! 2]
  send [1]
  send [world !! 1]
  return world


# Seer in terms of a Reader/Writer

The astute observer should have noticed an odd similarity between see and ask, send and tell. They embody practically the same concept! The only nuance is that when you ask, what you will get is everything that you have tell’d, and everything you will tell. It turns out that this is quite easy to write in terms of the Reader and Writer monad transformers, which happen to be instances of MonadFix.

> newtype RWSeerT w m a = RWSeerT { unRWSeerT :: ReaderT w (WriterT w m) a }
>   deriving (Functor, Applicative, Monad, MonadFix)


As I said before, see is simply ask, while send is simply tell. We merely lift and wrap the operations as necessary to keep the type system happy:

> instance (Monoid w, Monad m) => MonadSeer w (RWSeerT w m) where
>   see = RWSeerT ask
>   send w = RWSeerT (lift (tell w))


Now, to run a Seer built on top of a Reader/Writer pair, all we have to do is feed the results of the Writer straight back into the Reader. We accomplish this via mfix.

> runRWSeerT :: (Monoid w, MonadFix m) => RWSeerT w m a -> m a
> runRWSeerT (RWSeerT rwma) = liftM fst $
>   mfix (\ ~(_, w) -> runWriterT (runReaderT rwma w))
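The knot-tying that makes runRWSeerT work can be seen in miniature without any transformers at all. Here is a minimal, pure sketch (the names PureSeer, runPureSeer, and example are mine, not part of the post's code): a computation is a function from the whole log to a result plus its own contribution, and running it feeds the final log back in via fix, just as mfix does above.

```haskell
import Data.Function (fix)

-- A "seer computation": given the whole log, produce a result plus
-- this computation's own contribution to that log.
type PureSeer w a = w -> (a, w)

-- Run by lazily feeding the output log back in as the input log.
runPureSeer :: PureSeer w a -> a
runPureSeer f = fst (fix (\ ~(_, w) -> f w))

-- The computation observes the complete log while contributing [1,2,3].
example :: PureSeer [Int] [Int]
example whole = (whole, [1, 2, 3])

main :: IO ()
main = print (runPureSeer example)
```

Since example's contribution does not depend on its input, the knot unravels: the final log is [1,2,3], and that is also what the computation "sees".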


Here is a dumb example demonstrating that it works:

ghci> runRWSeerT dumbExample
[1,1,1]


# So why use a Tardis?

For fun, obviously!

More seriously, notice that we can “run” SeerT differently, depending on whether we implemented it with Tardis or with Reader/Writer. With Tardis, we can supply “bookends”, the further past and the further future.

> runSeerTWith :: (MonadFix m, Monoid w) => w -> w -> SeerT w m a -> m a
> runSeerTWith past future = flip evalTardisT (future, past) . unSeerT


Exercise: Predict the output of runSeerTWith [10, 11, 12] [16, 17, 18] dumbExample.

Whereas with the reader/writer pair, we can fool the seers by giving them a false reality.

> runRWSeerTWith :: (Monoid w, Monad m) => w -> RWSeerT w m a -> m a
> runRWSeerTWith falseReality (RWSeerT rwma) = liftM fst $
>   runWriterT (runReaderT rwma falseReality)


Exercise: Predict the output of runRWSeerTWith [10, 11, 12] dumbExample.

What the ramifications of these are, I really don’t know. I just follow the types, lean on the laziness, and things just seem to work in Haskell, even mystical things like time travel and seers.

Download this code and play with it! Don’t forget to cabal install tardis first.

Posted in Uncategorized | 1 Comment

## Bowling on a Tardis

> {-# LANGUAGE DoRec #-}
> import Control.Monad.Tardis


A few months ago, I released the tardis package. I promised a few blog posts about it, but put it off until now. If you haven’t heard of my "tardis" package yet, then you should probably take a look at the hackage documentation I’ve already written up for Control.Monad.Tardis.

# Bowling

Let’s whip up a contrived example to which Tardis is applicable. Bowling scores are one such example, because the score you have on a given frame depends on both the past score as well as up to two future throws. Any time you need to know something from both the past and the future, Tardis might be able to help.

Let’s first define a data type that captures the essence of a bowling game. A game consists of 10 "frames". Although we model a single Frame as a data type, there are special rules that apply to the final frame, so we will model it separately as LFrame.

> data BowlingGame = BowlingGame
>   { frames :: [Frame] -- should be 9, too tedious to type restrict
>   , lastFrame :: LFrame }
>
> data Frame = Strike
>            | Spare { firstThrow :: Int }
>            | Frame { firstThrow, secondThrow :: Int }
>
> data LFrame = LStrike { bonus1, bonus2 :: Int }
>             | LSpare { throw1, bonus1 :: Int }
>             | LFrame { throw1, throw2 :: Int }


For details on how bowling is scored, see Wikipedia > Bowling # Scoring.

# Sample data

Here’s a few games’ worth of sample bowling data.
> -- X 9/ X X X 81 7/ X X XXX
> -- 0 20 40 70 98 117 126 146 176 206 236
> -- this guy is really good.
> sampleGame = BowlingGame
>   { frames =
>     [ Strike , Spare 9
>     , Strike , Strike
>     , Strike , Frame 8 1
>     , Spare 7 , Strike
>     , Strike
>     ]
>   , lastFrame = LStrike 10 10
>   }
>
> perfectGame = BowlingGame
>   { frames = replicate 9 Strike
>   , lastFrame = LStrike 10 10
>   }
>
> worstGame = BowlingGame
>   { frames = replicate 9 (Frame 0 0)
>   , lastFrame = LFrame 0 0
>   }
>
> main = mapM_ (print . toScores) [sampleGame, perfectGame, worstGame]


# Using a Tardis

Well now we want to write the function toScores :: BowlingGame -> [Int]. We’ll do this by stepping through each Frame and creating the appropriate score. Whenever using a Tardis, I recommend you create separate newtypes for the backwards- and forwards-travelling state so you don’t get them mixed up.

> newtype PreviousScores = PreviousScores [Int]
> newtype NextThrows = NextThrows (Int, Int)


Here I’ve chosen the newtype PreviousScores for the forwards state (because coming from the past to the present is moving "forwards" in time), and NextThrows as the backwards state (because coming from the future to the present is moving "backwards" in time).

> toScores :: BowlingGame -> [Int]
> toScores game = flip evalTardis initState $ go (frames game) where
>   go :: [Frame] -> Tardis NextThrows PreviousScores [Int]


First, we handle the case where we have another frame to process. We begin by assuming we have access to the next two throws (nextThrow1 and nextThrow2), as well as the previous score.

>   go (f : fs) = do
>     rec
>       let (score', throws') = case f of
>             Strike    -> (score + 10 + nextThrow1 + nextThrow2, (10, nextThrow1))
>             Spare n   -> (score + 10 + nextThrow1,              (n, 10 - n))
>             Frame n m -> (score + n + m,                        (n, m))


We need to determine the new state for each of the two streams of state. score' is determined by a combination of the previous score, the current frame, and future throws. This is the new score that we will send forwards in time. throws' is determined only by the current frame and future throws. This is the new "next two throws" that we will send backwards in time, which is why we put the current frame’s first throw as the earliest.
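The case expression can be checked by hand with a standalone restatement of the per-frame step as a pure function (stepScore is a hypothetical name of mine, and a simplified Frame type is repeated so the snippet runs on its own):

```haskell
-- The per-frame scoring step, as in the case expression above: given the
-- running score and the next two throws, produce the new score and the
-- two throws to send backwards in time.
data Frame = Strike
           | Spare Int
           | Frame Int Int

stepScore :: Int -> (Int, Int) -> Frame -> (Int, (Int, Int))
stepScore score (nextThrow1, nextThrow2) f = case f of
  Strike    -> (score + 10 + nextThrow1 + nextThrow2, (10, nextThrow1))
  Spare n   -> (score + 10 + nextThrow1,              (n, 10 - n))
  Frame n m -> (score + n + m,                        (n, m))

main :: IO ()
main = print (stepScore 0 (9, 1) Strike)
```

For the sample game's first frame (a strike followed by throws 9 and 1), stepScore 0 (9, 1) Strike gives (20, (10, 9)): a running score of 20, matching the comment in the sample data, and (10, 9) as the "next two throws" seen from one frame earlier.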

Now that we’ve got that figured out, we just use the tardis’s capabilities in order to retrieve and send information along its correct time stream. A good rule of thumb seems to be, if you want to get information from the past, then send the past some information first. Likewise, if you want info from the future, then send it some info first. However, I have no idea if this rule of thumb is necessary at all; the Tardis will sometimes Just Work even if you jumble it up a little.

>       sendPast $ NextThrows throws'
>       PreviousScores scores@(score : _) <- getPast
>       sendFuture $ PreviousScores (score' : scores)
>       NextThrows ~(nextThrow1, nextThrow2) <- getFuture


Great! Finally, we move on to the rest of the frames.

>     go fs


Once we run out of frames, we need to handle the last frame. There is no future to be concerned about, and we can just set up the values to be sent to the recent past via initState, so all we have to do is look at the past score, add the final frame’s score, and we’re done.

>   go [] = do
>     PreviousScores scores@(score : _) <- getPast
>     return $ (finalFrameScore + score) : scores


All that’s left is to figure out how to determine the final frame’s score, as well as the initial state. The former is easy, given the specifications of how to score a bowling game.

>   finalFrameScore = case lastFrame game of
>     LStrike b1 b2 -> 10 + b1 + b2
>     LSpare t1 b1  -> 10 + b1
>     LFrame t1 t2  -> t1 + t2


The "initial state" fed into a tardis is the farthest past for the forwards-travelling state, and the farthest future for the backwards-travelling state. The farthest past is a score of zero, while the farthest future is the final two throws of the game.

Well, not quite. It’s the final two throws that come before the second-to-last frame. The last frame is guaranteed to consist of at least two throws. In the case of LStrike or LSpare, there are always three throws in the last frame, so the final throw is ignored. Remember, we’re sending the past its "closest" future two throws.

>   initState = (NextThrows $ case lastFrame game of
>     LStrike b1 b2 -> (10, b1)
>     LSpare t1 _b1 -> (t1, 10 - t1)
>     LFrame t1 t2  -> (t1, t2)
>     , PreviousScores [0])


And… that’s it! All we had to do was encode the rules of Bowling into a Tardis, and via some timey-wimey trickery, the Tardis assembles all of the information into a list of bowling scores, from the last frame to the first.

ghci> main
[236,206,176,146,126,117,98,70,40,20,0]
[300,270,240,210,180,150,120,90,60,30,0]
[0,0,0,0,0,0,0,0,0,0,0]
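As a sanity check on those numbers, here is a conventional scorer over a flat list of throws with no Tardis involved (scoreThrows is a hypothetical helper of mine, not part of the post's code; it returns cumulative scores oldest frame first, whereas toScores returns them newest first with a trailing 0):

```haskell
-- Cumulative bowling scores from a flat list of throws.
-- A sketch that assumes a complete, legal 10-frame game.
scoreThrows :: [Int] -> [Int]
scoreThrows = go (10 :: Int) 0
  where
    go 0 _ _ = []
    go n acc (a : rest)
      | a == 10 =                 -- strike: 10 plus the next two throws
          let s = acc + 10 + sum (take 2 rest)
          in s : go (n - 1) s rest
    go n acc (a : b : rest)
      | a + b == 10 =             -- spare: 10 plus the next throw
          let s = acc + 10 + sum (take 1 rest)
          in s : go (n - 1) s rest
      | otherwise =               -- open frame: just the pin count
          let s = acc + a + b
          in s : go (n - 1) s rest
    go _ _ _ = []

main :: IO ()
main = print (scoreThrows [10, 9,1, 10, 10, 10, 8,1, 7,3, 10, 10, 10,10,10])
```

Those throws are sampleGame's, and the result [20,40,70,98,117,126,146,176,206,236] agrees with the Tardis output above; a perfect game, scoreThrows (replicate 12 10), ends at 300.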


Exercise: download this code, and remove the tilde (~) from line 133. What happens? Why?

# Next time

Bowling was a rather simple example, to warm you up to the idea of what a Tardis is and what it can do. Next time, we’ll get even more timey-wimey by sketching out the concept of "seers" with nothing more than tardis primitives and a vague idea of some ground rules to rationally explain how you might describe a believable system of "seers" in a fictional setting.

# My experience with Typed Racket

A while ago, I began a project to re-implement Racket’s web server library in Typed Racket (TR) by providing type annotations to the existing code. I began by typing the xml collection library, but after spending a lot of time on just that, I decided to drop the project. I think TR is a very impressive feat, and I really like the direction it is going, but I feel that the current state of TR is not adequate for typing large swaths of pre-existing Racket code. In this document, I will attempt to explain the road blocks and speed bumps I encountered while working on this project.

Although I will focus on some negative aspects here, I do not wish to communicate that my overall experience with TR was bad. Quite the contrary, friends on the #racket irc channel were extremely helpful, and for every buggy type annotation that I demonstrate, there are mountains of code that are easily and effortlessly typed. Also, most of these problems could be avoided by simply starting your project with Typed Racket, rather than trying to go back to old code and type it.

## Expectations

Let me start by stating what I expect, or rather, what I wish were true of Typed Racket (TR).

First, I’d like TR to be a drop-in Racket replacement for any module. I’d like to be able to take a project, and pick any file written in regular racket, and just add type annotations and have it just automagically work with the rest of the project.

Second, I’d like to be able to take any racket code, and provide a type for it. Anything I write in regular racket should somehow be typeable in TR, without modifying the code itself. Sometimes, TR type annotations are intrusive, but as long as the original logic remains identical, I consider that to be "unmodified" code. This is essentially impossible, so let me limit that a bit: any sane code that I write in Racket should be typeable by TR. Most people write code with particular types in mind, whether or not they are explicitly using type annotations. The xml collection code I was working with provided types in the comments for every function, and also provided contracts for most functions. I think it’s reasonable to expect this kind of code to be typeable.

Now with that in mind, let’s start with one of the first bugs I ran into, and as it turns out, one of the most devastating.

## Regarding Structs

TR originally targeted R5RS, and used to be called "Typed Scheme". This perhaps explains why struct support isn’t quite there.

As a passing thought, let me just mention that there is an unfortunate disconnect between TR conventions and racket struct conventions. It is typical for TR types to be capitalized (e.g. Continuation-Mark-Set). This helps visually distinguish type names, and I think it is a good convention. However, it is typical for racket structs to be lowercased (e.g. (struct location (line char offset))). This is convenient because it makes for nice derived identifiers (e.g. location-line). However, there is a disconnect between the two conventions: TR will create a type with the same name as the struct, which means that we usually end up with a lowercase type name. This is nothing that a define-type can’t fix, but it’s annoying nonetheless.

### Extending a struct

Now on to the main event. Consider these two racket files: the first provides a struct, and the second creates another struct which extends the first.

foo.rkt

#lang racket
(define-struct foo ())
(provide (struct-out foo))

bar.rkt

#lang racket
(require "foo.rkt")
(define-struct (bar foo) ())

Let’s try to convert foo.rkt to Typed Racket. We’ll simply switch define-struct to define-struct:. If that struct had fields, we would provide type annotations for those, too.

foo.rkt

#lang typed/racket
(define-struct: foo ())
(provide (struct-out foo))

That file works just fine, but now if we try to run bar.rkt, the type checker reprimands us:

Type Checker: The type of struct:foo cannot be converted to a contract in: struct:foo5

Thus was born ticket 12503.

### Typing a stream consumer

On the side, I’ve been following recent iteratee conversations in the Haskell community, and wanted to write up "pipes" in TR.

A simplified version of a "pipe" is a stream consumer. It consumes an unknown number of inputs of the same type, and then produces some result. The consumer therefore has two states: "need another input", or "have a result". (Let’s ignore side effects for the sake of simplicity here.) This is easily written in Racket using structs to distinguish the two cases.

#lang racket
(require racket/match)

(struct fun (f))
(struct done (result))

; A way to run a consumer by giving it the same thing over and over
(define (repeatedly-apply consumer x)
(match consumer
[(fun f) (repeatedly-apply (f x) x)]
[(done result) result]))

Now we wish to type this code. TR’s union types should do the trick.

#lang typed/racket
(require racket/match)

(struct: (I R) fun ([f : (I -> (Consumer I R))]))
(struct: (R) done ([result : R]))

(define-type (Consumer I R)
(U (fun I R)
(done R)))

; A way to run a consumer by giving it the same thing over and over
(: repeatedly-apply ((Consumer I R) I -> R))
(define (repeatedly-apply consumer x)
(match consumer
[(fun f) (repeatedly-apply (f x) x)]
[(done result) result]))

Unfortunately, in Racket 5.3, TR says that this is a type error:

Type Checker: Structure type constructor fun applied to
non-regular arguments (g5588 R) in: (fun I R)

Fortunately, merely a few days after I created ticket 12999, a fix was patched onto HEAD. Keep up the good work, guys!
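Since this example came from the Haskell pipes/iteratee world in the first place, here is the same two-state consumer back in Haskell for comparison (names are mine, a sketch): the union type becomes an ordinary algebraic data type, and branching on the two cases needs no filter machinery at all.

```haskell
-- A stream consumer either needs another input, or has a result.
data Consumer i r = Fun (i -> Consumer i r)  -- "need another input"
                  | Done r                   -- "have a result"

-- Run a consumer by giving it the same input over and over.
repeatedlyApply :: Consumer i r -> i -> r
repeatedlyApply (Fun f)  x = repeatedlyApply (f x) x
repeatedlyApply (Done r) _ = r

-- Example: keep summing inputs until the running total exceeds 10.
sumPast10 :: Int -> Consumer Int Int
sumPast10 acc = Fun $ \i ->
  let acc' = acc + i
  in if acc' > 10 then Done acc' else sumPast10 acc'

main :: IO ()
main = print (repeatedlyApply (sumPast10 0) 3)
```

Feeding 3 repeatedly accumulates 3, 6, 9, 12, so the consumer stops with 12.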

## Filters

Type filters are a really cool feature of Typed Racket, and are essential to typing Racket code. Basically, whenever you use a function with a filter attached, you can refine the type information for code constructs with multiple branches, such as cond and if. See the pastebin link at the end of this section for a more detailed explanation of type filters; I’m unaware of any good official documentation on the topic.

One annoyance I ran into was being unable to control which filter a function I defined has. For example, suppose I have a function that determines whether its input is a happy char.

#lang racket
(require racket/match)

(define (happy-char? c)
(match c
[(or #\h #\a #\p #\y) #t]
[_ #f]))

(andmap happy-char? (string->list "happy")) ;; => #t
(andmap happy-char? (string->list "sad"))   ;; => #f

(Coming from Haskell, I really like racket/match.)

We’d like to define a type to represent happy chars. Happily, this can be done in Typed Racket with relative ease.

#lang typed/racket

(define-type Happy-Char (U #\h #\a #\p #\y))
(define-predicate happy-char? Happy-Char)

(andmap happy-char? (string->list "happy")) ;; => #t
(andmap happy-char? (string->list "sad"))   ;; => #f

Sadly, this requires using define-predicate. Remember that one of the things I expect from TR is that all sane Racket code that I write should be typeable. If we try to type the happy-char? that I wrote originally, we’ll run into problems:

#lang typed/racket
(require racket/match)

(define-type Happy-Char (U #\h #\a #\p #\y))

(: happy-char? (Any -> Boolean : Happy-Char))
(define (happy-char? c)
(match c
[(or #\h #\a #\p #\y) #t]
[_ #f]))

The type system says "no" to this.

Expected result with filter ((Happy-Char @ c) | (! Happy-Char @ c)),
got filter (Top | Top) in: (match c ((or #\h #\a #\p #\y) #t) (_ #f))

: Happy-Char is the filter part of the function type annotation: if this function produces #t, then the result is guaranteed to have type Happy-Char, and if this function produces #f, then the result is guaranteed to not have type Happy-Char. In error messages, TR expresses this as ((Happy-Char @ c) | (! (Happy-Char @ c))).

Aside: One annoyance is that there is no way for the programmer to annotate anything other than a filter of the form (definitely-yes | definitely-no); see feature request #12528 for details.

Back to the issue at hand. The main reason this is invalid is because TR is simply unaware of racket/match; filters don’t flow through branches of racket/match like you would expect them to. (This is on the long term list of goals for TR.)

However, there is a more fundamental problem with the current implementation of filters. I wrote up a Typed Racket file with comments that explain what filters are, and one of the latest limitations that I ran into: http://pastebin.com/JQ9txdrX.

## Macros

Optional arguments are somewhat annoying to deal with in TR. Suppose I want to type the following function:

#lang racket
(define (foo x [y 3]) (+ x y))

TR provides the case-> type, which allows a function to have multiple arities. We can use this to type foo like so:

#lang typed/racket
(: foo (case-> (Number -> Number)
(Number Number -> Number)))
(define (foo x [y 3]) (+ x y))

Not bad, though it’s a bit annoying to have to repeat the other arguments. Well, in Racket, when you find yourself writing the same flavor of annoying code over and over, what do you do? You write a macro!

I’m not much of a macro wiz, but here’s a function describing something like the macro I’d like to write:

#lang racket

(define (case-opt s)
  (match s
    [`(,args ? ,opt-args -> ,result)
     (match opt-args
       [(list)
        (append args `(-> ,result))]
       [(cons opt-first opt-rest)
        `(case->
          ,(append args `(-> ,result))
          ,(case-opt `(,(append args (list opt-first)) ? ,opt-rest -> ,result)))])]))

(case-opt '((Foo Bar) ? (Baz Quux) -> End)) ;; =>
;; '(case-> (Foo Bar -> End)
;;   (case-> (Foo Bar Baz -> End)
;;            (Foo Bar Baz Quux -> End)))

Seems reasonable, right? I would like a cleaner syntax for optional args, and it seems like a straightforward desugaring of my desired syntax could be accomplished through the macro system. Alas, Typed Racket interprets type annotations before macros get a chance to expand.

#lang typed/racket

(define-syntax-rule (never-mind-me t) t)

(: x (never-mind-me Integer))
(define x 3)

;; Type checker: Unbound type name never-mind-me in: never-mind-me

The define-type mechanism could cover this simple example, but isn’t flexible enough to define more complex desugarings, like case-opt. There are probably some good reasons to keep it this way, but it’s disappointing to run into restrictions like this; it just feels non-Racket-y.

## Contracts

A Good Racket library will often provide contracts with the functions that it exports. Contracts are basically restrictions on input and promises about output that are checked at runtime. At this point in time, I think it’s safe to say that Racket is the one true implementation of Contracts, and other languages sometimes provide a dumbed down version of them.

There is a lot of overlap between contracts and a type system. If TR is to serve as a drop-in replacement for Racket, then it needs to be able to define and export contracts just like regular Racket. Remember, an important use case of TR is to type some code originally written in Racket, in such a way that said code behaves just like it used to, without having to type any code that depends on it.

Contracts can behave like predicates, and are therefore connected to the idea of type filters. Consider the following simplified code taken from collects/xml/private/xexpr.rkt:

#lang racket/base
(require racket/contract)

(define (correct-xexpr? true false x) ...)
(define (xexpr? x) (correct-xexpr? (λ () #t) (λ (exn) #f) x))
(define (validate-xexpr x) (correct-xexpr? (λ () #t) (λ (exn) (raise exn)) x))

(define xexpr/c
(make-flat-contract
#:name 'xexpr?
#:projection
(lambda (blame)
(lambda (val)
(with-handlers ([exn:invalid-xexpr?
... ])
(validate-xexpr val)
val)))
#:first-order xexpr?))

The contract xexpr/c is designed around the correct-xexpr? function. Rather than spitting out plain #t or #f values, correct-xexpr? can take two actions to run under the "true" or "false" circumstances, respectively. The "false" action must be a function that can take an exn:invalid-xexpr as input. This design allows correct-xexpr? to provide detailed custom error messages in its implementation, and the caller can choose whether to inspect the error message, or simply throw it away.

Like racket/match, contracts play a crucial role in large, well-designed Racket programs, but Typed Racket just isn’t powerful enough yet to grant the programmer the ability to customize contracts. Instead, you can only generate contracts mechanically for a given data type using define-predicate. define-predicate is an invasive change; it forces me to throw away the custom code that served the same purpose, and to modify all code that depended on custom behavior of the pre-existing code.

## Conclusions

I love Typed Racket. I hope this post does not discourage you from looking into TR. I especially think that TR is well-suited to new projects that can be built from the ground up with TR in mind. The TR type system is surprisingly flexible about the programs that it can type.

Unfortunately, TR doesn’t quite cover all of Racket. It can serve you very well as Typed Scheme, but it lacks full support for and cooperation with Rackety things such as structs, pattern matching, contracts, and macros. I eagerly look forward to the day when Typed Racket fully and completely meets my expectations, and I wouldn’t be too surprised if this happened over the next few years.

Posted in Uncategorized | 1 Comment

## Pipes to Conduits part 8: A comparison

In this series, we started with the simplest of Pipe implementations, and added features one by one until we reached Conduit-like functionality. Today, we’ll strip away the abort and close features not present in Conduit (the former might be considered a misfeature, though without using indexed monads it is a necessity for the latter), and compare the results. There is one major difference, which I believe illustrates a serious flaw in both implementations. I will illustrate this issue at the end of the post.

For now, walk with me through some of our old code, as we compare it side-by-side with the code from Data.Conduit.Internal (from conduit-0.5.2.2).

> {-# OPTIONS_GHC -Wall #-}
> {-# LANGUAGE NoMonomorphismRestriction #-}
>
> module PipeConduit where
>
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
>
> import Data.Void (Void, absurd)
>
> import qualified Data.Conduit as C
> import qualified Data.Conduit.List as C
>
> import qualified Control.Frame as F
> import Control.IMonad.Restrict (foreverR, mapMR_, (!>=), (!>))


## Helpers

> pass :: Monad m => m ()
> pass = return ()
>
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"


## The Pipe type

I’ve decided for this post to revamp and re-arrange the PipeF type. Sans close and abort, and with mild re-arrangement of the order of fields for given constructors, you should be able to tell that it is identical to the PipeF type we have worked with before.

> data PipeF l i o u m next
>   = Yield next (m ()) o
>   | Await (i -> next) (u -> next)
>   | Leftover next l


The Functor instance for this type is entirely mechanical, based on its components. We could have just as easily used -XDeriveFunctor and arrived at the same instance.

> instance Functor (PipeF l i o u m) where
>   fmap h (Yield next fin o) = Yield (h next) fin o
>   fmap h (Await f g) = Await (h . f) (h . g)
>   fmap h (Leftover next l) = Leftover (h next) l

> type Pipe l i o u m r =
>   FreeT (PipeF l i o u m) m r


Now compare this with the Pipe type from Data.Conduit.Internal. I’ve rearranged the order of the lines of code, and removed comments, but otherwise the code is untouched.

data Pipe l i o u m r =
HaveOutput (Pipe l i o u m r) (m ()) o
| NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
| Leftover (Pipe l i o u m r) l

| Done r
| PipeM (m (Pipe l i o u m r))

If you are comfortable in your grasp of the Free monad transformer, then you should be able to see that our two representations are equivalent. The Done and PipeM constructors are analogous to Return and Wrap, while HaveOutput, NeedInput, and Leftover are analogous to Yield, Await, and Leftover respectively.
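To make the correspondence concrete, here is a self-contained sketch of my own (simplified: no leftovers, finalizers, or upstream results, and all names are fresh rather than the ones above) that translates a FreeT-style pipe into a Conduit-style one, constructor by constructor:

```haskell
import Data.Functor.Identity (Identity (..))

-- Minimal FreeT-style pipe: a base functor with yield and await.
newtype FreeT f m r = FreeT { runFreeT :: m (FreeF f r (FreeT f m r)) }
data FreeF f r x = Pure r | Free (f x)

data PipeF i o next = YieldF o next | AwaitF (i -> next)
type Pipe i o m r = FreeT (PipeF i o) m r

-- Minimal Conduit-style pipe: Done and PipeM play the role of Pure and
-- the FreeT wrapping action; HaveOutput and NeedInput play the role of
-- YieldF and AwaitF.
data CPipe i o m r
  = HaveOutput (CPipe i o m r) o
  | NeedInput (i -> CPipe i o m r)
  | Done r
  | PipeM (m (CPipe i o m r))

toConduitStyle :: Monad m => Pipe i o m r -> CPipe i o m r
toConduitStyle (FreeT m) = PipeM $ do
  step <- m
  return $ case step of
    Pure r               -> Done r
    Free (YieldF o next) -> HaveOutput (toConduitStyle next) o
    Free (AwaitF f)      -> NeedInput (toConduitStyle . f)

-- Interpret a CPipe against a list of inputs, collecting outputs.
runC :: [i] -> CPipe i o Identity r -> [o]
runC is (PipeM (Identity p)) = runC is p
runC _  (Done _)             = []
runC is (HaveOutput p o)     = o : runC is p
runC is (NeedInput f)        = case is of
  (i : is') -> runC is' (f i)
  []        -> []

-- A pipe that awaits one value and yields it doubled.
echoDouble :: Pipe Int Int Identity ()
echoDouble = wrap (AwaitF (\i -> wrap (YieldF (i * 2) halt)))
  where
    wrap = FreeT . Identity . Free
    halt = FreeT (Identity (Pure ()))

main :: IO ()
main = print (runC [5] (toConduitStyle echoDouble))
```

Feeding the translated echoDouble the input 5 yields the single output 10, exactly as the FreeT version would under an analogous interpreter.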

I’m going to define some synonyms for FreeT and runFreeT to help illustrate the similarities in implementation.

> pipeM :: m (FreeF (PipeF l i o u m) r (Pipe l i o u m r))
>       -> Pipe l i o u m r
> pipeM m = FreeT m
>
> runPipeM :: Pipe l i o u m r
>          -> m (FreeF (PipeF l i o u m) r (Pipe l i o u m r))
> runPipeM (FreeT m) = m


For the Conduit implementation, you could imagine analogous methods that would allow us to write the Conduit code in similar fashion to what you’ve seen here before.

pipeM :: Monad m => m (Pipe l i o u m r) -> Pipe l i o u m r
pipeM m = PipeM m

runPipeM :: Monad m => Pipe l i o u m r -> m (Pipe l i o u m r)
runPipeM (PipeM m) = m >>= runPipeM
runPipeM p = return p

> type Producer   o   m r = Pipe Void () o    () m r
> type Consumer l i u m r = Pipe l    i  Void u  m r
> type Pipeline       m r = Pipe Void () Void () m r


## Working with PipeF

I’ll keep using pipeCase to maintain similarity with previous code, although without the functor composition cruft, it’s really not that bad to just use direct pattern matching.

I’ve upgraded to transformers-free-1.0 which means that Return and Wrap are now called Pure and Free respectively.

> pipeCase :: FreeF (PipeF l i o u m) r next
>  -> (r                          -> a) -- Return
>  -> (next -> l                  -> a) -- Leftover
>  -> (next -> m () -> o          -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> a) -- Await
>                                 -> a
> pipeCase (Pure r)
>   k _ _ _ = k r
> pipeCase (Free (Leftover next l))
>   _ k _ _ = k next l
> pipeCase (Free (Yield next fin o))
>   _ _ k _ = k next fin o
> pipeCase (Free (Await f g))
>   _ _ _ k = k f g


## Pipe primitives

The Free monad transformer allows us to write the primitives using the convenient liftF combinator.

> awaitE :: Monad m =>        Pipe l i o u m (Either u i)
> awaitE = liftF $ Await Right Left
>
> yield :: Monad m => o -> Pipe l i o u m ()
> yield b = liftF $ Yield () pass b
>
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ Leftover () l


The Conduit implementation is a bit crufty in comparison, but obviously identical.

awaitE :: Pipe l i o u m (Either u i)
awaitE = NeedInput (Done . Right) (Done . Left)

yield :: Monad m => o -> Pipe l i o u m ()
yield = HaveOutput (Done ()) (return ())

leftover :: l -> Pipe l i o u m ()
leftover = Leftover (Done ())


## Pipe composition

> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2

> (<?<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2


Conduit uses the same technique of defining <+< in terms of a "compose with finalizer" function. Well, to be honest, I stole the technique from Conduit code, because I just couldn’t figure out how to do it on my own. However, after I got the idea from Conduit, I implemented it separately. I knew that Conduit didn’t use unreachable, but that doesn’t really change the behavior of the code. There is another important difference that I will point out. Let’s compare the code case by case.

> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = pipeM $ do
>   x1 <- runPipeM p1
>   let p1' = pipeM $ return x1
>   runPipeM $ pipeCase x1

pipe' final left right =
  case right of
    PipeM mp -> PipeM (liftM (pipe' final left) mp)

Note that one unimportant difference is that pipe' has the two pipe inputs in the opposite order of composeWithFinalizer. So left is p2 and right is p1. We both begin by casing on the downstream pipe.

>   {- Return -} (\r       -> lift finalizeUpstream >> return r)

    Done r2 -> PipeM (final >> return (Done r2))

If downstream returns, we both run the current finalizer and then return the same result.

>   {- L-over -} (\_next l -> absurd l)

    Leftover _ i -> absurd i

Obviously the same.

>   {- Yield  -} (\next finalizeDownstream o ->
>                        let (<*<) = composeWithFinalizer finalizeUpstream
>                        in wrap $ Yield
>                              (next <*< p2)
>                              (finalizeUpstream >> finalizeDownstream)
>                              o)

    HaveOutput p c o -> HaveOutput (pipe' final left p) c o

Notice that (next <*< p2) is identical to pipe' final left p: we both reuse the current finalizer for the next computation, and we both yield the o without modification. However, there is an important difference: in the yield construct, I have created a new finalizer by combining the "current" finalizeUpstream with the finalizer found inside the yield we are inspecting. This way, when control is transferred further downstream, both p1 and p2 will have a chance to be finalized. The conduit-0.5.2.2 implementation does not factor in the current upstream finalizer (instead, it just passes c along), and as I will later demonstrate, this causes undesirable behavior.

I have to admit, when I saw this discrepancy, I was unsure whether I had missed something, or whether I was right. I put a lot of effort into part 5 explaining finalization, and it turns out that I was right, but not without a grave mistake of my own, which I shall also demonstrate. Let’s press on with our comparison.

>   {- Await  -} (\f1 g1 -> pipeM $ do
>     x2 <- runPipeM p2
>     runPipeM $ pipeCase x2

    NeedInput rp rc -> upstream rp rc
      where
      upstream rp rc = case left of
        PipeM mp -> PipeM (liftM (\left' -> pipe' final left' right) mp)

In the event of downstream await, control shifts upstream in both implementations.

>     {- Return -} (\u' -> g1 u' <+< return u')

        Done r1 -> pipe (Done r1) (rc r1)

In the absence of abort, we must return to the broken record technique: just keep giving the upstream result every time an upstream value is awaited. This is identical to Conduit behavior.

>     {- L-over -} (\next l -> wrap $ Leftover (p1' <?< next) l)

        Leftover left' i -> Leftover (pipe' final left' right) i

Here the only difference is that I use unreachable while Conduit just passes the current finalizer. Since it will never be reached, the behavior is the same.

>     {- Yield  -} (\next newFinalizer o ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)

        HaveOutput left' final' o -> pipe' final' left' (rp o)

When upstream yields to downstream, the choice is obvious. A new upstream finalizer is provided, so we both use that.

>     {- Await  -} (\f2 g2 -> wrap $ Await
>                       (\i -> p1' <?< f2 i)
>                       (\u -> p1' <?< g2 u)))

        NeedInput left' lc -> NeedInput
            (\a -> pipe' final (left' a) right)
            (\r0 -> pipe' final (lc r0) right)

This is also the same, modulo unreachable. Notice how in our code, we had to bind p1', the pipe we got after runPipeM p1. We wouldn’t want to re-invoke those effects all over again; they should only be invoked once. The Conduit code doesn’t have to worry about that, since it partitions effects into PipeM.

> (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)
>
> infixr 9 <+<
> infixr 9 >+>


## Running a pipeline

It is easy to observe that runPipe is the same.

> runPipe :: Monad m => Pipeline m r -> m r
> runPipe p = do
>   e <- runPipeM p
>   pipeCase e
>     {- Return -} (\r            -> return r)
>     {- L-over -} (\_next l      -> absurd l)
>     {- Yield  -} (\_next _fin o -> absurd o)
>     {- Await  -} (\f _g         -> runPipe $ f ())

runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe (PipeM mp) = mp >>= runPipe
runPipe (Done r)              = return r
runPipe (Leftover _ i)        = absurd i
runPipe (HaveOutput _ _ o)    = absurd o
runPipe (NeedInput _ c)       = runPipe (c ())

## Getting rid of leftovers

The code is a little more involved here, but inspect each case and you’ll see that our implementations of injectLeftovers are also identical.

> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where
>   go ls p = pipeM $ do
>     x <- runPipeM p
>     runPipeM $ pipeCase x
>     {- Return -} (\r -> return r)
>     {- L-over -} (\next l -> go (l:ls) next)
>     {- Yield  -} (\next fin o -> wrap $ Yield (go ls next) fin o)
>     {- Await  -} (\f g -> case ls of
>       [] -> wrap $ Await (go [] . f) (go [] . g)
>       l : ls' -> go ls' (f l))

injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers = go []
  where
    go ls (PipeM mp) = PipeM (liftM (go ls) mp)
    go _  (Done r) = Done r
    go ls (Leftover p l) = go (l:ls) p
    go ls (HaveOutput p c o) = HaveOutput (go ls p) c o
    go [] (NeedInput p c) = NeedInput (go [] . p) (go [] . c)
    go (l:ls) (NeedInput p _) = go ls $ p l


## Adding finalizers to a pipe

cleanupP and addCleanup differ only in a matter of style: Conduit’s addCleanup finalizer takes a Bool input to determine whether termination is "normal" or "abnormal", while cleanupP takes two separate finalizers to cover the two cases. The third abort case is obviously removed with the removal of abort.

> cleanupP :: Monad m => m () -> m () -> Pipe l i o u m r
>          -> Pipe l i o u m r
> cleanupP discardedFinalize returnFinalize = go where
>   go p = pipeM $ do
>     x <- runPipeM p
>     runPipeM $ pipeCase x

addCleanup :: Monad m => (Bool -> m ()) -> Pipe l i o u m r -> Pipe l i o u m r
addCleanup cleanup (PipeM msrc) = PipeM (liftM (addCleanup cleanup) msrc)

Identical modulo pipeM/runPipeM.

>     {- Return -} (\r -> lift returnFinalize >> return r)

addCleanup cleanup (Done r) = PipeM (cleanup True >> return (Done r))

Here we see both invoke the "normal termination" finalizer.

>     {- L-over -} (\next l -> wrap $ Leftover (go next) l)

addCleanup cleanup (Leftover p i) = Leftover (addCleanup cleanup p) i

Identical.

>     {- Yield  -} (\next finalizeRest o -> wrap $
>                       Yield (go next) (finalizeRest >> discardedFinalize) o)

addCleanup cleanup (HaveOutput src close x) = HaveOutput
    (addCleanup cleanup src)
    (cleanup False >> close)
    x

Here we see both will pass along the "abnormal termination" finalizer. However, we chose to order them differently. This may be significant.

>     {- Await  -} (\f g -> wrap $ Await (go . f) (go . g))

addCleanup cleanup (NeedInput p c) = NeedInput
    (addCleanup cleanup . p)
    (addCleanup cleanup . c)

Identical.

> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize
>
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize pass
>
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass finalize


I didn’t see these combinators provided by Conduit, but they are nothing more than trivial wrappers around addCleanup. I patterned bracketP after the Conduit code so it should be no surprise that they are identical modulo pipeM/runPipeM. I think my code is a touch more readable, though I cannot speak for efficiency.

> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
>          -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy
>   finallyP (release key) (mkPipe val)

bracketP alloc free inside = PipeM start
  where
    start = do
        (key, seed) <- allocate alloc free
        return $ addCleanup (const $ release key) (inside seed)


## Finalization and associativity of composition

Let’s explore the discrepancy in finalization.

> finallyC :: Monad m => m () -> C.Pipe l i o u m r -> C.Pipe l i o u m r
> finallyC fin = C.addCleanup (const fin)
>
> idC :: Monad m => C.Pipe l i i u m u
> idC = C.awaitForever C.yield
>
> printerC :: Show i => C.Pipe l i Void u IO u
> printerC = C.awaitForever $ lift . print
>
> idMsgC :: String -> C.Pipe l i i u IO u
> idMsgC msg = finallyC (putStrLn msg) idC
>
> takeC :: Monad m => Int -> C.Pipe l i i u m ()
> takeC 0 = return ()
> takeC n = C.awaitE >>= \ex -> case ex of
>   Left _u -> return ()
>   Right i -> C.yield i >> takeC (pred n)
>
> testPipeC :: Show o => C.Pipe Void Int o () IO r -> IO r
> testPipeC p = C.runPipe $ printerC C.<+< p C.<+< C.sourceList [1..]


Now that we’re equipped with a few convenient ways to create pipes with finalizers, let’s see what happens when we compose three pipes together: the farthest downstream will cause termination, and the two upstream of it will both contain finalizers.

ghci> testPipeC $ (takeC 2 C.<+< idMsgC "foo") C.<+< idMsgC "bar"
1
2
foo
bar

ghci> testPipeC $ takeC 2 C.<+< (idMsgC "foo" C.<+< idMsgC "bar")
1
2
foo

Where did the "bar" go? It is as I suspected: conduit-0.5.2.2 drops the up-upstream finalizers. While I certainly approve of the use of ResourceT, I’m afraid that relying on it too much could be hiding these sorts of bugs in Conduit code.

The deeply scary thing about this is that it illustrates that conduit composition is not associative. It’s known now that pipes with upstream results do not behave entirely like a Category, but they nevertheless should try to behave as much like a Category as possible, especially when you are constructing, composing, and running pipes using only the primitives provided.

Let’s take a look at my implementation and see how it handles this situation.

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
>
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
>
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
>
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
>
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print
>
> idMsg :: String -> Pipe l i i u IO u
> idMsg msg = finallyP (putStrLn msg) idP
>
> take' :: Monad m => Int -> Pipe l i i u m ()
> take' 0 = return ()
> take' n = awaitE >>= \ex -> case ex of
>   Left _u -> return ()
>   Right i -> yield i >> take' (pred n)
>
> testPipe :: Show o => Pipe Void Int o () IO r -> IO r
> testPipe p = runPipe $ printer <+< p <+< fromList [1..]

ghci> testPipe $ (take' 2 <+< idMsg "foo") <+< idMsg "bar"
1
2
foo
bar

ghci> testPipe $ take' 2 <+< (idMsg "foo" <+< idMsg "bar")
1
2
bar
foo


Ugh! While it didn’t drop the bar finalizer (yay!), my choices for "consistency" were obviously wrong, because it still does not preserve associativity of composition.

> printerF :: Show i => F.Frame Void IO (F.M i) F.C r
> printerF = foreverR $ (F.await !>= liftU . print)
>
> idMsgF :: String -> F.Frame i IO (F.M i) F.C r
> idMsgF msg = F.finallyF (putStrLn msg) F.idF
>
> takeF :: Int -> F.Frame i IO (F.M i) F.C ()
> takeF 0 = F.close
> takeF n = F.await !>= F.yield !> takeF (pred n)
>
> fromListF :: [o] -> F.Frame o IO (F.M i) F.C ()
> fromListF xs = F.close !> mapMR_ F.yield xs
>
> testPipeF :: Show o => F.Frame o IO (F.M Int) F.C () -> IO ()
> testPipeF p = F.runFrame $ printerF F.<-< p F.<-< fromListF [1..]

ghci> testPipeF $ (takeF 2 F.<-< idMsgF "foo") F.<-< idMsgF "bar"
1
2
bar
foo

ghci> testPipeF $ takeF 2 F.<-< (idMsgF "foo" F.<-< idMsgF "bar")
1
2
bar
foo


Looks like somebody got it right. :)

## Next time

There is no next time; that’s it folks! Personally, I will be taking a closer look at the order of finalizers; hopefully we can pick an order that always preserves the associativity of composition, and patch that into the next version of conduit!
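The fix I have in mind can be sketched with a stand-alone toy model (my own sketch, not code from any of the implementations above): reduce each pipe to the ordered list of finalizers it would run on early termination. If composition always concatenates those lists in one consistent direction, then associativity of the finalization order falls out of the associativity of list append.

```haskell
-- Toy model: a "pipe" is reduced to the labels of the finalizers it
-- would run on early termination, ordered upstream-to-downstream.
type Fins = [String]

-- Composition in this model: the composite's finalizers are the
-- upstream ones followed by the downstream ones, always in that order.
(<+<.) :: Fins -> Fins -> Fins   -- downstream <+<. upstream
down <+<. up = up ++ down

foo, bar, take2 :: Fins
foo   = ["foo"]
bar   = ["bar"]
take2 = []        -- take' 2 registers no finalizer of its own

-- Both groupings finalize in the same order, because (++) is associative.
leftAssoc, rightAssoc :: Fins
leftAssoc  = (take2 <+<. foo) <+<. bar
rightAssoc = take2 <+<. (foo <+<. bar)
```

Here leftAssoc and rightAssoc are both ["bar", "foo"]: either grouping runs the finalizers in the same order, which is the grouping-independence we want from the real composition.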

There are still a lot of interesting options to explore when it comes to implementing pipes. See also:

## Pipes to Conduits part 7: Closing the input end early

Back in part 5, we added the ability to attach arbitrary finalizers to pipes. But when those finalizers actually ran was purely mechanical: when any given pipe finished, it would run all upstream finalizers, and then its own. This behavior can sometimes delay the finalization of an upstream pipe, if the downstream pipe stops awaiting but continues running and possibly yielding.

This time, we’ll add the close primitive, which will allow the programmer to indicate that a pipe will never await again. This should possibly be named unsafeClose, because in this implementation, we will not use the type system to enforce this guarantee.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
>
> module PipeClose where
>
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
>
> import Data.Void (Void, absurd)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)


## Functors

We’ll not add a new functor this time; we’ll just reuse Then to indicate the "rest" of the computation after a pipe closes its input end.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> newtype Leftover l next = Leftover l     -- Const

> instance Functor Then where
>   fmap f (Then next) = Then (f next)
>
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
>
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
>
> instance Functor Abort where
>   fmap _f Abort = Abort
>
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m
>
> instance Functor (Leftover l) where
>   fmap _f (Leftover l) = Leftover l
>
> pass :: Monad m => m ()
> pass = return ()
>
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"


## The Pipe type

> type LeftoverThen l = Leftover l :&: Then
> type YieldThen o m  = Yield o :&: Finalize m :&: Then
> type AwaitU i u     = Await i :&: Await u :&: Then
> type Close          = Then


Our PipeF type has certainly grown! Remember when it used to be just Await i :|: YieldThen o? At least we’re not adding yet another type parameter this time.

> type PipeF l i o u m =  YieldThen o m
>                     :|: AwaitU i u
>                     :|: Abort
>                     :|: LeftoverThen l
>                     :|: Close
> type Pipe l i o u m r = FreeT (PipeF l i o u m) m r
>
> type Producer   o   m r = Pipe Void () o    () m r
> type Consumer l i u m r = Pipe l    i  Void u  m r
> type Pipeline       m r = Pipe Void () Void () m r


## Working with PipeF

We update the lifts, the smart constructors, and pipeCase as usual.

> liftYield :: YieldThen o m next ->              PipeF l i o u m next
> liftYield = L . L . L . L
>
> liftAwait :: AwaitU i u next ->                 PipeF l i o u m next
> liftAwait = L . L . L . R
>
> liftAbort :: Abort next ->                      PipeF l i o u m next
> liftAbort = L . L . R
>
> liftLeftover :: LeftoverThen l next ->          PipeF l i o u m next
> liftLeftover = L . R
>
> liftClose :: Close next ->                      PipeF l i o u m next
> liftClose = R

> yieldF :: o -> m () -> next ->                  PipeF l i o u m next
> yieldF o m next = liftYield $ Yield o :&: Finalize m :&: Then next
>
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF l i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
>
> abortF ::                                       PipeF l i o u m next
> abortF = liftAbort Abort
>
> leftoverF :: l -> next ->                       PipeF l i o u m next
> leftoverF l next = liftLeftover $ Leftover l :&: Then next
>
> closeF :: next ->                               PipeF l i o u m next
> closeF next = liftClose $ Then next

> pipeCase :: FreeF (PipeF l i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (next                               -> a) -- Close
>  -> (l -> next                          -> a) -- Leftover
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (L (L (R Abort))))
>   k _ _ _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ _ _ = k r
> pipeCase (Wrap (R (Then next)))
>   _ _ k _ _ _ = k next
> pipeCase (Wrap (L (R (Leftover l :&: Then next))))
>   _ _ _ k _ _ = k l next
> pipeCase (Wrap (L (L (L (L (Yield o :&: Finalize m :&: Then next))))))
>   _ _ _ _ k _ = k o m next
> pipeCase (Wrap (L (L (L (R (Await f :&: Await g :&: Then next))))))
>   _ _ _ _ _ k = k f g next


## Pipe primitives

We add a new primitive, as usual.

> tryAwait :: Monad m =>      Pipe l i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
>
> yield :: Monad m => o -> Pipe l i o u m ()
> yield b = liftF $ yieldF b pass ()
>
> abort :: Monad m =>         Pipe l i o u m r
> abort = liftF abortF
>
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ leftoverF l ()
>
> close :: Monad m =>         Pipe l i o u m ()
> close = liftF $ closeF ()


## Pipe composition

> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2

> (<?<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2


Now all uses of pipeCase must have an additional branch for Close.

> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Abort  -} (          lift finalizeUpstream >> abort)
>   {- Return -} (\r     -> lift finalizeUpstream >> return r)
>   {- Close  -} (\next  -> lift finalizeUpstream >> (wrap $ closeF (next <+< abort)))


The very reason that we made the Close option was so that the upstream pipe could be finalized early. Once we do that, what do we compose with next? We could compose it with p2, but that would be very unsafe, since p2’s finalizers have been run. Imagine if p2 were reading from a file, then we close the file, then ask p2 to keep reading! So instead, we compose with abort. Recall that earlier we asserted that right-composing a Producer with abort was the same as the identity function:

$\forall p \in Producer, p \circ abort \equiv p$

If it is indeed true that the downstream pipe will never await again, then we can rest assured that next <+< abort will behave as we desire.
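As a sanity check on that identity, here is a stand-alone toy model (my own sketch, independent of the Pipe machinery above): since a producer never awaits, replacing its upstream with abort cannot change what it yields.

```haskell
-- Toy pipe over Int: yield a value, await input (Nothing signals an
-- aborted upstream), or stop.
data Toy = Yields Int Toy | Awaits (Maybe Int -> Toy) | Stop

-- Composing with an aborting upstream: any await immediately aborts.
withAbortUpstream :: Toy -> Toy
withAbortUpstream (Yields o p) = Yields o (withAbortUpstream p)
withAbortUpstream (Awaits _)   = Stop
withAbortUpstream Stop         = Stop

-- Observable behavior: the stream of outputs.
outputs :: Toy -> [Int]
outputs (Yields o p) = o : outputs p
outputs (Awaits _)   = []   -- a stuck await produces nothing further
outputs Stop         = []

-- A producer never constructs Awaits, so aborting its input is a no-op.
producer :: Toy
producer = Yields 1 (Yields 2 Stop)
```

In this model, outputs (withAbortUpstream producer) equals outputs producer, while any pipe that does await gets cut short, which is exactly the aborting behavior described above.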

>   {- L-over -} (\l _next -> absurd l)
>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                             (finalizeUpstream >> finalizeDownstream)
>                             (next <*< p2))
>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (         onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u'   -> g1 u' <+< abort)    -- downstream recovers
>     {- Close  -} (\next -> wrap $ closeF (p1' <?< next))


Suppose that the upstream pipe closes its input end. If we have reached this point, then the up-upstream finalizers have already been run, so we need not worry about it. Upstream still has control, so we’ll compose next with the unreachable finalizer, as we do for similar situations.

>     {- L-over -} (\l next -> wrap $ leftoverF l (p1' <?< next))
>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)
>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                           (\i -> p1' <?< f2 i)
>                           (\u -> p1' <?< g2 u)
>                           (      p1' <?< onAbort2)))

> (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)
>
> infixr 9 <+<
> infixr 9 >+>


## Running a pipeline

At the level of a pipeline, the close operation is meaningless, since it shouldn’t be awaiting anyways. When we runPipe on a Close, therefore, we will simply move on to the next computation.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (                  return Nothing)
>   {- Return -} (\r             -> return $ Just r)
>   {- Close  -} (\next          -> runPipe next)
>   {- L-over -} (\l _next       -> absurd l)
>   {- Yield  -} (\o _fin _next  -> absurd o)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())


## Getting rid of leftovers

The adjustment to injectLeftovers is interesting: once we close the input end, what should we do with leftovers? Discard them, since we promised not to look at them? Or keep them, since it doesn’t hurt the upstream pipe if we look at the stuff that we already acquired from it?
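The push-back semantics in question can be seen in miniature with a stand-alone toy model (my own sketch, independent of this post’s Pipe type): the leftover list acts as a stack in front of the real stream, so a pushed-back value is the next one awaited.

```haskell
-- Toy stream processor: await input (Nothing once the stream runs
-- dry), push a value back, or finish with a result.
data Step i r
  = AwaitS (Maybe i -> Step i r)
  | LeftoverS i (Step i r)
  | DoneS r

-- Run against a list; leftovers are pushed onto the front of the list.
feed :: [i] -> Step i r -> r
feed _      (DoneS r)       = r
feed is     (LeftoverS l k) = feed (l : is) k
feed (i:is) (AwaitS k)      = feed is (k (Just i))
feed []     (AwaitS k)      = feed [] (k Nothing)

-- Await a value, push it back, await again: we see the same value twice.
example :: Step Int (Maybe Int, Maybe Int)
example = AwaitS $ \ma -> case ma of
  Nothing -> DoneS (Nothing, Nothing)
  Just a  -> LeftoverS a (AwaitS $ \mb -> DoneS (Just a, mb))
```

Here feed [1, 2, 3] example evaluates to (Just 1, Just 1): the second await re-receives the pushed-back 1 instead of 2, mirroring what injectLeftovers arranges for real pipes.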

> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where
>   go ls p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (abort)
>     {- Return -} (\r -> return r)
>     {- Close  -} (\next -> wrap $ closeF (go [] next))

In the name of garbage collection, I say dump them. This is reflected by the recursive call ignoring ls and instead passing in an empty list.

>     {- L-over -} (\l next -> go (l:ls) next)
>     {- Yield  -} (\o fin next -> wrap $ yieldF o fin (go ls next))
>     {- Await  -} (\f g onAbort -> case ls of
>       [] -> wrap $ awaitF (go [] . f) (go [] . g) (go [] onAbort)
>       l : ls' -> go ls' (f l))


## Adding finalizers to a pipe

Close is always an intermediate step of a pipe (even if the next step is merely return ()), so when revisiting cleanupP, we need only make sure that the cleanup procedures are passed on to the next computation.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe l i o u m r
>          -> Pipe l i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (        lift selfAbortFinalize >> abort)
>     {- Return -} (\r   -> lift returnFinalize >> return r)
>     {- Close  -} (\next -> wrap $ closeF (go next))
>     {- L-over -} (\l next -> wrap $ leftoverF l (go next))
>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))
>     {- Await  -} (\f g onAbort -> wrap $
>                       awaitF (go . f) (go . g) (go onAbort))


## Playing with our new primitive

Here’s a simple example using (essentially) printf debugging to illustrate the code execution path. See if you can guess the output of runPipe $ exampleConsumer <+< exampleProducer before looking at it.

> exampleProducer :: Producer Int IO ()
> exampleProducer = finallyP (putStrLn "End producer") $ do
>   lift $ putStrLn "Begin producer"
>   lift $ putStrLn "Producer yielding"
>   yield 1
>   lift $ putStrLn "Producer done yielding"
>   pass

> exampleConsumer :: Consumer Void Int () IO ()
> exampleConsumer = finallyP (putStrLn "End consumer") $ do
>   lift $ putStrLn "Begin consumer"
>   lift $ putStrLn "Consumer awaiting"
>   _ <- await
>   lift $ putStrLn "Consumer done awaiting"
>   close
>   lift $ putStrLn "Consumer continues"
>   pass


ghci> runPipe $ exampleConsumer <+< exampleProducer
Begin consumer
Consumer awaiting
Begin producer
Producer yielding
Consumer done awaiting
End producer
Consumer continues
End consumer
Just ()


As you can see, close caused the producer’s finalizer to be run immediately. What would the output look like if the consumer didn’t close?

## Safety

Our close primitive gives programmers a convenient way to indicate that a pipe’s upstream should be finalized, but it is completely up to the programmer to make sure that close is used in a safe way, that is, that it is not followed by an await. We gave pipes the behavior of aborting in such circumstances, which is a decent choice, but can we do better?

Control.Frame from the pipes package provides a different (though similar) implementation of pipes that uses indexed monads to solve this problem. If you close a frame, then it is a type error to await afterwards. This has obvious type-safety benefits, but at the cost of using a relatively new concept that has little syntactic sugar support; see Control.Pipe.Tutorial for details on how this technique works.

## Next time

We’ve come a long way from the simplicity of Control.Pipe. Next time, I’ll take away abort and close, and what we have left will be fairly similar to the current state of Data.Conduit. I’ll guide you through some of the conduit source code and observe which choices were made, and attempt to explain why.

## Convenience combinators

> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize finalize
>
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize finalize pass
>
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass pass finalize

> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
>          -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy
>   finallyP (release key) (mkPipe val)


## Some basic pipes

> idMsg :: String -> Pipe l i i u IO u
> idMsg str = finallyP (putStrLn str) idP
>
> testPipeR :: Monad m => Pipe Void i o u m r -> m (Maybe r)
> testPipeR p = runPipe $ (await >> abort) <+< p <+< abort
>
> testPipeL :: Monad m => Pipe Void Int o () m r -> m (Maybe r)
> testPipeL p = runPipe $ (await >> await >> abort) <+< take' 1 <+< p <+< fromList [1 ..]
>
> testPipe :: Monad m => Pipe Void Int o () m r -> m (Maybe (r, [o]))
> testPipe p = runPipe $ runP <+< p <+< fromList [1..]
>
> take' :: Monad m => Int -> Pipe l i i u m ()
> take' 0 = pass
> take' n = (await >>= yield) >> take' (pred n)

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

> awaitE :: Monad m => Pipe l i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
>
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
>
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
>
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
>
> filterP :: Monad m => (i -> Bool) -> Pipe l i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
>
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print

> runP :: Monad m => Consumer l i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left u  -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
>
> evalP :: Monad m => Consumer l i u m u
> evalP = fmap fst runP
>
> execP :: Monad m => Consumer l i u m [i]
> execP = fmap snd runP
>
> fold :: Monad m => (r -> i -> r) -> r -> Consumer l i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i

> await :: Monad m => Pipe l i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
>
> oldPipe :: Monad m => (i -> o) -> Pipe l i o u m r
> oldPipe f = forever $ await >>= yield . f
>
> oldIdP :: Monad m => Pipe l i i u m r
> oldIdP = oldPipe id
>
> oldFilterP :: Monad m => (i -> Bool) -> Pipe l i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
>
> oldPrinter :: Show i => Consumer l i u IO r
> oldPrinter = forever $ await >>= lift . print


You can play with this code for yourself by downloading PipeClose.lhs.

## Pipes to Conduits part 6: Leftovers

One important use case of the Conduit library is parsing. In order to perform useful parsing, we need to be able to occasionally consume "too much" input, and then put the "leftovers" back into the input stream, as if they had never been consumed. Today, we will extend the Pipe type yet again, creating a new primitive, leftover, comparable to that of Data.Conduit.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
>
> module PipeLeftover where
>
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
>
> import Data.Void (Void, absurd)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)


## Functors

We’ll create yet another synonym for Const, this time called Leftover.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> newtype Leftover l next = Leftover l     -- Const

> instance Functor Then where
>   fmap f (Then next) = Then (f next)
>
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
>
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
>
> instance Functor Abort where
>   fmap _f Abort = Abort
>
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m
>
> instance Functor (Leftover l) where
>   fmap _f (Leftover l) = Leftover l
>
> pass :: Monad m => m ()
> pass = return ()
>
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"


## The Pipe type

The usage of leftover will be much like that of yield: we supply a value, and then carry on with our computation. We will therefore bundle Leftover with Then, as we did with YieldThen.

> type LeftoverThen l = Leftover l :&: Then
> type YieldThen o m  = Yield o :&: Finalize m :&: Then
> type AwaitU i u     = Await i :&: Await u :&: Then


PipeF and Pipe will acquire a new type parameter l, which indicates the type of leftovers that a given pipe will supply.

> type PipeF l i o u m =  YieldThen o m :|: AwaitU i u
>                     :|: Abort :|: LeftoverThen l
> type Pipe l i o u m r = FreeT (PipeF l i o u m) m r
>
> type Producer   o   m r = Pipe Void () o    () m r
> type Consumer l i u m r = Pipe l    i  Void u  m r
> type Pipeline       m r = Pipe Void () Void () m r


## Working with PipeF

Our lifting functions will be adjusted as usual: the pre-existing ones acquire another L, while the new one gets an R.

> liftYield :: YieldThen o m next ->     PipeF l i o u m next
> liftYield = L . L . L
>
> liftAwait :: AwaitU i u next ->        PipeF l i o u m next
> liftAwait = L . L . R
>
> liftAbort :: Abort next ->             PipeF l i o u m next
> liftAbort = L . R
>
> liftLeftover :: LeftoverThen l next -> PipeF l i o u m next
> liftLeftover = R


We add a smart constructor leftoverF in similar fashion to the ones we have already.

> yieldF :: o -> m () -> next -> PipeF l i o u m next
> yieldF o m next = liftYield $ Yield o :&: Finalize m :&: Then next
>
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF l i o u m next
> awaitF f g next = liftAwait $Await f :&: Await g :&: Then next > > abortF :: PipeF l i o u m next > abortF = liftAbort Abort > > leftoverF :: l -> next -> PipeF l i o u m next > leftoverF l next = liftLeftover$ Leftover l :&: Then next


And finally we add another branch to pipeCase.

> pipeCase :: FreeF (PipeF l i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (l -> next                          -> a) -- Leftover
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (L (R Abort)))
>   k _ _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ _ = k r
> pipeCase (Wrap (R (Leftover l :&: Then next)))
>   _ _ k _ _ = k l next
> pipeCase (Wrap (L (L (L (Yield o :&: Finalize m :&: Then next)))))
>   _ _ _ k _ = k o m next
> pipeCase (Wrap (L (L (R (Await f :&: Await g :&: Then next)))))
>   _ _ _ _ k = k f g next


## Pipe primitives

Now that we’re old pros with liftF, the leftover primitive is a breeze.

> tryAwait :: Monad m =>      Pipe l i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
>
> yield :: Monad m => o -> Pipe l i o u m ()
> yield b = liftF $ yieldF b pass ()
>
> abort :: Monad m =>         Pipe l i o u m r
> abort = liftF abortF
>
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ leftoverF l ()


## Getting rid of leftovers

Being able to specify leftovers is one thing, but how do we interpret that? What does it mean when a pipe supplies leftovers? The "obvious" meaning is that the rest of the pipe computation should have that leftover value available to it the next time it awaits.

Let’s write an interpreter that will "inject" leftovers into a pipe, making them available to the pipe’s own awaits. The given pipe must therefore bear the restriction that the leftover type is the same as the input type. The resultant pipe will contain no leftover constructs, and so it can be polymorphic in that type parameter.

The situation might arise where two leftovers are supplied in a row. What should we do then? Discard the old and keep the new? If we keep both, then in which order should they be supplied back to the subsequent awaits?

Recall that Pipes are a form of stream processing. Suppose we represent the stream as a queue. await and yield are like the operations dequeue (taking from the front of a queue) and enqueue (adding to the back of a queue) respectively. The idea of "leftovers" is that we accidentally took "too much", and we want to reverse our actions. The logical conclusion, therefore, is that the leftover operation should "push" a value back onto the front of the queue.

> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where


Our "queue" is going to be represented by a list. An empty list means "please refer to the actual stream". A nonempty list means "I have these values that I took from the stream; please pretend like they’re still there."

>   go ls p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (                abort)
>     {- Return -} (\r           -> return r)
>     {- L-over -} (\l next      -> go (l:ls) next)


When we encounter a leftover statement, we have yet another value we took from the stream, and we’d like to "put it back". We therefore cons it onto the front.

>     {- Yield  -} (\o fin next  -> wrap $ yieldF o fin (go ls next))
>     {- Await  -} (\f g onAbort -> case ls of
>       []      -> wrap $ awaitF (go [] . f) (go [] . g) (go [] onAbort)
>       l : ls' -> go ls' (f l))


When we encounter an await, there are two possibilities: either we have an empty list, and we need to refer to the actual stream, or we have a nonempty list, and we can just take the top value. "Referring to the actual stream" translates to creating another await construct, while "just taking the top value" translates to invoking the f callback with the l value.

## Pipe composition

The question arises: how are we supposed to compose two pipes that both might supply leftovers? There are a few possibilities. If we allow them both to supply leftovers, then should we discard the leftovers from one pipe or the other? Perhaps the resultant pipe could simply have an Either union of the two types of leftovers. The other option is to disallow leftovers from one or both pipes upon composing them. If we disallow leftovers from one pipe, then the resultant pipe will have the leftover type of the other one. If we disallow leftovers from both pipes, then there is no way for their composition to produce leftovers.

Given the nature of injectLeftovers, which associates leftovers with the "input" type i, and given that the resultant input type i comes from the upstream pipe, the logical choice seems to be to allow leftovers from the upstream pipe, but not the downstream pipe. We "disallow" leftovers by specifying that the type of leftovers for the downstream pipe is Void. It is impossible to construct a value of type Void, except via an infinite loop or an exception.

> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2

> (<?<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2


All we have to change in pipe composition is to add branches for leftover whenever we pipeCase.
> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Abort  -} (      lift finalizeUpstream >> abort)
>   {- Return -} (\r -> lift finalizeUpstream >> return r)
>   {- L-over -} (\l _next -> absurd l)


Since the downstream pipe has a leftover type of Void, we can use absurd to assert that this branch should never happen.

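As a minimal, self-contained illustration of the same technique (handleNoLeftovers is a name invented for this sketch, not part of the Pipe code), absurd lets us discharge any case whose argument type has no inhabitants:

```haskell
import Data.Void (Void, absurd)

-- Void has no constructors, so a branch that receives one can be
-- discharged with absurd :: Void -> a, just like the L-over branch here.
handleNoLeftovers :: Either Void Int -> Int
handleNoLeftovers (Left v)  = absurd v  -- provably unreachable
handleNoLeftovers (Right n) = n
```

The type checker, rather than a runtime check, vouches that the Left case can never be taken.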
>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                           (finalizeUpstream >> finalizeDownstream)
>                           (next <*< p2))
>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (        onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u'  -> g1 u'    <+< abort) -- downstream recovers
>     {- L-over -} (\l next -> wrap $ leftoverF l (p1' <?< next))


If the upstream pipe produced a leftover, then we’ll keep it. Since upstream still has control, there is no reason to expect that the finalizer we provide to pipe composition will be used, so we’ll use the unreachable one. Note that the types make no guarantees about unreachable; rather, it is my own assertion. I arrived at the conclusion that the provided finalizer for this location would be unreachable by reasoning about the code, but I see no convenient way to encode or enforce this in the type system.

>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)
>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                       (\i -> p1' <?< f2 i)
>                       (\u -> p1' <?< g2 u)
>                       (      p1' <?< onAbort2)))

> (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)

> infixr 9 <+<
> infixr 9 >+>


## Running a pipeline

Given that a pipeline cannot reasonably use yield or leftover, since those types are constrained to Void, let’s again make use of absurd to discharge us of the obligation to provide code for those branches.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (               return Nothing)
>   {- Return -} (\r          -> return $ Just r)
>   {- L-over -} (\l _next       -> absurd l)
>   {- Yield  -} (\o _fin _next  -> absurd o)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())


## Adding finalizers to a pipe

There is little to say about the changes here. The leftover construct promises that there is a next pipe, so we simply attach the cleanup actions to that next pipe, and that’s it.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe l i o u m r
>                                             -> Pipe l i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (       lift selfAbortFinalize >> abort)
>     {- Return -} (\r  -> lift returnFinalize >> return r)
>     {- L-over -} (\l next -> wrap $ leftoverF l (go next))
>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))
>     {- Await  -} (\f g onAbort -> wrap $
>                         awaitF (go . f) (go . g) (go onAbort))


## Play time

Let’s give leftovers a spin!

ghci> :set -XNoMonomorphismRestriction

ghci> let p = leftover "hello" >> leftover "world" >> idP

ghci> runPipe $ execP <+< injectLeftovers p <+< fromList ["the", "end"]
Just ["world","hello","the","end"]


Note that this is a horrible abuse of leftover. The concept of leftovers is that they are meant as a way for you to put back onto the stream that which you have taken off. Here’s perhaps a more sensible use of leftover: FORTH-style programming!

> swap :: Monad m => Pipe i i o u m ()
> swap = do
>   i1 <- await
>   i2 <- await
>   leftover i1
>   leftover i2

> dup :: Monad m => Pipe i i o u m ()
> dup = do
>   i <- await
>   leftover i
>   leftover i

ghci> :set -XNoMonomorphismRestriction
ghci> let p = injectLeftovers (swap >> dup >> idP)
ghci> runPipe $ execP <+< p <+< fromList [1 .. 5]
Just [2,2,1,3,4,5]


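The queue discipline that injectLeftovers follows can be modeled in miniature, independent of the Pipe machinery (Step and runModel are names invented for this sketch): awaits consult the leftover stack before the real stream, and leftover pushes onto the front of that stack.

```haskell
-- 'ls' plays the role of the leftover stack; the real stream is a list.
data Step i r = AwaitS (i -> Step i r) | LeftoverS i (Step i r) | Done r

runModel :: [i] -> [i] -> Step i r -> Maybe r
runModel ls stream step = case step of
  Done r        -> Just r
  LeftoverS i k -> runModel (i:ls) stream k   -- push back onto the front
  AwaitS f      -> case ls of
    l:ls' -> runModel ls' stream (f l)        -- leftovers come first
    []    -> case stream of
      x:xs -> runModel [] xs (f x)            -- then the real stream
      []   -> Nothing                         -- stream exhausted: abort
```

For example, awaiting twice, putting both values back, and awaiting again produces the second value first, mirroring the swap demo above: runModel [] [1,2,3] (AwaitS $ \a -> AwaitS $ \b -> LeftoverS a $ LeftoverS b $ AwaitS Done) evaluates to Just 2.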
Perhaps the simplest use of leftovers is the ability to "peek" at the value coming next without consuming it.

> peekE :: Monad m => Pipe i i o u m (Either u i)
> peekE = awaitE >>= \ex -> case ex of
>   Left u  -> return (Left u)
>   Right i -> leftover i >> return (Right i)


## Next time

I initially planned for the series to end right around here, but I have decided to extend it to touch on two more topics. Next time, we will extend our Pipe type with a new primitive, close, allowing it to signal that it is finished consuming input, so that upstream finalizers can be run as soon as possible. After that, we’ll take away close and abort, and compare the result to Data.Conduit, which has neither of those two features. Whether that is a "good" or "bad" thing is for you to decide, but I’ll try to point out a few of the trade-offs.

## Convenience combinators

> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize finalize
>
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize finalize pass
>
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass pass finalize

> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
>          -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy
>   finallyP (release key) (mkPipe val)


## Some basic pipes

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

> awaitE :: Monad m => Pipe l i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
>
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
>
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
>
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
>
> filterP :: Monad m => (i -> Bool) -> Pipe l i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
>
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print

> runP :: Monad m => Consumer l i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
>
> evalP :: Monad m => Consumer l i u m u
> evalP = fst `fmap` runP
>
> execP :: Monad m => Consumer l i u m [i]
> execP = snd `fmap` runP
>
> fold :: Monad m => (r -> i -> r) -> r -> Consumer l i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i

> await :: Monad m => Pipe l i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
>
> oldPipe :: Monad m => (i -> o) -> Pipe l i o u m r
> oldPipe f = forever $ await >>= yield . f
>
> oldIdP :: Monad m => Pipe l i i u m r
> oldIdP = oldPipe id
>
> oldFilterP :: Monad m => (i -> Bool) -> Pipe l i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
>
> oldPrinter :: Show i => Consumer l i u IO r
> oldPrinter = forever $ await >>= lift . print


## Pipes to Conduits part 5: Finalizers

Last time we introduced abort recovery, allowing downstream pipes to recover from an abort. We were able to write the recover combinator, which could attach a recovery pipe to any other pipe.

Today, we’ll look at a different aspect of pipe termination: finalizers. As we have discussed before, downstream pipes may discard upstream pipes when they are done with them, whether the upstream pipe has returned a result or not. That pipe may have unfinished business: for example, open file handles or database connections that need to be closed. We’d like to be able to dictate arbitrary actions which will always be performed before a pipe is discarded.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
>
> module PipeFinalize where
>
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
>
> import Data.Void (Void)


## Functors

We’ll add another Const synonym, Finalize. This one is parameterized by a monad m, and contains an arbitrary action in that monad: m ().

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const

> instance Functor Then where
>   fmap f (Then next) = Then (f next)
>
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
>
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
>
> instance Functor Abort where
>   fmap _f Abort = Abort
>
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m


There will be times when a finalizer is expected, but we have none to give, and we don’t want anything to occur. We’ll just return () in those cases, so how about a nicer name for that idiom.

> pass :: Monad m => m ()
> pass = return ()


There will also come a time when we must supply a finalizer, but we never expect it to be used. We could use pass for that, too, but instead, let’s use an exploding bomb with a message attached. We’d like to be informed if the unreachable is reached.

> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer!"


## The Pipe type

We will attach the Finalize information to the Yield information. That way, when upstream yields to downstream, and downstream decides to discard upstream, downstream can use the latest finalizer it acquired from upstream.

That was a lot of up and down, so reread that sentence a few times until it becomes clear. It sounds childish, but I find these things tend to make more sense when I wave my hand left when I read "downstream" and right when I read "upstream". It’s also more fun when you add other gestures for verbs.

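A deliberately tiny, pure model of that hand-off (consumeN is an invented name, not part of the Pipe library): each yielded value travels with the finalizer in force at that yield, and a consumer that stops after n items reports the finalizer attached to the last yield it consumed.

```haskell
-- Finalizers are modeled as mere labels here; a real pipe would run the
-- bundled m () action instead of returning its name.
consumeN :: Int -> [(a, String)] -> ([a], Maybe String)
consumeN n ys = (map fst taken, finalizerOf taken)
  where
    taken = take n ys
    finalizerOf [] = Nothing
    finalizerOf xs = Just (snd (last xs))  -- the latest finalizer wins
```

So consumeN 2 [(1,"f1"),(2,"f2"),(3,"f3")] returns ([1,2], Just "f2"): upstream is discarded, and the most recently acquired finalizer is the one that would run.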
> type YieldThen o m = Yield o :&: Finalize m :&: Then
> type AwaitU i u    = Await i :&: Await u :&: Then

> type PipeF i o u m = YieldThen o m :|: AwaitU i u :|: Abort
> type Pipe i o u m r = FreeT (PipeF i o u m) m r


Pay special attention to how Pipe is defined here. It makes sure that m is the same m given to both the PipeF functor and to FreeT. See if you can explain why this is significant.

> type Producer o   m r = Pipe () o    () m r
> type Consumer i u m r = Pipe i  Void u  m r
> type Pipeline     m r = Pipe () Void () m r


## Working with PipeF

The yieldF smart constructor is extended appropriately, as is pipeCase.

> liftYield :: YieldThen o m next ->              PipeF i o u m next
> liftYield = L . L
>
> liftAwait :: AwaitU i u next ->                 PipeF i o u m next
> liftAwait = L . R
>
> liftAbort :: Abort next ->                      PipeF i o u m next
> liftAbort = R
>
> yieldF :: o -> m () -> next ->                  PipeF i o u m next
> yieldF o fin next = liftYield $ Yield o :&: Finalize fin :&: Then next
>
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
>
> abortF :: PipeF i o u m next
> abortF = liftAbort Abort

> pipeCase :: FreeF (PipeF i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (R Abort))
>   k _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ = k r
> pipeCase (Wrap (L (L (Yield o :&: Finalize fin :&: Then next))))
>   _ _ k _ = k o fin next
> pipeCase (Wrap (L (R (Await f :&: Await g :&: Then next))))
>   _ _ _ k = k f g next


## Pipe primitives

The yield primitive should have no finalizer attached, so we just give it pass for that slot.

> tryAwait :: Monad m => Pipe i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
>
> yield :: Monad m => o -> Pipe i o u m ()
> yield b = liftF $ yieldF b pass ()
>
> abort :: Monad m => Pipe i o u m r
> abort = liftF abortF


## Pipe composition

The type of composition again remains the same; however, we now need to keep track of an additional argument: the most recent upstream finalizer. We can still keep (<+<), but it will just be a synonym for the new composition function, supplying it the empty finalizer, pass.

> (<+<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2


It will also be convenient to define composition using the unreachable finalizer. You’ll see why momentarily.

> (<?<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2

> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1


And now the fun begins. Wherever we used to recursively invoke (<+<), we now need to consider: do we need to retain the current upstream finalizer? To maintain some similarity with previous code, whenever we need to invoke composeWithFinalizer recursively, we’ll let-bind a new operator (<*<), which will have some particular finalizer baked in: which one depends on each situation, as we will soon see. (Recall that we also have <+< and <?< at our disposal, which have pass and unreachable finalizers baked in, respectively.)

>   {- Abort  -} (      lift finalizeUpstream >> abort)
>   {- Return -} (\r -> lift finalizeUpstream >> return r)


Upon reaching a downstream abort or return, we are going to discard the upstream pipe, so we must run the finalizer. Since Pipe is an instance of MonadTrans (by virtue of being a synonym for a FreeT), we can simply lift the finalizer into a pipe, and then sequence it (>>) with the appropriate result.

>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                           (finalizeUpstream >> finalizeDownstream)
>                           (next <*< p2))


If the downstream pipe is yielding a result, then both the upstream and the downstream pipe are at peril of being discarded by a pipe further down the line. Fortunately, the yield construct provides an appropriate finalizer for p1, and we already have an appropriate finalizer for p2, so we’ll just bundle them together in a new yield construct. But which of the two should we run first? I chose to run the upstream finalizer first, and I’ll explain why later in this post.

In the event that control returns to our downstream pipe, we need not worry about finalizeDownstream, because p1 is once again in control. Therefore, when we compose next with p2, we only bundle in finalizeUpstream.

>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (    onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u' -> g1 u' <+< abort) -- downstream recovers


In the event that downstream is awaiting, control transfers upstream. If the upstream pipe is returning or aborting, then we no longer need to care about finalizing it: it has already finalized itself by this point. Therefore, we can use the regular (<+<) operator for these cases, and forget about the finalizeUpstream we used to have.

>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)


If downstream is awaiting, and upstream is yielding, then that means the upstream pipe has provided a newFinalizer to use instead of the old one.

>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                       (\i -> p1' <?< f2 i)
>                       (\u -> p1' <?< g2 u)
>                       (      p1' <?< onAbort2)))


When both p1 and p2 are awaiting, well, that is an interesting case. Consider: p2 is transferring control further upstream. When control comes back to p2, p1 will still be awaiting. The only way that control will transfer back down to p1 is if p2 decides to abort, return, or yield. If it aborts or returns, then it will have finalized itself. If it yields, then it will supply a brand new finalizer. So the question is: when we re-compose p1 with either f2 i, g2 u, or onAbort2, what finalizer should we use? From what I just said in the previous paragraph, it should be apparent that no matter what finalizer we provide here, it will never be used. So we’ll just hand it the exploding bomb: unreachable.

> (>+>) :: Monad m => Pipe i i' u m u' -> Pipe i' o u' m r -> Pipe i o u m r
> (>+>) = flip (<+<)

> infixr 9 <+<
> infixr 9 >+>


Phew, we made it through again. Finalization is tricky: each case requires careful thought and analysis in order to make sure you are doing the right thing. But did we really do the right thing by using unreachable? Are you sure? Review the code, and think about it. Why did we use <+< for the upstream Return and Abort cases instead of <?<?

## Running a pipeline

A yielded finalizer makes no difference to runPipe.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (               return Nothing)
>   {- Return -} (\r          -> return $ Just r)
>   {- Yield  -} (\_o _fin next  -> runPipe next)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())


## Adding finalizers to a pipe

Being able to compose pipes with finalizers is well and good, but how do we add finalizers to pipes in the first place? Let’s create a new pipe primitive: cleanupP.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe i o u m r
>                                             -> Pipe i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x


By inspecting a given pipe via pipeCase, we can attach finalizers in three distinct places.

>     {- Abort  -} (      lift selfAbortFinalize >> abort)


Any pipe can decide to abort. For example, in a previous blog post, we created the await pseudo-primitive, which voluntarily aborts if an upstream pipe aborts or returns.

>     {- Return -} (\r -> lift returnFinalize >> return r)


Any pipe can decide to return. This is another opportunity for finalization.

>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))


Finally, any pipe can be discarded when it yields control to a downstream pipe. A yield construct may already have finalizers associated with it, so when we add our new one, we’ll just tack it on at the end. We could have just as easily decided to put the new finalizer first; we’ll discuss that decision momentarily.

Notice that we also recursively apply this finalizer to the next pipe after yield. That’s because if control returns to this pipe from downstream, then we still want to finalize it later.

>     {- Await  -} (\f g onAbort -> wrap $
>                         awaitF (go . f) (go . g) (go onAbort))


We anticipate each possibility in the await case, and recursively apply the finalizer to all of them.

## More convenient finalization combinators

cleanupP is rather general; let’s create some convenience combinators to handle typical needs.

> finallyP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> finallyP finalize = cleanupP finalize finalize finalize


If we want a given finalizer run no matter what, then we just use it for all 3 possibilities.

> catchP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> catchP finalize = cleanupP finalize finalize pass


If we only want a finalizer to run when something "goes wrong", then we simply pass on the return option.

> successP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> successP finalize = cleanupP pass pass finalize


Conversely, we may only want a finalizer to run in the absence of "problems", so we pass on both "problem" cases.

> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe i o u m r)
>          -> Pipe i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy
>   finallyP (release key) (mkPipe val)


ResourceT provides allocate and release to help you deal with finalizers, even in the face of thrown exceptions. We can make good use of this by lifting allocate into a Pipe, and then adding the corresponding release as a finalizer!

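The acquire/release discipline that bracketP builds on can be seen in miniature with plain Control.Exception.bracket (an illustrative sketch with an invented demo function; bracketP additionally routes the release action through ResourceT’s allocate/release so it also fires when the pipe is discarded mid-stream):

```haskell
import Control.Exception (bracket)
import Data.IORef

demo :: IO Int
demo = do
  ref <- newIORef (0 :: Int)                     -- counts live "resources"
  bracket (modifyIORef ref (+ 1) >> return ref)  -- acquire: count goes up
          (\r -> modifyIORef r (subtract 1))     -- release: always runs
          (\_ -> return ())                      -- use the resource
  readIORef ref                                  -- back to 0 after release
```

Even if the "use" action threw an exception, the release step would still decrement the counter; that guarantee is what bracketP extends to pipes.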
## How do we know which finalizer comes first?

I’ve defined a few quick-n-dirty functions here to help us observe the behavior of pipe finalization.

> idMsg :: String -> Pipe i i u IO u
> idMsg str = finallyP (putStr $ str ++ " ") idP
>
> take' :: Monad m => Int -> Pipe i i u m ()
> take' 0 = pass
> take' n = (await >>= yield) >> take' (pred n)


testPipeR will test what happens when abort comes from upstream.

> testPipeR :: Monad m => Pipe i o u m r -> m (Maybe r)
> testPipeR p = runPipe $ (await >> abort) <+< p <+< abort


testPipeL will test what happens when abort comes from downstream.

> testPipeL :: Monad m => Pipe Int o () m r -> m (Maybe r)
> testPipeL p = runPipe $ (await >> await >> abort) <+< take' 1 <+< p <+< fromList [1 ..]


testPipe will test what happens when abort comes from within the pipe itself.

> testPipe :: Monad m => Pipe Int o () m r -> m (Maybe (r, [o]))
> testPipe p = runPipe $ runP <+< p <+< fromList [1..]

> examplePipe :: Pipe Int Int u IO ()
> examplePipe = idMsg "one" <+< take' 5 <+< idMsg "two" <+< idMsg "three"


Let’s take this for a spin.

ghci> testPipeR examplePipe
three two one Nothing

ghci> testPipeL examplePipe
three two one Nothing

ghci> testPipe examplePipe
three two one Just ((),[1,2,3,4,5])


Well that’s boring. In each case the finalizers run in order from upstream to downstream: "three two one". But it’s boring on purpose: the way I have defined finalizers to behave, if you are a pipe and your finalizer is running, you can safely assume that any pipes upstream of you have already been finalized.

I encourage you to download this code and mess with it (requires Fun.lhs as well; tested on GHC 7.4.1). What happens when you switch the order of finalizers on line 204 (pipe composition)? What happens when you switch the order of finalizers on line 317 (cleanupP)? What if you switch both? Can you think of any circumstances where you’d want a pipe’s finalizer to run before the pipes upstream of it are finalized? You can use this command to run the ghci examples and see the difference between the expected output and the actual output:

$ BlogLiterately -g PipeFinalize.lhs > test.html && firefox test.html


## Next time

The subtleties of finalization provide us with a lot to think about. There is again room for many possible implementations, but logic and seeking consistent behavior can help us narrow the possibilities, and Haskell’s type system often guides us to the "obvious" solution. Next time, we’ll tackle the "leftovers" feature, using the same style as conduit. I’ll try to point out all of the areas where different implementations are possible, because I feel that the decisions are less clear for leftovers than for previous features.

## Some basic pipes

Here’s all of those pipes from previous posts. They remain unchanged: you can ignore the new finalizer capability that we added and go right along writing pipes just like you did before we added this feature.

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
>
> awaitE :: Monad m => Pipe i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
>
> awaitForever :: Monad m => (i -> Pipe i o u m r) -> Pipe i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
>
> pipe :: Monad m => (i -> o) -> Pipe i o u m u
> pipe f = awaitForever $ yield . f
>
> idP :: Monad m => Pipe i i u m u
> idP = pipe id
>
> filterP :: Monad m => (i -> Bool) -> Pipe i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
>
> printer :: Show i => Consumer i u IO u
> printer = awaitForever $ lift . print
>
> runP :: Monad m => Consumer i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
>
> evalP :: Monad m => Consumer i u m u
> evalP = fst `fmap` runP
>
> execP :: Monad m => Consumer i u m [i]
> execP = snd `fmap` runP
>
> fold :: Monad m => (r -> i -> r) -> r -> Consumer i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i

> await :: Monad m => Pipe i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
>
> oldPipe :: Monad m => (i -> o) -> Pipe i o u m r
> oldPipe f = forever $ await >>= yield . f
>
> oldIdP :: Monad m => Pipe i i u m r
> oldIdP = oldPipe id
>
> oldFilterP :: Monad m => (i -> Bool) -> Pipe i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
>
> oldPrinter :: Show i => Consumer i u IO r
> oldPrinter = forever $ await >>= lift . print