Much ado about monoids

Mark Hopkins

@antiselfdual

antiselfdual.com

Challenge

Stock exchange data

20091230,GOOG,618.07,622.73,618.01,622.73,14663
20091231,GOOG,624.86,625.4,619.98,619.98,12202
20100104,GOOG,627.46,629.51,624.24,626.75,19579
20100105,GOOG,627.84,627.84,621.54,623.99,30078
20100106,GOOG,625.08,625.86,606.36,608.26,39806
...

Columns are: date, ticker symbol, opening, highest, lowest and closing prices, and volume of shares traded

Task

Perform computations on this data

We want to be pretty flexible about the exact computation we perform

And do it in a single pass (there's a lot of data)

Monoids

Spot the differences

scala> (1 + 2) + 3 == 1 + (2 + 3)
res0: Boolean = true

scala> 9 + 0
res1: Int = 9

scala> 0 + 9
res2: Int = 9

scala> ("philip" + "k") + "dick" == "philip" + ("k" + "dick")
res1: Boolean = true

scala> "something" + ""
res2: Boolean = "something"

scala> "" + "something"
res3: Boolean = "something"

scala> BigInt(30) gcd (BigInt(42) gcd BigInt(70)) ==
(BigInt(30) gcd (BigInt(42)) gcd BigInt(70)
res0: Boolean = true

scala> BigInt(345) gcd BigInt(0)
res1: scala.math.BigInt = 345

scala> BigInt(0) gcd BigInt(345)
res2: scala.math.BigInt = 345

What's similar in each case? In each we've got

  1. a way of sticking two Ts together to get another T,
  2. an element that doesn't do anything when you stick it onto anything (from either side),

    Usually we call this the unit

  3. if we stick together a few things, it doesn't matter where we put the brackets, we still end up with the same result.

    In other words, the final result is independent of the way we "associate" the elements together, providing we keep them in the same order. This property is called associativity.

  4. Something that satisfies this definition is called a monoid

    Or a semigroup if we don't require a unit

Why "monoid"?

mono is Greek for "one"

Because there's only one operation

As for the "-oid"...

...you'll have to ask the guy who came up with the name and popularised the concept

This guy! Nicolas Bourbaki

Who never existed!

Bourbaki seminar, 1930s France

Bourbaki seminar, 1930s France

Examples!

These last two are only a semigroup — not clear what the unit should be...

If we wanted extend to a monoid, for, say, the maximum on Int, we could:

Monoids don't have to be numeric

scalaz.math.Ordering

consists of constants LT, GT, EQ

There's a monoid defined like this:

        EQ |+| x == x
        LT |+| x == LT
        GT |+| x == GT
    

This lets us to write chained comparisons with immaculate ease.

For example, if we wanted to sort Twitter users by number of followers, then by number of tweets, then by name, we could use
def compareUsers(a: User, b: User): Ordering =
        (a.followers ?|? b.followers)
    |+| (a.tweets ?|? b.tweets)
    |+| (a.name ?|? b.name)
Note: ?|? is just a little more scalaz syntactic sugar:
scala> 10 ?|? 12
res0: scalaz.Ordering = LT

Oddities

New monoids from old

Tuples of monoids create new monoids:

scala> ("hi", List(3, 4, 5), 1.2) |+| (" Mum", List(7, 8, 9), 4.6)
res0: (String, List[Int], Double) = (hi Mum,List(3, 4, 5, 7, 8, 9),5.8)

The |+| function here is just scalaz syntactic sugar for append.

New monoids from old

If M is a monoid, then functions A => M are automatically a monoid:

implicit def pointwiseMonoid[A, M](implicit m: Monoid[M]) =
    new Monoid[A => M] {
        def zero = a => m.zero

        def append(f: A => M, g: A => M): A => M =
            a => m.append(f(a), g(a))
    } 

If V is a monoid, then Map[K, V] gets a monoid structure

scala> Map('finn -> 3, 'jake -> 4, 'beemo -> 22) |+|
     | Map('finn -> 1, 'princess_bubblegum -> 8, 'beemo -> -1)
res0: scala.collection.immutable.Map[Symbol,Int] =
    Map(
        'finn -> 4,
        'princess_bubblegum -> 8,
        'beemo -> 21,
        'jake -> 4
    )
    

Pro-tip

Use "tags" (phantom types) to define a new monoid instance for a class that already has one

type Tagged[T] = {type Tag = T}
type @@[+T, Tag] = T with Tagged[Tag]
sealed trait GCD

implicit val GCDMonoid = new Monoid[BigInt @@ GCD] {
    def zero = Tag(BigInt(0))
    def append(a: BigInt @@ GCD, b: => BigInt @@ GCD) =
      Tag(a gcd b)
}
scala> GCD(16) |+| GCD(12)
res0: scalaz.@@[BigInt,GCD] = 4

Phew!

What's it all for?

What are monoids about?

They abstract the notion of accumulation

Why are we using

def foldLeft[B](z: B)(op: (B, A) => B): B

In most cases, z is the unit of a monoid, and op is just collecting things onto an accumulator

 val totalSize = collections foldMap (x => x.size) 

instead of

 val totalSize = collections foldLeft (0) ((a,x) => a + x.size) 

If we define

def foldMap[B](f: A => B)(implicit F: Monoid[B]) =
    foldLeft[B](F.zero){ (b, a) => F.append(b, f(a)) }
    

Let the monoid take care of the accumulation

separation of concerns, modularisation.

foldMap is provided in scalaz's Foldable trait

Foldable instances are provided for subclasses of Iterable (and some scalaz types)

Why "foldMap"?

Well, conceptually, foldMap(f) does a
map(f): F[A] => F[M]

followed by a

fold: F[M] => M
In the same way that flatMap(f) conceptually does map(f) followed by flatten.

Some kinds of calculations don't work as monoids. eg. averages

We can't combine two averages unless we know the size of the datasets they came from

But its the ratio of the sum and the count of elements, and they're both monoids...

Let's define a type class that allows some post-processing

    abstract class Aggregate[T:Monoid] {
        type Result
        def result(a: T): Result
    }

And a helper method:

        def aggregate(fa: F[A])(k: A => M) = fa.foldMap(k).result 

Actually, it's a little messier:

def aggregate[F[_], A, M](fa: F[A])(k: A => M)
 (implicit f: Foldable[F], a: Aggregation[M], m: Monoid[M]): a.Result =
        a.result(fa.foldMap(k)(m))

Now let's implement Mean

case class Mean[N:Fractional](sum:N,count:Int)
object Mean{
    def apply[N:Fractional](n:N): Mean[N] = Mean(n, 1)
}
implicit def MeanMonoid[N](implicit F:Fractional[N]) =
    new Monoid[Mean[N]] {
      import F._
      def zero = Mean(F.zero, 0)
      def append(a:Mean[N], b: => Mean[N]) =
        Mean(a.sum + b.sum, a.count + b.count)
    }
implicit def MeanAggregation[N](implicit F: Fractional[N]) =
    new Aggregation[Mean[N]] {
      type Result = N

      import F._

      def result(a: Mean[N]) = a.sum / fromInt(a.count)
    }

Now it works!

scala> aggregate(List[Double](1,2,3,4,5,6,7,8,9,10))(Mean(_))
res0: Double = 5.5

Let's add filtering

def filter[A, B:Monoid](p: A => Boolean)(f: A => B): A => B =
    a => if (p(a)) f(a) else implicitly[Monoid[B]].zero
    
scala> (1 to 30).toList.foldMap(filter (even) (_.toString))
res0: String = 24681012141618202224262830
    

And grouping

def groupBy[A, K, M: Monoid](createKey: A => K, monoidValuedFunction: A => M): A => Map[K, M] =
    a => Map[K, M](createKey(a) -> monoidValuedFunction(a))
    
scala> val as = "monoids are a pretty simple concept really but are pretty handy
 in practice and find a wide range of pragmatic applications in programming".spl
it("\\W").toList

scala> as.foldMap(groupBy((_:String).head, a3(count, maxLength, minLength)))
res0: Map[Char,(Int, aggregations.Aggregations.Max[Int], aggregations.Aggregations.Min[Int])] =
        Map(s -> (1,Max(Some(6)),Min(Some(6))),
        f -> (1,Max(Some(4)),Min(Some(4))),
        a -> (6,Max(Some(12)),Min(Some(1))),
        m -> (1,Max(Some(7)),Min(Some( 7))),
        i -> (2,Max(Some(2)),Min(Some(2))),
        b -> (1,Max(Some(3)),Min(Some(3))),
        p -> (5,Max(Some(11)),Min(Some(6))),
        c -> (1,Max(Some(7)),Min(Some(7))),
        h -> (1,Max(Some(5)),Min(Some(5))),
        r -> (2,Max(Some(6)),Min(Some(5))),
        w -> (1,Max(Some(4)),Min(Some(4))),
        o -> (1,Max(Some(2)),Min(Some(2))))

And helpers for doing operations in parallel

def a2[X,A,B](f: X => A,g: X => B): X => (A, B) =
  x => (f(x), g(x))

Similarly a3, a4, ...

scala> val l = List[Double](1, 2, 3, 4, 5, 6)
res0: List[Double] = List(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)

scala> aggregate(l)( a2( filter((_:Double) >= 3) (sum), filter((_:Double) <= 4) (product))) )
res1: (Double, Double) = (18.0,24.0)

scala> aggregate(l)( a3(max, min, mean) )
res1: (Double, Double, Double) = (6.0,1.0,3.5)
    

Or, getting fancy...

Instead of writing a2, a3 etc by hand...

Define an for all n using some Shapeless magic.

scala> val s = sum[Double]
scala> aggregate(l)(flatten(s &&& s &&& s &&& s &&& s &&& s))
res0: (Double, Double, Double, Double, Double, Double) = (21.0,21.0,21.0,21.0,21.0,21.0)
        

&&& is an Arrow operator, it's just the same thing as our a2.

Back to problem

Read lines of file

def linesOf(f: File) = new FileLineTraversable(f).view
class FileLineTraversable(file: File) extends Traversable[String] {
  def foreach[U](f: String => U) {
    val reader = new BufferedReader(new FileReader(file))
    try {
      var line = reader.readLine()
      while (line != null) {
        f(line)
        line = reader.readLine()
      }
    } finally {
      reader.close()
    }
  }

  override def toString() = s"[lines of ${file.getAbsolutePath}]"
}

Parse each line

case class Prices(
    date:   LocalDate,
    ticker: String,
    open:   BigDecimal,
    high:   BigDecimal,
    low:    BigDecimal,
    close:  BigDecimal,
    volume: Int
)
def parsePrices(line: String): Option[Prices] = {
  val csvColumn = "([^,]+)"
  val P = (List.fill(7)(csvColumn) mkString ",").r
  def option[T](t: => T) =
      catching(classOf[IllegalArgumentException]) opt t
  for {
    P(d,t,o,h,l,c,v) <- Some(line)
    date             <- option(LocalDate.parse(d, YYYYMMdd))
    ticker           <- Some(t)
    open             <- option(BigDecimal(o))
    high             <- option(BigDecimal(h))
    low              <- option(BigDecimal(l))
    close            <- option(BigDecimal(c))
    volume           <- option(v.toInt)
  } yield Prices(date, ticker, open, high, low, close, volume)
}

Run aggregation on a file

def aggregateFile[M, P](file: File)(parse: String => Option[P])(f: P => M)
  (implicit a: Aggregation[M], m: Monoid[M]): a.Result = {
  val ps = for {
    line <- linesOf(file)
    p <- parse(line)
  } yield p
  aggregate(ps)(f)
}
def aggregatePrices[M:Monoid:Aggregation](file:File)(f: Prices => M) =
    aggregateFile(file)(parsePrices)(f)

Range of opening and closing prices

    val opening = (_:Prices).open
    val closing = (_:Prices).close
scala> aggregatePrices (f) ( (opening andThen range[BigDecimal]) &&& (closing andThen range[BigDecimal]))
res0: ((_1.Result, _2.Result), (_1.Result, _2.Result)) = ((Some(1.36),Some(627.84)),(Some(1.35),Some(626.75)))
    

Range of opening and closing prices for Google

aggregatePrices(f)(filter (google) (range(opening) &&& range(closing)))

Range of opening and closing prices for Google, grouped by month

aggregatePrices(f)(filter (google) (groupBy (startOfMonth) (range(opening) &&& range(closing)))

Bonus example

Now that we've got this framework, it's not terribly difficult to adapt to another setting

def filesUnderDirectory(dir: File): Traversable[File] = ...
val files = filesUnderDirectory(...).view
total size of all files in subtree
val size: File => Long = _.length
aggregate (files) (size)
total size of all .o files in subtree
val objFile: File => Boolean = _.getName.endsWith(".o")
aggregate (files) (filter (objFile) (size))

Get their total size, and zip them!

val (zipFile, totalSize) = aggregate(filter(objFile)(archive &&& size)

End

/

#