Thursday, May 29, 2014

Testing Recovery through Snapshots

In the last post I explained how to test whether the state of an akka-persistence based application is properly recovered by replaying the journal after a system restart. This post is about testing the second way of recovering application state that akka-persistence supports: snapshots.

In addition to recovering application state by replaying the entire journal, akka-persistence supports taking snapshots of the application state and recovering from them. This typically decreases recovery time significantly: the state is not reconstructed command message by command message but restored all at once, and only those commands that arrived after the snapshot was taken need to be replayed one by one.
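The difference between the two recovery paths can be sketched without akka. The following is only an illustration under simplifying assumptions: Added, Snapshot and the two recover methods are hypothetical names, and the "state" is just a running total rather than the item map of the application.

```scala
object SnapshotRecoverySketch {
  // Hypothetical journal entry: adds an amount to a running total.
  final case class Added(sequenceNr: Long, amount: Int)
  // A snapshot stores the state plus the last sequence number it covers.
  final case class Snapshot(sequenceNr: Long, total: Int)

  // Full replay: apply every journal entry, starting from the empty state.
  def recoverFromJournal(journal: Seq[Added]): Int =
    journal.foldLeft(0)((total, event) => total + event.amount)

  // Snapshot recovery: start from the snapshot, replay only newer entries.
  def recoverFromSnapshot(snapshot: Snapshot, journal: Seq[Added]): Int =
    journal
      .filter(_.sequenceNr > snapshot.sequenceNr)
      .foldLeft(snapshot.total)((total, event) => total + event.amount)

  def main(args: Array[String]): Unit = {
    val journal = Seq(Added(1, 10), Added(2, 5), Added(3, 7), Added(4, 1))
    val snapshot = Snapshot(sequenceNr = 3, total = 22)

    println(recoverFromJournal(journal))            // 23, four entries applied
    println(recoverFromSnapshot(snapshot, journal)) // 23, one entry applied
  }
}
```

Both paths end in the same state, but the second one only has to apply the single entry that is newer than the snapshot.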

Enabling Snapshots

To enable the ItemActor to take snapshots we only need a few lines of code:

    case SaveSnapshot => saveSnapshot(ItemActorSnapshot(itemById, idCounter))
    case SaveSnapshotSuccess(metadata) =>
    case SaveSnapshotFailure(metadata, cause) =>

    case SnapshotOffer(_, ItemActorSnapshot(itemMap, lastId)) =>
      this.itemById = itemMap
      this.idCounter = lastId

The actor reacts to the custom message SaveSnapshot by passing its current state to saveSnapshot. The entire state of the actor is simply modelled as the case class ItemActorSnapshot, which contains the item map and the id counter. saveSnapshot does not return a result directly; instead the actor later receives SaveSnapshotSuccess or SaveSnapshotFailure as a message, depending on the outcome of the operation, and can react accordingly. We do not want to go into the details of handling these properly here, which is why they are simply ignored.

When an actor is recovered (e.g. after a restart of the application) and akka-persistence finds a snapshot for the Processor, it offers that snapshot first (SnapshotOffer) and then replays only those messages from the journal that arrived after the snapshot was taken.

Testing Recovery

Testing recovery from snapshots is in principle pretty similar to testing recovery from a journal. We have almost the same steps:

  • start the application
  • modify the application's state by sending corresponding commands
  • take a snapshot
  • stop the application
  • restart the application (and recover from the snapshot)
  • verify the application's state by queries

As these steps are almost identical, we should try to reuse as much as possible from the previous test. In fact we can reuse the entire test and only have to make sure that a snapshot is taken before the application is stopped. Let's have one more look at the central method that starts and stops an application in the tests:

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

The application is shut down after the test by invoking the corresponding method of the application (line 8). If we are able to inject taking a snapshot here, we are basically all set: akka-persistence will find the snapshot after the next restart and recover from it instead of replaying the whole journal. This is easy to do, as we already amend the application with some test extensions and only have to modify them a bit to take a snapshot at shutdown.
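The mechanics behind this idea can be sketched without akka. The following is a minimal illustration, not the code of the application: App, SnapshotBeforeShutdown and withApp are hypothetical stand-ins for ItemApplication, the snapshot extension and withApplication, and the events buffer exists only to make the order of calls visible.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.Exception.ultimately

object ShutdownHookSketch {
  // Records the order in which things happen.
  val events = ListBuffer.empty[String]

  // Hypothetical stand-in for ItemApplication.
  class App {
    def shutdown(): Unit = events += "shutdown"
  }

  // Stackable trait: does extra work, then delegates to the original shutdown.
  trait SnapshotBeforeShutdown extends App {
    override def shutdown(): Unit = {
      events += "snapshot"
      super.shutdown()
    }
  }

  // Loan pattern: the application is shut down even if the block throws.
  def withApp[A](newApp: => App)(block: App => A): A = {
    val app = newApp
    ultimately(app.shutdown())(block(app))
  }

  def main(args: Array[String]): Unit = {
    withApp(new App with SnapshotBeforeShutdown) { _ => events += "test body" }
    println(events.mkString(", ")) // test body, snapshot, shutdown
  }
}
```

Because the test only ever calls shutdown through the loan method, mixing in a different trait is enough to change what happens at the end of every test.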

First we override the newItemApplication method in the trait WithItemApplicationWithSnapshot, which extends the original trait WithItemApplication, to amend the application with a snapshot-specific extension:

trait WithItemApplicationWithSnapshot extends WithItemApplication {
  override def newItemApplication(config: Config) =
    new ItemApplication(config) with ItemApplicationWithSnapshot
}

This extension overrides shutdown and also amends the ItemActor by overriding itemActorProps:

 1 trait ItemApplicationWithSnapshot extends ItemApplicationTestExtensions
 2     with TestUtil { this: ItemApplication =>
 3 
 4   abstract override def itemActorProps: Props =
 5     Props(new ItemActor with RespondToSnapshotRequest)
 6 
 7   abstract override def shutdown() = {
 8     resultOf(itemActor ? SaveSnapshot) match {
 9       case _: SaveSnapshotSuccess =>
10       case SaveSnapshotFailure(_, cause) =>
11         sys.error(s"Saving snapshot failed with: $cause")
12     }
13     super.shutdown()
14   }
15 }

shutdown simply sends a SaveSnapshot message to the ItemActor and waits for a response (line 8) before it actually shuts down the application (line 13). However, to make the ItemActor respond to this message it has to be modified slightly. That is why itemActorProps is overridden as well (line 4) and the returned actor is extended with the trait RespondToSnapshotRequest:

 1 trait RespondToSnapshotRequest extends Actor {
 2 
 3   private var lastSnapshotSender: Option[ActorRef] = None
 4 
 5   abstract override def receive: Receive = respondToSnapshotReceive.orElse(super.receive)
 6 
 7   def respondToSnapshotReceive: Receive = {
 8     case SaveSnapshot =>
 9       lastSnapshotSender = Some(sender())
10       super.receive(SaveSnapshot)
11 
12     case message: SaveSnapshotSuccess =>
13       super.receive(message)
14       respondToSnapshotRequester(message)
15 
16     case message: SaveSnapshotFailure =>
17       super.receive(message)
18       respondToSnapshotRequester(message)
19   }
20 
21   private def respondToSnapshotRequester(response: AnyRef) = {
22     lastSnapshotSender.foreach(_ ! response)
23     lastSnapshotSender = None
24   }
25 }

The receive method of this trait intercepts SaveSnapshot messages and remembers the sender of the message (line 9) before it continues with normal processing (line 10). The saved sender reference is later used to forward the SaveSnapshotSuccess (line 12) or SaveSnapshotFailure (line 16) message to it.
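The interception technique itself does not depend on akka. As a rough sketch under simplifying assumptions, the same chaining of partial functions with orElse can be shown with plain Scala; Base, RespondToRequester, Save and Saved are hypothetical stand-ins, the requester is a plain String instead of an ActorRef, and replies are written to a log instead of being sent.

```scala
import scala.collection.mutable.ListBuffer

object InterceptReceiveSketch {
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical messages, loosely mirroring SaveSnapshot / SaveSnapshotSuccess.
  final case class Save(requester: String)
  case object Saved

  // Records what happened so the flow can be inspected.
  val log = ListBuffer.empty[String]

  // Stand-in for the unmodified actor.
  class Base {
    def receive: Receive = {
      case Save(_) => log += "base: saving"
      case Saved   => log += "base: saved"
      case "ping"  => log += "base: ping"
    }
  }

  // Stackable trait: remembers who asked, delegates to Base, then replies.
  trait RespondToRequester extends Base {
    private var lastRequester: Option[String] = None

    // Intercepted messages first, everything else falls through to Base.
    override def receive: Receive = intercept.orElse(super.receive)

    private def intercept: Receive = {
      case message @ Save(requester) =>
        lastRequester = Some(requester)
        super.receive(message)
      case Saved =>
        super.receive(Saved)
        lastRequester.foreach(r => log += s"reply to $r")
        lastRequester = None
    }
  }

  def main(args: Array[String]): Unit = {
    val actor = new Base with RespondToRequester
    actor.receive(Save("test"))
    actor.receive(Saved)
    actor.receive("ping")
    println(log.mkString("; "))
    // base: saving; base: saved; reply to test; base: ping
  }
}
```

The original behaviour still runs for every message; the trait only adds the bookkeeping needed to answer the requester.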

Armed with this, the test for successful recovery from snapshots can simply extend the one for recovery from the journal and mix in the WithItemApplicationWithSnapshot trait:

class ItemApplicationRecoverFromSnapshotSpec extends ItemApplicationRecoverSpec
    with WithItemApplicationWithSnapshot {

So there is no need to repeat the individual tests for recovery after create, update and delete. To verify that those tests really work, we can break the implementation of taking snapshots. We could for example forget to include the id counter in a snapshot, stripping down ItemActorSnapshot to case class ItemActorSnapshot(itemById: Map[ItemId, Item]). Running the tests immediately shows ...oops... that they all still pass. So the bug created by this change is not discovered by the tests. It seems we need an additional one.

The difference between recovery from the journal and recovery from a snapshot is that when the state is recovered from the journal, the same application logic is triggered as during normal operation. So if tests have proven that the application's state is not messed up during normal operation, it's hard to mess it up during recovery (as long as all command messages really end up in the journal). With snapshots this is different: the state is handled independently of the processed messages, and that is why we have to add more tests.

The bug we just introduced resets the id counter to 0 after each restart. To verify that this does not occur, we need to create one item before and one after the restart and check that both can be retrieved afterwards. The test looks like this:

 1         val existingItem = withApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         withApplication(persistDir) { application =>
 5           val service = application.itemServiceTestExtension
 6 
 7           val created = service.createNewItem()
 8 
 9           service.findItem(created.id) should be (Some(created))
10           service.findItem(existingItem.id) should be (Some(existingItem))
11         }

First an item is created before the restart (line 2) and another one after the restart (line 7). Then the test asserts that both can be retrieved (lines 9-10). With the bug introduced above this test fails (in line 10), as the newly created item gets the same id as the old one and thus overwrites it. Fixing the bug makes the test pass again.
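The failure mode can also be reproduced in isolation. The following sketch is an assumption-laden simplification, not the ItemActor itself: ItemStore, FullSnapshot and BrokenSnapshot are hypothetical names, ids are plain Ints, and a "restart" is simulated by constructing a new store from a snapshot.

```scala
object IdCounterBugSketch {
  final case class Item(id: Int, description: String)

  // Complete snapshot: item map plus id counter.
  final case class FullSnapshot(itemById: Map[Int, Item], idCounter: Int)
  // Broken snapshot: the id counter was forgotten.
  final case class BrokenSnapshot(itemById: Map[Int, Item])

  // Hypothetical in-memory stand-in for the actor's state handling.
  final class ItemStore(var itemById: Map[Int, Item] = Map.empty,
                        var idCounter: Int = 0) {
    def create(description: String): Item = {
      idCounter += 1
      val item = Item(idCounter, description)
      itemById += (item.id -> item)
      item
    }
    def find(id: Int): Option[Item] = itemById.get(id)
  }

  def main(args: Array[String]): Unit = {
    val before = new ItemStore
    val existing = before.create("existing")

    // "Restart" from a complete snapshot: the counter continues at 1.
    val full = FullSnapshot(before.itemById, before.idCounter)
    val healthy = new ItemStore(full.itemById, full.idCounter)
    healthy.create("new")
    println(healthy.find(existing.id) == Some(existing)) // true

    // "Restart" from the broken snapshot: the counter starts at 0 again.
    val broken = BrokenSnapshot(before.itemById)
    val buggy = new ItemStore(broken.itemById)
    val clash = buggy.create("new")
    println(clash.id == existing.id)                     // true
    println(buggy.find(existing.id) == Some(existing))   // false
  }
}
```

In this simplified model the item created before the restart is silently replaced, which is exactly the kind of damage that only shows up when an item is created on both sides of a restart.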

This blog post showed how the tests for recovery from the journal can be reused for testing recovery from snapshots. The next one explores whether the same ideas can also be applied to testing successful recovery from an old journal.
