Thursday, May 29, 2014

Testing Recovery through Snapshots

In the last post I explained how to test whether the state of an akka-persistence based application is properly recovered by replaying the journal after a system restart. This post is about testing the second way of recovering application state that akka-persistence supports: recovery from snapshots.

In addition to recovering application state by replaying the entire journal, akka-persistence supports taking snapshots of the application state and recovering from them. This typically decreases recovery time significantly. The state is not reconstructed command-message by command-message but all at once, and only those commands that arrived after the snapshot was taken need to be replayed message by message.
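The difference between the two strategies can be modelled without Akka. In the sketch below, `Command`, `State` and `Snapshot` are hypothetical types that only mirror the idea and are not akka-persistence API. A snapshot remembers how many journaled commands it already contains, so recovery starts from it and replays only the newer ones:

```scala
object RecoveryModel {
  sealed trait Command
  final case class Create(description: String) extends Command
  final case class Delete(id: Long) extends Command

  final case class State(itemById: Map[Long, String], idCounter: Long) {
    // The same logic handles a command during normal operation and on replay.
    def applyCommand(command: Command): State = command match {
      case Create(description) =>
        val id = idCounter + 1
        State(itemById + (id -> description), id)
      case Delete(id) =>
        copy(itemById = itemById - id)
    }
  }

  val Empty: State = State(Map.empty, 0L)

  // A snapshot holds the state after the first `sequenceNr` journaled commands.
  final case class Snapshot(sequenceNr: Int, state: State)

  private def replay(start: State, commands: Vector[Command]): State =
    commands.foldLeft(start)((state, command) => state.applyCommand(command))

  // Full replay: every journaled command is applied again.
  // Returns the recovered state and the number of replayed commands.
  def recoverFromJournal(journal: Vector[Command]): (State, Int) =
    (replay(Empty, journal), journal.size)

  // Snapshot recovery: start from the snapshot, replay only the newer commands.
  def recoverFromSnapshot(snapshot: Snapshot, journal: Vector[Command]): (State, Int) = {
    val newer = journal.drop(snapshot.sequenceNr)
    (replay(snapshot.state, newer), newer.size)
  }

  // Takes a snapshot of the state after the first n commands.
  def snapshotAfter(n: Int, journal: Vector[Command]): Snapshot =
    Snapshot(n, replay(Empty, journal.take(n)))
}
```

Both strategies must yield the same state. The replay count shows where the speed-up comes from: it depends on the number of commands since the last snapshot, not on the length of the whole journal.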

Enabling Snapshots

To enable the ItemActor to take snapshots we only need a few lines of code:

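    // custom message: hand the actor's complete state to akka-persistence
    case SaveSnapshot => saveSnapshot(ItemActorSnapshot(itemById, idCounter))
    // outcome of saveSnapshot; deliberately ignored in this example
    case SaveSnapshotSuccess(metadata) =>
    case SaveSnapshotFailure(metadata, cause) =>

    // during recovery: restore the state from the offered snapshot
    case SnapshotOffer(_, ItemActorSnapshot(itemMap, lastId)) =>
      this.itemById = itemMap
      this.idCounter = lastId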

The actor reacts to the custom message SaveSnapshot by passing its current state to saveSnapshot. The entire state of the actor is simply modelled as the case class ItemActorSnapshot, containing the item-map and the id-counter. saveSnapshot works asynchronously: once the snapshot has been stored (or storing it has failed), akka-persistence sends the actor either SaveSnapshotSuccess or SaveSnapshotFailure, and the actor can react accordingly. We do not want to go into the details of handling these properly here, which is why they are simply ignored.

When an actor is recovered (e.g. after a restart of the application) and akka-persistence finds a snapshot for this Processor, it first offers the snapshot to the actor (SnapshotOffer) and then replays only those messages from the journal that arrived after the snapshot was taken.

Testing Recovery

Testing recovery from snapshots is in principle pretty similar to testing recovery from a journal. We have almost the same steps:

  • start the application
  • modify the application's state by sending corresponding commands
  • take a snapshot
  • stop the application
  • restart the application (and recover from the snapshot)
  • verify the application's state by queries

As these steps are almost identical, we should reuse as much as possible from the previous test. In fact, we can reuse the entire test and just have to make sure that a snapshot is taken before the application is stopped. Let's have one more look at the central method that starts and stops the application in the tests:

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

After the test, the application is shut down by invoking the corresponding method of the application (line 8). If we can inject taking a snapshot here, we are basically all set: akka-persistence will then find the snapshot after the next restart and recover from it instead of replaying the entire journal. This is actually easy, as we already amend the application with some test extensions; we just have to modify them slightly to take a snapshot at shutdown.
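The mechanism behind such test extensions is Scala's stackable trait pattern. A trait overrides a method with `abstract override` and calls `super`, so mixing it in at instantiation time wraps the original behaviour. Here is a minimal sketch with hypothetical `Application`, `ToyApplication` and `SnapshotOnShutdown` types (not the ones from the post). Its log only makes the call order visible:

```scala
import scala.collection.mutable.ListBuffer

object StackableShutdownSketch {
  // Hypothetical abstract interface standing in for the application.
  trait Application {
    def shutdown(): Unit
  }

  // Hypothetical concrete application; the log only records the call order.
  class ToyApplication extends Application {
    val log: ListBuffer[String] = ListBuffer.empty
    def shutdown(): Unit = log += "shutdown"
  }

  // Stackable modification: `abstract override` plus `super.shutdown()` runs
  // extra code and then delegates to the shutdown() it is mixed on top of.
  trait SnapshotOnShutdown extends Application { this: ToyApplication =>
    abstract override def shutdown(): Unit = {
      log += "snapshot" // stands in for asking the actor to save a snapshot
      super.shutdown()
    }
  }
}
```

`new ToyApplication with SnapshotOnShutdown` logs `snapshot` before `shutdown`, while a plain `new ToyApplication` is unaffected.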

First we override the newItemApplication method in the trait WithItemApplicationWithSnapshot, which extends the original trait WithItemApplication, to amend the application with a snapshot-specific extension:

trait WithItemApplicationWithSnapshot extends WithItemApplication {
  override def newItemApplication(config: Config) =
    new ItemApplication(config) with ItemApplicationWithSnapshot
}

This extension overrides shutdown and also amends the ItemActor by overriding itemActorProps:

 1 trait ItemApplicationWithSnapshot extends ItemApplicationTestExtensions
 2     with TestUtil { this: ItemApplication =>
 3 
 4   abstract override def itemActorProps: Props =
 5     Props(new ItemActor with RespondToSnapshotRequest)
 6 
 7   abstract override def shutdown() = {
 8     resultOf(itemActor ? SaveSnapshot) match {
 9       case _: SaveSnapshotSuccess =>
10       case SaveSnapshotFailure(_, cause) =>
11         sys.error(s"Saving snapshot failed with: $cause")
12     }
13     super.shutdown()
14   }
15 }

shutdown simply sends a SaveSnapshot message to the ItemActor and waits for the response (line 8) before it actually shuts down the application (line 13); if saving the snapshot failed, it aborts with an error (line 11). However, the ItemActor does not reply to SaveSnapshot on its own, so it has to be modified slightly. That is why itemActorProps is overridden as well (line 4), so that the created actor is extended with the trait RespondToSnapshotRequest:

 1 trait RespondToSnapshotRequest extends Actor {
 2 
 3   private var lastSnapshotSender: Option[ActorRef] = None
 4 
 5   abstract override def receive: Receive = respondToSnapshotReceive.orElse(super.receive)
 6 
 7   def respondToSnapshotReceive: Receive = {
 8     case SaveSnapshot =>
 9       lastSnapshotSender = Some(sender())
10       super.receive(SaveSnapshot)
11 
12     case message: SaveSnapshotSuccess =>
13       super.receive(message)
14       respondToSnapshotRequester(message)
15 
16     case message: SaveSnapshotFailure =>
17       super.receive(message)
18       respondToSnapshotRequester(message)
19   }
20 
21   private def respondToSnapshotRequester(response: AnyRef) = {
22     lastSnapshotSender.foreach(_ ! response)
23     lastSnapshotSender = None
24   }
25 }

The receive method of this trait intercepts SaveSnapshot messages and remembers their sender (line 9) before continuing with normal processing (line 10). The saved sender reference is later used to forward the SaveSnapshotSuccess (line 12) or SaveSnapshotFailure (line 16) message to the requester, after the ItemActor itself has processed it.
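Intercepting receive works because an actor's `Receive` is just a `PartialFunction[Any, Unit]`, and `orElse` falls back to the original handler. The sketch below models this without Akka. `ToyActor`, `deliver` and the callback-based `sender()` are hypothetical stand-ins for an actor and its sender reference. The snapshot result is delivered by hand where akka-persistence would send it:

```scala
object InterceptReceiveSketch {
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical messages mirroring SaveSnapshot and SaveSnapshotSuccess.
  case object SaveSnapshot
  final case class SaveSnapshotSuccess(sequenceNr: Long)

  // Hypothetical stand-in for an Akka actor: a "sender" is just a reply callback.
  trait ToyActor {
    private var currentSender: Any => Unit = _ => ()
    protected def sender(): Any => Unit = currentSender
    def receive: Receive

    // Delivers one message, making `from` available as sender() meanwhile.
    def deliver(message: Any, from: Any => Unit = _ => ()): Unit = {
      currentSender = from
      receive.applyOrElse(message, (_: Any) => ())
    }
  }

  // Stands in for ItemActor: it handles the snapshot messages but never replies.
  class ToyItemActor extends ToyActor {
    var snapshotRequests = 0
    def receive: Receive = {
      case SaveSnapshot           => snapshotRequests += 1 // would call saveSnapshot(...)
      case SaveSnapshotSuccess(_) =>                       // ignored, like in ItemActor
    }
  }

  // Remembers who asked for a snapshot and forwards the outcome to it.
  trait RespondToSnapshotRequest extends ToyActor {
    private var lastSnapshotSender: Option[Any => Unit] = None

    abstract override def receive: Receive =
      respondToSnapshotReceive.orElse(super.receive)

    private def respondToSnapshotReceive: Receive = {
      case SaveSnapshot =>
        lastSnapshotSender = Some(sender())
        super.receive(SaveSnapshot)
      case message: SaveSnapshotSuccess =>
        super.receive(message)
        lastSnapshotSender.foreach(reply => reply(message))
        lastSnapshotSender = None
    }
  }
}
```

The requester only gets its reply once the success message arrives, which is exactly what the blocking shutdown waits for.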

Armed with this, the spec for recovery from snapshots can simply extend the one for recovery from the journal and mix in the WithItemApplicationWithSnapshot trait:

// inherits all recovery tests; the mixed-in trait takes a snapshot at every shutdown
class ItemApplicationRecoverFromSnapshotSpec extends ItemApplicationRecoverSpec
    with WithItemApplicationWithSnapshot

So there is no need to formulate the individual tests for recovery after create, update and delete a second time.

To check whether these tests actually catch errors in the snapshot logic, we can deliberately break the implementation of taking snapshots. We could, for example, forget to include the id-counter in the snapshot, that is, strip ItemActorSnapshot down to case class ItemActorSnapshot(itemById: Map[ItemId, Item]). Running the tests immediately shows that ... oops ... they all still pass. So the bug introduced by this change is not discovered by the tests, and it seems we need an additional one.

The difference between recovery from the journal and recovery from a snapshot is that recovery from the journal triggers the same application logic as normal operation. So if tests have proven that the application's state is not messed up during normal operation, it is hard to mess it up during recovery (as long as all command-messages really end up in the journal). With snapshots this is different: the state is saved and restored independently of the processed messages, which is why we have to add more tests. The bug we just introduced resets the id-counter to 0 after each restart. To verify that this does not happen, we need to create a new item before and after the restart and check that both can be retrieved afterwards. The test looks like this:

 1         val existingItem = withApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         withApplication(persistDir) { application =>
 5           val service = application.itemServiceTestExtension
 6 
 7           val created = service.createNewItem()
 8 
 9           service.findItem(created.id) should be (Some(created))
10           service.findItem(existingItem.id) should be (Some(existingItem))
11         }

First an item is created before the restart (line 2) and another one after the restart (line 7). Then the test asserts that both can be retrieved (lines 9-10). With the bug introduced above, this test fails (in line 10): the item created after the restart gets the same id as the existing one and thus overwrites it, so looking up the existing item returns the new one. Fixing the bug makes the test pass again.
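The effect of the incomplete snapshot can be reproduced in a few lines without Akka. `ToyItemStore`, `CompleteSnapshot` and `IncompleteSnapshot` are hypothetical. The test at the end follows the same steps as the spec above:

```scala
object SnapshotCompletenessSketch {
  final case class Item(id: Long, description: String)

  // Hypothetical model of the ItemActor's state: items plus the id counter.
  final class ToyItemStore(var itemById: Map[Long, Item] = Map.empty,
                           var idCounter: Long = 0L) {
    def create(description: String): Item = {
      idCounter += 1
      val item = Item(idCounter, description)
      itemById += (item.id -> item) // an existing entry with this id is replaced
      item
    }
    def find(id: Long): Option[Item] = itemById.get(id)
  }

  // Complete snapshot, like ItemActorSnapshot(itemById, idCounter).
  final case class CompleteSnapshot(itemById: Map[Long, Item], idCounter: Long)
  // Broken snapshot from the experiment above: the id counter is missing.
  final case class IncompleteSnapshot(itemById: Map[Long, Item])

  def restoreComplete(s: CompleteSnapshot): ToyItemStore =
    new ToyItemStore(s.itemById, s.idCounter)

  // Without a stored counter, the restored store falls back to the initial 0.
  def restoreIncomplete(s: IncompleteSnapshot): ToyItemStore =
    new ToyItemStore(s.itemById)
}
```

Restoring from the incomplete snapshot hands out id 1 again, so the new item replaces the existing one. This is the failure the additional test is designed to catch.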

This blog post showed how the tests for recovery from the journal can be reused for testing recovery from snapshots. The next one examines whether the same ideas can also be applied to testing successful recovery from an old journal.

Friday, May 23, 2014

Testing Recovery

In the last post I introduced the problem of what to test, and how, in applications that use akka-persistence to persist their state. I demonstrated the first important test, which verifies that the custom serializers for the persisted messages are actually used. This second part is about testing the successful recovery of the application state after a restart.

To test recovery we basically have to execute the following steps:

  • start the application (with an empty journal)
  • modify the application's state by sending corresponding commands
  • stop the application
  • restart the application
  • verify the application's state by queries

While it may sound a little odd to start, stop and restart an application in a unit test, with the ItemApplicationFixture from the last post it is actually not a big deal. Let's have another quick look at its central method, withApplication:

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

As you can see, it takes a temporary folder (where journals and snapshots are stored) as its first argument (persistDir) and the test code (block) as its second. The application is started (line 7), the test is executed, and in any case (failure or success) the application is shut down afterwards (line 8).
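withApplication is an instance of the loan pattern. The `ultimately` called in line 8 is presumably `scala.util.control.Exception.ultimately` from the Scala standard library; the post does not show its import. The following standalone sketch reproduces that shape with a hypothetical `ToyApplication` in place of ItemApplication and a factory function in place of newItemApplication:

```scala
import scala.util.control.Exception.ultimately

object LoanPatternSketch {
  // Hypothetical stand-in for ItemApplication that only records its shutdown.
  final class ToyApplication {
    var isShutdown: Boolean = false
    def shutdown(): Unit = { isShutdown = true }
  }

  // Same shape as withApplication: start the application, lend it to the test
  // block and shut it down afterwards, whether the block returns or throws.
  def withApplication[A](newApplication: () => ToyApplication)(block: ToyApplication => A): A = {
    val application = newApplication()
    ultimately(application.shutdown())(block(application))
  }
}
```

The block's result is passed through unchanged, and an exception thrown by the block still propagates after the shutdown. A failing assertion therefore never leaves an application running.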

Armed with this, we can easily start, stop and restart the application in a test, as follows:

 1         val created = withApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         withApplication(persistDir) { application =>
 5           application.itemServiceTestExtension.findItem(created.id) should be (
 6               Some(created))
 7         }

This test starts the application (line 1), creates a new item (line 2) and shuts the application down at the end of the block. Immediately afterwards it is restarted (line 4). As the directory for storing the journal is the same as before (persistDir), this should recover the state of the application from the previously written journal, so that the following findItem returns the item created before (lines 5-6).
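The essential point is that both runs share persistDir. A toy model of this uses a hypothetical `ToyApplication` that appends each created item as one line to a plain file. It does not use akka-persistence's LevelDB journal, and it rebuilds its state from that file on start-up:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardOpenOption}
import scala.jdk.CollectionConverters._

object FileJournalModel {
  // Hypothetical toy application journaling to <persistDir>/journal.
  final class ToyApplication(persistDir: Path) {
    private val journalFile = persistDir.resolve("journal")

    // Recovery on start-up: replay every journaled line, if there are any.
    private var items: Vector[String] =
      if (Files.exists(journalFile)) Files.readAllLines(journalFile, UTF_8).asScala.toVector
      else Vector.empty

    // Journal first, then update the in-memory state; the id is the position.
    def createItem(description: String): Int = {
      Files.write(journalFile, List(description).asJava, UTF_8,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND)
      items :+= description
      items.size
    }

    def findItem(id: Int): Option[String] = items.lift(id - 1)
  }
}
```

A second instance on the same directory finds the item again, while an instance on a fresh directory starts empty. This is why each test gets its own temporary persistDir.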

The application that is passed to the block is extended with some convenience functions (like itemServiceTestExtension) and also with a wrapper for the ItemService that simplifies invoking item commands by waiting for the returned Futures. Let's have a quick look at createNewItem:

  def createNewItem(template: ItemTemplate = newItemTemplate()): Item =
    successOf(service.create(template))

And this is how successOf is implemented (together with resultOf, which it uses):

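import scala.concurrent.{Await, Future}
import scala.util.Try

  // blocks until the future completes; timeoutDuration comes from the test utilities
  def resultOf[A](future: Future[A]): A = Await.result(future, timeoutDuration)

  // waits for the future, then unwraps the Try
  def successOf[A](future: Future[Try[A]]): A = successOf(resultOf(future))

  // returns the value, or rethrows the exception of a Failure (failing the test)
  def successOf[A](result: Try[A]): A = result.get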

Factoring out the code required to handle the Future and the Try keeps the test free of details that do not contribute to its value as documentation: the test is not about the creation of new items, but only about proper recovery.

Similar tests exist for verifying that an update is recovered successfully:

 1         val updated = withApplication(persistDir) { application =>
 2           val service = application.itemServiceTestExtension
 3 
 4           service.updateItem(service.createNewItem())
 5         }
 6         withApplication(persistDir) { application =>
 7           application.itemServiceTestExtension.findItem(updated.id) should be (
 8               Some(updated))
 9         }

or a delete:

 1         val deleted = withApplication(persistDir) { application =>
 2           val service = application.itemServiceTestExtension
 3 
 4           service.deleteItem(service.createNewItem().id)
 5         }
 6         withApplication(persistDir) { application =>
 7           application.itemServiceTestExtension.findItem(deleted.id) should be (
 8               None)
 9         }

We can easily verify that these tests are actually meaningful by introducing a bug into our code. For example, suppose we forget to wrap the UpdateItem command in a Persistent when we send it in ItemService:

  // deliberately broken: UpdateItem is sent without a Persistent wrapper
  def update(item: Item): Future[Try[Item]] =
    (itemActor ? UpdateItem(item)).mapTo[Try[Item]]

as well as when we receive it in ItemActor:

    // deliberately broken: matches the unwrapped command
    case UpdateItem(item: Item) =>

the corresponding test fails with:

Some(Item(1,2 - description)) was not equal to Some(Item(1,3 - description))
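Why this fails can be seen in a small model. In the akka-persistence version used here, a Processor only journals messages wrapped in Persistent. An unwrapped UpdateItem therefore still changes the in-memory state but is missing on replay. `ToyProcessor` and the local `Persistent` case class below are hypothetical stand-ins, not the akka-persistence classes:

```scala
import scala.collection.mutable.ArrayBuffer

object PersistentWrappingSketch {
  // Local stand-in for akka-persistence's Persistent wrapper: in this model,
  // only wrapped messages are written to the journal.
  final case class Persistent(payload: Any)
  final case class UpdateItem(id: Long, description: String)

  // Toy processor whose journal outlives it, like the files in persistDir.
  final class ToyProcessor(journal: ArrayBuffer[Any]) {
    var items: Map[Long, String] = Map.empty

    // Recovery on start: replay whatever was journaled before.
    journal.foreach(handle)

    def receive(message: Any): Unit = {
      message match {
        case p: Persistent => journal += p
        case _             => // unwrapped messages are never journaled
      }
      handle(message)
    }

    private def handle(message: Any): Unit = message match {
      case Persistent(UpdateItem(id, description)) => items += (id -> description)
      case UpdateItem(id, description)             => items += (id -> description) // the bug: memory only
      case _                                       =>
    }
  }
}
```

Before the restart, the unwrapped update is visible. After it, the replayed journal yields the older description, just like in the failure message above.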

This completes the tests for recovering the application state from the journal after a restart. What we still do not know is whether the application state can also be recovered from a snapshot. We will take a closer look at this in the next blog post.