Wednesday, April 28, 2010

First Adventures in ScalaTest - Part III: Introducing Actors in the NetWorth Sample

This is the third post about my little net worth Scala sample. It really doesn't have a lot to do with ScalaTest, so the title is somewhat misleading. What this post is about is how to introduce some parallelism in the net worth calculation code by using Scala actors. The first post in the series showed some tests written in ScalaTest, the second one showed the net worth sample itself, and this one starts where that post ended. The code for calculating the net worth based on an XML file of stock symbols and units is:
val stocksAndUnitsXml = scala.xml.XML.load(fileName)

 val tickersAndUnits =
    (Map[String, Int]() /: (stocksAndUnitsXml \ "symbol")) {
      (map, symbolNode) =>
        val ticker = (symbolNode \ "@ticker").toString
        val units = (symbolNode \ "units").text.toInt
        map + (ticker -> units) // the fold step returns a new immutable Map with this entry added
    }

 def generateTotalNetWorthAndSimpleReport = {
    val report = new java.io.ByteArrayOutputStream
    Console.withOut(report) {

      println("Today is " + new java.util.Date)
      println("Ticket  Units  Closing Price($)  Total value($)")

      val startTime = System.nanoTime

      val netWorth = (0d /: tickersAndUnits) {
        (cumulativeWorth, symbolAndUnitsPair) =>
          val (symbol, units) = symbolAndUnitsPair
          val lastClosingPrice = getLatestClosingPriceFor(symbol)
          val value = lastClosingPrice * units

          println("%-7s  %-5d %-16f  %-16f".format(symbol, units, lastClosingPrice, value))

          cumulativeWorth + value
      }

      val endTime = System.nanoTime

      println("Total value of investments is $" + netWorth)
      println("Report took %f seconds to generate".format( (endTime - startTime)/1000000000.0 ))

      (netWorth, report)
    }
  }
That code is explained in my last post.

Now I want to parallelize that calculation. Specifically, I want to parallelize the web service calls that get the latest price for each stock symbol, so I'm going to fetch each price in a separate actor, send the prices back to the main thread, and accumulate the net worth there.

The code that gets the price for a single symbol and sends it to another actor called 'caller' is:
    caller ! (symbol, getLatestClosingPriceFor(symbol))
To do that in a separate actor for each symbol I call this method:
  private[this] def getLatestPricesForAllSymbols(caller: Actor) =
    tickersAndUnits.keys.foreach {
      symbol =>
        actor {
          caller ! (symbol, getLatestClosingPriceFor(symbol))
        }
    }
The call to 'actor' starts a new actor and executes the function given to it asynchronously. The '!' method on the actor 'caller' sends its argument to 'caller' as a message.

To receive a single symbol and price I do:
  receiveWithin(10000) {
    case (symbol: String, lastClosingPrice: Double) =>
      val units = tickersAndUnits(symbol)
      val value = lastClosingPrice * units
  }
'receiveWithin(10000)' blocks until a message arrives for the current actor, or times out after 10 seconds, so if this code is in the 'caller' actor it will receive one of the symbol/price pairs sent using '!' in the code above.
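
The "block until a message arrives or give up after a timeout" behaviour can be pictured without the actors library. What follows is only a minimal sketch, assuming a java.util.concurrent.LinkedBlockingQueue as a stand-in for the actor's mailbox. The names MailboxSketch and receiveWithinSketch are hypothetical, and this is an analogy to receiveWithin, not how Scala actors are implemented.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object MailboxSketch {
  // Hypothetical stand-in for the messages sent with '!': (symbol, price) pairs.
  type PriceMessage = (String, Double)

  // Waits up to timeoutMillis for one message. None plays the role of a timeout.
  def receiveWithinSketch(mailbox: LinkedBlockingQueue[PriceMessage],
                          timeoutMillis: Long): Option[PriceMessage] =
    Option(mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS))

  def main(args: Array[String]): Unit = {
    val mailbox = new LinkedBlockingQueue[PriceMessage]()
    // A background thread plays the role of one price-fetching actor.
    val sender = new Thread(new Runnable {
      def run(): Unit = mailbox.put(("IBM", 130.06))
    })
    sender.start()
    println(receiveWithinSketch(mailbox, 10000)) // the message from the sender thread
    println(receiveWithinSketch(mailbox, 100))   // nothing else arrives, so this times out
  }
}
```

Returning an Option makes the timeout case explicit for the caller, which is a design choice of this sketch only.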

In order to accumulate the whole net worth I place the above in a function that also takes a 'cumulativeWorth' and returns an updated cumulative worth:
  private[this] def receiveAndProcessOneSymbol(cumulativeWorth: Double) = {
    receiveWithin(10000) {
      case (symbol: String, lastClosingPrice: Double) =>
        val units = tickersAndUnits(symbol)
        val value = lastClosingPrice * units

        println("%-7s  %-5d %-16f  %-16f".format(symbol, units, lastClosingPrice, value))

        cumulativeWorth + value
    }
  }
and to receive all the symbol/price pairs I call that method as many times as there are symbols in the 'tickersAndUnits' map:
  private[this] def receiveAndAccumulateWorthForAllSymbol =
    (0d /: (1 to tickersAndUnits.size)) {
      (cumulativeWorth, index) =>
        receiveAndProcessOneSymbol(cumulativeWorth)
    }
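Putting all that together, 'generateTotalNetWorthAndSimpleReport' becomes:
  // Imports needed at the top of the file:
  import java.io.ByteArrayOutputStream
  import scala.actors.Actor
  import scala.actors.Actor._ // actor, self, receiveWithin

  def generateTotalNetWorthAndSimpleReport = {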
    val report = new ByteArrayOutputStream
    Console.withOut(report) {
      calculateNetWorthAndWriteReportTo(report)
    }
  }

  private def calculateNetWorthAndWriteReportTo(report: ByteArrayOutputStream) = {
    writeReportHeader

    val startTime = System.nanoTime
    getLatestPricesForAllSymbols(self)
    val netWorth = receiveAndAccumulateWorthForAllSymbol
    val endTime = System.nanoTime

    writeReportFooter(netWorth, endTime, startTime)

    (netWorth, report)
  }

  private def writeReportHeader = {
    println("Today is " + new java.util.Date)
    println("Ticket  Units  Closing Price($)  Total value($)")
  }

  private def writeReportFooter(netWorth: Double, endTime: Long, startTime: Long): Unit = {
    println("Total value of investments is $" + netWorth)
    println("Report took %f seconds to generate".format((endTime - startTime) / 1000000000.0))
  }

  private[this] def getLatestPricesForAllSymbols(caller: Actor) =
    tickersAndUnits.keys.foreach {
      symbol =>
        actor {
          caller ! (symbol, getLatestClosingPriceFor(symbol))
        }
    }

  private[this] def receiveAndAccumulateWorthForAllSymbol =
    (0d /: (1 to tickersAndUnits.size)) {
      (cumulativeWorth, index) =>
        receiveAndProcessOneSymbol(cumulativeWorth)
    }

  private[this] def receiveAndProcessOneSymbol(cumulativeWorth: Double) = {
    receiveWithin(10000) {
      case (symbol: String, lastClosingPrice: Double) =>
        val units = tickersAndUnits(symbol)
        val value = lastClosingPrice * units

        println("%-7s  %-5d %-16f  %-16f".format(symbol, units, lastClosingPrice, value))

        cumulativeWorth + value
    }
  }
Now the calculation is done by fetching prices in parallel using the lightweight Scala actors, and sending the results back as one-way messages that are aggregated into the final result in the main thread. That was easy, don't you think?
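
For comparison, here is a rough sketch of the same fan-out and aggregate idea using scala.concurrent.Future instead of actors. To be clear, this is a swapped-in technique and not the code from the sample. The holdings, the fakeClosingPriceFor lookup and the object name ParallelNetWorthSketch are assumptions made up for illustration, with the fake lookup standing in for the web service call.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelNetWorthSketch {
  // Hypothetical holdings and a fake price lookup standing in for the web service call.
  val tickersAndUnits: Map[String, Int] = Map("IBM" -> 2, "MSFT" -> 3)

  def fakeClosingPriceFor(symbol: String): Double =
    if (symbol == "IBM") 100.0 else 10.0

  def netWorth(holdings: Map[String, Int], priceFor: String => Double): Double = {
    // Start one asynchronous lookup per symbol. Each Future yields that symbol's value.
    val values: Seq[Future[Double]] = holdings.toSeq.map {
      case (symbol, units) => Future(priceFor(symbol) * units)
    }
    // Combine the results and block the calling thread until all are in, or 10 seconds pass.
    Await.result(Future.sequence(values), 10.seconds).sum
  }

  def main(args: Array[String]): Unit =
    println(netWorth(tickersAndUnits, fakeClosingPriceFor))
}
```

As in the actor version, the lookups run concurrently and only the final aggregation happens on the calling thread.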

Oh, and finally, even in this little sample and on a small dataset this actually does speed things up. If I rerun the integration test shown in the last post:
class IntegrationSpec extends FlatSpec with ShouldMatchers {
  def withYahooStockPriceFinder(fileName: String)(testFunctionBody: (PortfolioManager) => Unit) {
    testFunctionBody(new PortfolioManager(fileName) with YahooStockPriceFinder)
  }
 
  "A PortFolioManager with the YahooStockPriceFinder" should "produce a asset report and calculate the total net worth" in {
    withYahooStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val (totalNetWorth, report) = pm.generateTotalNetWorthAndSimpleReport
        totalNetWorth should be(85000d plusOrMinus 10000)

        print(report)
    }
  }
}
I get:
Today is Wed Apr 28 20:37:38 CEST 2010
Ticket  Units  Closing Price($)  Total value($)
MSFT     190   30,930000         5876,700000    
INTC     160   23,220000         3715,200000    
ALU      150   3,150000          472,500000     
ORCL     200   25,940000         5188,000000    
NSM      200   14,970000         2994,000000    
CSCO     250   27,170000         6792,500000    
AMD      150   9,550000          1432,500000    
IBM      215   130,060000        27962,900000   
VRSN     200   26,840000         5368,000000    
XRX      240   10,980000         2635,200000    
APPL     200   0,000000          0,000000       
HPQ      225   53,330000         11999,250000   
ADBE     125   35,420000         4427,500000    
SYMC     230   17,270000         3972,100000    
TXN      190   26,380000         5012,200000    
Total value of investments is $87848.55
Report took 0,837941 seconds to generate
which shows that the calculation now takes less than a second, whereas the sequential version took over 6 seconds.
Oh, and also notice that the symbols appear in a different order in the report than they did in the last post. The input data is exactly the same. The difference is that they are printed in the order that the main thread receives prices for them, and that ordering is not deterministic any more because it depends on how fast each web service call returns.
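
If a stable ordering matters, one option is to collect the rows first and sort them before printing. The following is just a sketch of that idea, not something the sample does. The rows, the shortened format string and the name SortedReportSketch are made up for illustration.

```scala
object SortedReportSketch {
  // Hypothetical (symbol, units, price) rows in the order the replies happened to arrive.
  val rowsInArrivalOrder: Seq[(String, Int, Double)] =
    Seq(("MSFT", 190, 30.93), ("ALU", 150, 3.15), ("IBM", 215, 130.06))

  // Sort by ticker symbol and only then format, so the report no longer depends on timing.
  def reportLines(rows: Seq[(String, Int, Double)]): Seq[String] =
    rows.sortBy(_._1).map { case (symbol, units, price) =>
      "%-7s  %-5d %-16f".format(symbol, units, price)
    }

  def main(args: Array[String]): Unit =
    reportLines(rowsInArrivalOrder).foreach(println)
}
```

The price of this is that nothing can be printed until all prices have arrived.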

And that's it. I'm still having fun with learning Scala :-)

Thursday, April 15, 2010

First Adventures in ScalaTest - Part II

Following up on my last post, where I showed some test code written with ScalaTest for a simple PortfolioManager demo, I'll show the PortfolioManager itself in this post. As mentioned in the last post, the code is largely (almost entirely) based on the last chapter of Programming Scala by Venkat Subramaniam.

The PortfolioManager is an abstract class whose primary constructor takes a path to an XML file and loads the contents:

abstract class PortfolioManager(fileName: String) extends StockPriceFinder {
    val stocksAndUnitsXml = scala.xml.XML.load(fileName)
  //more..
}


The data in the XML file is expected to look something like this:

<symbols>
<symbol ticker="APPL"><units>200</units></symbol>
<symbol ticker="ADBE"><units>125</units></symbol>
</symbols>

and is parsed through XPath-like queries applied to the XML with the '\' operator. This method on the PortfolioManager does the parsing:

def tickersAndUnits =  
  (Map[String, Int]() /: (stocksAndUnitsXml \ "symbol")) {
    (map, symbolNode) =>
      val ticker = (symbolNode \ "@ticker").toString
      val units = (symbolNode \ "units").text.toInt
      map + (ticker -> units) // the fold step returns a new immutable Map with this entry added
  }

So what goes on there is that we get the list of symbol XML elements from the stocksAndUnitsXml value by use of the '\' operator from the Scala standard library. We then iterate over that list with '/:' (aka foldLeft). Through the iteration we build up a map from strings to ints, mapping from stock symbols to the number of units. Again the data is pulled out of the XML with the XPath-like '\' operator.
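
The fold on its own, without the XML, can be sketched like this. It is a minimal illustration that uses a plain list of (ticker, units) pairs as a hypothetical stand-in for the parsed symbol elements, and it spells '/:' out as foldLeft. The names TickerFoldSketch, parsedSymbols and toTickersAndUnits are made up.

```scala
object TickerFoldSketch {
  // Hypothetical stand-in for the parsed <symbol> elements: (ticker, units) pairs.
  val parsedSymbols: Seq[(String, Int)] = Seq("APPL" -> 200, "ADBE" -> 125)

  // foldLeft is the named form of '/:'. The accumulator starts as an empty
  // immutable Map, and each step returns a new Map with one more entry.
  def toTickersAndUnits(symbols: Seq[(String, Int)]): Map[String, Int] =
    symbols.foldLeft(Map.empty[String, Int]) {
      case (map, (ticker, units)) => map + (ticker -> units)
    }

  def main(args: Array[String]): Unit =
    println(toTickersAndUnits(parsedSymbols))
}
```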

That's all just a bit of warm-up. What the PortfolioManager is supposed to do is calculate and report the net worth of the stock portfolio described in the XML. In itself that's not a big deal, but I think it's fun to see how it's handled in Scala - it turns out to be sort of neat.

The net worth is calculated by fetching the latest price of each individual stock symbol, multiplying it by the number of units, and summing up the results. To get the stock prices the code must query some external source, e.g. download.finance.yahoo.com, but I don't want to do that directly because I want to be able to run my tests without depending on or waiting for Yahoo. That's why the PortfolioManager is abstract: the call to get the latest price for a symbol is factored out to the StockPriceFinder trait and is abstract:

trait StockPriceFinder {
  protected def getLatestClosingPriceFor(tickerSymbol: String) : Double
}

Since PortfolioManager extends StockPriceFinder without implementing that method, it has to be abstract. Client code has to either instantiate concrete subclasses or mix in a trait implementing the getLatestClosingPriceFor method at instantiation time. That's what the tests from the last post did, and that's what makes it easy to switch between a fake implementation for unit tests and a real implementation for "production" code and integration test code. I'll show an integration test towards the end of the post, but for now let's return to calculating the net worth and printing a simple report to an output stream:

def generateTotalNetWorthAndSimpleReport = {
    val report = new java.io.ByteArrayOutputStream
    Console.withOut(report) {

      println("Today is " + new java.util.Date)
      println("Ticket  Units  Closing Price($)  Total value($)")

      val startTime = System.nanoTime
      val netWorth = (0d /: tickersAndUnits) {
        (cumulativeWorth, symbolAndUnitsPair) =>
          val (symbol, units) = symbolAndUnitsPair
          val lastClosingPrice = getLatestClosingPriceFor(symbol)
          val value = lastClosingPrice * units
          println("%-7s  %-5d %-16f  %-16f".format(symbol, units, lastClosingPrice, value))
          cumulativeWorth + value
      }
      val endTime = System.nanoTime

      println("Total value of investments is $" + netWorth)
      println("Report took %f seconds to generate".format( (endTime - startTime)/1000000000.0 ))

      (netWorth, report)
    }
  }

Things to notice in the code above are:
  • Console.withOut(report) redirects printlns in its function parameter to the output stream 'report'
  • The method returns two values packaged up in a tuple simply by ending with the line '(netWorth, report)'
  • The fold left operation is used again, this time to iterate over the map from symbols to units and accumulate the worth
  • The price of a symbol is found by calling the abstract getLatestClosingPriceFor
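
The first two points can be tried out in isolation. This is a small sketch under my own assumptions: the method totalAndReport and the object CapturedReportSketch are hypothetical names, and the "report" is just a couple of printed lines.

```scala
import java.io.ByteArrayOutputStream

object CapturedReportSketch {
  // Returns a (total, report) tuple. Everything printed inside withOut lands in 'report'.
  def totalAndReport(values: Seq[Double]): (Double, ByteArrayOutputStream) = {
    val report = new ByteArrayOutputStream
    Console.withOut(report) {
      println("Header")
      val total = values.foldLeft(0d) { (sum, value) =>
        println("value " + value)
        sum + value
      }
      (total, report) // last expression of the block, so it is also the method's result
    }
  }

  def main(args: Array[String]): Unit = {
    val (total, report) = totalAndReport(Seq(1.5, 2.5))
    println("captured: " + report.toString.trim)
    println("total: " + total)
  }
}
```

The printlns in main run outside withOut, so they go to the normal console again.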

To use the code we provide an implementation for the StockPriceFinder and a path, and call generateTotalNetWorthAndSimpleReport:

class IntegrationSpec extends FlatSpec with ShouldMatchers {
  def withYahooStockPriceFinder(fileName: String)(testFunctionBody: (PortfolioManager) => Unit) {
    testFunctionBody(new PortfolioManager(fileName) with YahooStockPriceFinder)
  }

  "A PortFolioManager with the YahooStockPriceFinder" should "produce a asset report and calculate the total net worth" in {
    withYahooStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val (totalNetWorth, report) = pm.generateTotalNetWorthAndSimpleReport
        totalNetWorth should be(85000d plusOrMinus 10000)

        print(report)
    }
  }
}

which produces this output:

Today is Wed Apr 14 20:54:02 CEST 2010
Ticket  Units  Closing Price($)  Total value($)
XRX      240   10.540000         2529.600000   
NSM      200   15.650000         3130.000000   
SYMC     230   17.015000         3913.450000   
ADBE     125   34.875000         4359.375000   
VRSN     200   26.870000         5374.000000   
CSCO     250   26.870100         6717.525000   
TXN      190   26.737500         5080.125000   
ALU      150   3.400000          510.000000     
IBM      215   131.050000        28175.750000   
INTC     160   23.470000         3755.200000   
ORCL     200   26.290000         5258.000000   
APPL     200   0.000000          0.000000      
HPQ      225   54.530000         12269.250000   
AMD      150   9.940000          1491.000000   
MSFT     190   30.730000         5838.700000   
Total value of investments is $88401.975
Report took 6.151573 seconds to generate
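
The mix-in at instantiation time used in the test above (new PortfolioManager(fileName) with YahooStockPriceFinder) can be illustrated in isolation. This is only a sketch with made-up names: PriceFinderSketch, HoldingsSketch and FixedPriceFinderSketch are hypothetical stand-ins, and the fixed price of 10.0 is an assumption, not what my FakeStockPriceFinder returns.

```scala
trait PriceFinderSketch {
  protected def getLatestClosingPriceFor(tickerSymbol: String): Double
}

// Abstract because getLatestClosingPriceFor has no implementation here.
abstract class HoldingsSketch(holdings: Map[String, Int]) extends PriceFinderSketch {
  def netWorth: Double =
    holdings.foldLeft(0d) {
      case (total, (symbol, units)) => total + getLatestClosingPriceFor(symbol) * units
    }
}

// Hypothetical fake: every stock closes at 10.0, so tests never touch the network.
trait FixedPriceFinderSketch extends PriceFinderSketch {
  protected def getLatestClosingPriceFor(tickerSymbol: String): Double = 10.0
}

object MixinSketch {
  def main(args: Array[String]): Unit = {
    // Mixing the trait in at instantiation time yields a concrete anonymous subclass.
    val manager = new HoldingsSketch(Map("ORCL" -> 200, "ADBE" -> 125)) with FixedPriceFinderSketch
    println(manager.netWorth)
  }
}
```

Swapping in a different price source is then a matter of mixing in another trait at the call site, with no change to the abstract class.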

Next post I'll introduce some actors in the code to parallelize it, and see if that brings a speed-up.

Friday, April 9, 2010

First Adventures in ScalaTest

As mentioned earlier I'm spending some time learning Scala. To that end I've read Programming Scala - which BTW is a book I'd recommend to anyone looking into Scala. Towards the end of the book there's an example of a little application that can calculate the net worth of some stocks: Given a collection of ticker symbols and amounts it goes to Yahoo, finds the latest trading prices and adds up the net worth. Not too complicated really. I'm sort of following along with the book in my own code, but I'm also sort of straying. In later posts I'll show the actual application, but for now I just want to show some of the tests I've written along the way.


I'm using ScalaTest for testing. It comes in a couple of flavors, of which I've chosen the BDD-style FlatSpec. And I'm really enjoying it. Let's look at some of the things I like about it. First, here are just two simple tests for the constructor of my code under test:


class PortfolioManagerSpec extends FlatSpec with ShouldMatchers {
  def withFakeStockPriceFinder(
        fileName: String)
       (testFunctionBody: (PortfolioManager) => Unit) {
    testFunctionBody(new PortfolioManager(fileName)
      with FakeStockPriceFinder)
  }


  "A PortfolioManager" should "be instantiable with a valid file name" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml")
     {pm =>}
  }


  it should "throw an exception when given an invalid file name" in {
    evaluating {
      withFakeStockPriceFinder("fakefilename") {pm =>}
    } should produce[FileNotFoundException]
  }
  ...
}


The ScalaTest flavor is chosen simply by extending one of several traits, in this case FlatSpec. FlatSpec gives me the ability to specify test cases in "<subject> should <behavior> in" fashion as seen in the first test above. To me doing that in a statically typed language is awesome, and goes to show how flexible Scala really is. The second test starts with "it", which means that it continues on the line of testing of the test above. In this case "it" lets me avoid repeating the "A PortfolioManager" part.


Even these simple tests already put Scala's strong support for functions into play: The withFakeStockPriceFinder method at the top is a curried method that takes first a string argument, and then a function argument, testFunctionBody. The method instantiates the object under test and passes it into the testFunctionBody. Both test cases above use withFakeStockPriceFinder to construct the object under test, and have it passed into an anonymous function where the actual test code is written. Furthermore the second test makes use of some of the things from the ShouldMatchers trait, namely the evaluating, should and produce methods. Evaluating takes a function argument and executes it, much like my own withFakeStockPriceFinder, but it handles any exceptions thrown and lets me set up expectations for exceptions by calling the should and produce methods in a fluent fashion. Note that dots and parentheses in method calls are optional in Scala, as long as there are no ambiguities. This IMHO is also awesome, and again goes to show how flexible Scala is.
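
The shape of such a curried fixture method can be shown without ScalaTest. Here is a small sketch using made-up names (Greeter, withGreeter, LoanFixtureSketch) rather than the real PortfolioManager:

```scala
object LoanFixtureSketch {
  // Hypothetical object under test.
  final class Greeter(name: String) {
    def greet: String = "Hello, " + name
  }

  // Two parameter lists: first the constructor argument, then the test body.
  def withGreeter(name: String)(testBody: Greeter => Unit): Unit =
    testBody(new Greeter(name))

  def main(args: Array[String]): Unit = {
    // The second argument list is written as a block, which reads like a built-in construct.
    withGreeter("Scala") { greeter =>
      println(greeter.greet)
    }
  }
}
```

Because the function argument sits in its own parameter list, the call site can use braces instead of parentheses, which is what makes the tests above read the way they do.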


Before showing the rest of my PortfolioManagerSpec I want to touch on another point: TDD is - as we know - a very disciplined way of working, and can be hard to follow all the time, but somehow I find that the style of testing promoted by FlatSpec and ShouldMatchers nudges me towards shorter red-green cycles and a more stringent test-first practice than I usually have with NUnit. I'm not sure why that is, but I think it has to do with the way tests are declared with strings rather than method names.


Anyway here's the whole PortfolioManagerSpec:


class PortfolioManagerSpec extends FlatSpec with ShouldMatchers {
  def withFakeStockPriceFinder(
        fileName: String)
       (testFunctionBody: (PortfolioManager) => Unit) {
    testFunctionBody(new PortfolioManager(fileName)
      with FakeStockPriceFinder)
  }


  "A PortfolioManager" should "be instantiable with a valid file name" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml") 
      {pm =>}
  }


  it should "throw an exception when given an invalid file name" in {
    evaluating {
      withFakeStockPriceFinder("fakefilename") {pm =>}
    } should produce[FileNotFoundException]
  }


  "A PortfolioManager for stocks.xml" should "find 15 symbols and units in the stocks.xml file" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val tickersAndUnits = pm.tickersAndUnits
        tickersAndUnits.size should be (15)
    }
  }


  it should "find APPL, NSM and XRX ticker symbols" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val tickersAndUnits = pm.tickersAndUnits
        tickersAndUnits should (contain key ("APPL") and contain key ("NSM") and contain key ("XRX"))
    }
  }


  it should "find 200 ORCL" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val tickersAndUnits = pm.tickersAndUnits
        tickersAndUnits("ORCL") should be (200)
    }
  }


  it should "produce a asset report and calculate the total net worth" in {
    withFakeStockPriceFinder("src/configuration/stocks.xml") {
      pm =>
        val (totalNetWorth, report) =                  
          pm.generateTotalNetWorthAndSimpleReport
        totalNetWorth should be (146250)

        print(report)
    }
  }
}