Akka Tutorials
In this section, we will show how to use Akka to help you build scalable and reactive applications.
If you are not familiar with IntelliJ and Scala, feel free to review our previous tutorials on IntelliJ and Scala. So let's get started!
Source Code:
- The source code is available on the allaboutscala GitHub repository.
Introduction:
Akka Actors:
- Actor System Introduction
- Tell Pattern
- Ask Pattern
- Ask Pattern mapTo
- Ask Pattern pipeTo
- Actor Hierarchy
- Actor Lookup
- Child actors
- Actor Lifecycle
- Actor PoisonPill
- Error Kernel Supervision
Akka Routers:
Akka Dispatchers:
- Akka Default Dispatcher
- Akka Lookup Dispatcher
- Fixed Thread Pool Dispatcher
- Resizable Thread Pool Dispatcher
- Pinned Thread Pool Dispatcher
Akka FSM:
- Actor FSM become()
- Actor FSM unbecome()
- Actor FSM protocol
- Actor LoggingFSM
- Actor LoggingFSM Part Two
- Actor LoggingFSM Part Three
- Actor LoggingFSM Part Four
- Actor LoggingFSM Part Five
- Actor LoggingFSM Part Six
- Actor FSM Scheduler
Akka TestKit:
- Testing Actor FSM
- Testing Actor
- Testing Akka HTTP POST
- Testing Query Parameter
- Testing Required Query Parameter
- Testing Optional Query Parameter
- Testing Typed Query Parameter
- Testing CSV Query Parameter
Akka HTTP:
- Akka HTTP project setup build.sbt
- Start Akka HTTP server
- HTTP GET plain text
- HTTP GET JSON response
- JSON encoding
- JSON pretty print
- HTTP POST JSON payload
- Could not find implicit value
- HTTP DELETE restriction
- Future onSuccess
- Future onComplete
- Complete with an HttpResponse
- Try failure using an HttpResponse
- Global rejection handler
- Global exception handler
- Load HTML from resources
- RESTful URLs with segment
- RESTful URLs with regex
- RESTful URLs multiple segments
- Query parameter
- Optional query parameter
- Typed query parameters
- CSV query parameter
- Query parameter to case class
- HTTP request headers
- HTTP client GET
- Unmarshal HttpResponse to case class
- HTTP client POST JSON
- Akka HTTP CRUD project
- Akka HTTP CRUD project - part 2
- Akka HTTP CRUD project - part 3
What is Akka?
Akka is a suite of modules which allows you to build distributed and reliable systems by leaning on the actor model.
The actor model puts emphasis on avoiding the use of locks in your system, in favour of parallelism and concurrency. As a result, actors are those 'things' that would 'react' to messages, and perhaps run some computation and/or respond to another actor via message passing. The actor model has been around for a while, and was certainly made popular by languages such as Erlang.
Akka brings similar features around concurrency and parallelism onto the JVM, and you can use either Java or Scala with the Akka libraries. Without any surprises, our Akka code snippets below will be making use of Scala :)
For the purpose of this Akka tutorial, we will be using the latest version which is currently 2.5.12. All the links to the official Akka documentation will also refer to version 2.5.12.
The Akka eco-system has evolved fairly rapidly over the past few years. Below is a quick overview of the various Akka modules for which we will provide code snippets:
Akka Actor:
This module introduces the Actor System, functions and utilities that Akka provides to support the actor model and message passing. For additional information, you can refer to the official Akka documentation on Actors.
Akka HTTP:
As the name implies, this module is typically best suited for middle-tier applications which require an HTTP endpoint. As an example, you could use Akka HTTP to expose a REST endpoint that interfaces with a storage layer such as a database. For additional information, you can refer to the official Akka documentation on Akka HTTP.
Akka Streams:
This module is useful when you are working on data pipelines or even stream processing. For additional information, you can refer to the official Akka documentation on Akka Streams.
Akka Networking:
This module provides the foundation for having actor systems being able to connect to each other remotely over some predefined network transport such as TCP. For additional information, you can refer to the official Akka documentation on Akka Networking.
Akka Clustering:
This module is an extension of the Akka Networking module. It is useful in scaling distributed applications by have actors form a quorum and work together by some predefined membership protocol. For additional information, you can refer to the official Akka documentation on Akka Clustering.
Project setup build.sbt
As a reminder, we will be using the latest version of Akka, which is 2.5.12, as per this writing. Below is a code snippet of the library dependencies in your build.sbt to import the Akka modules. Note that we're also making use of Scala 2.12.
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",
"com.typesafe.akka" %% "akka-testkit" % "2.5.12" % Test
)
Actor System Introduction
In this tutorial, you will create your first Akka ActorSystem. For the purpose of this tutorial, you can think of the ActorSystem as a 'black' box that will eventually hold your actors, and allow you to interact with them.
As a reminder, the actor model puts message passing as a first class citizen. If the actor model is still obscure at this time, that's OKAY. In the upcoming tutorials, we will demonstrate how to design protocols that our actors will understand and react to.
Defining our actor system using Akka is super easy. You simply need to create an instance of akka.actor.ActorSystem. In the code snippet below, we will give our actor system a name: DonutStoreActorSystem.
println("Step 1: create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
For the purpose of this tutorial, we are not doing much with our actor system :( But, it is important to show that you can also close it by calling the terminate() method. Obviously, you would perhaps want to close your actor system when your application is shutting down, similar to how you would cleanly close any open resources such as database connections.
println("\nStep 2: close the actor system")
val isTerminated = system.terminate()
For completeness, we'll also register a Future onComplete() callback to know that our actor system was successfully shutdown. If you are not familiar with registering future callbacks, feel free to review our tutorial on Future onComplete().
println("\nStep 3: Check the status of the actor system")
isTerminated.onComplete {
case Success(result) => println("Successfully terminated actor system")
case Failure(e) => println("Failed to terminate actor system")
}
Thread.sleep(5000)
Tell Pattern
In this tutorial, we will create our first Akka Actor, and show how to design a simple message passing protocol to interact with our Actor. As a matter of fact, Akka provides various interaction patterns, and the one which we will use in this tutorial is called the Tell Pattern. This pattern is useful when you need to send a message to an actor, but do not expect to receive a response. As a result, it is also commonly referred to as "fire and forget".
As per the previous Actor System Introduction tutorial, we instantiate an ActorSystem, and give it the name DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
Next, we design a simple message passing protocol, and use a Case Class to encapsulate the message. As such, we create a case class named Info, and for now it holds a single property: name of type String.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
Creating an Akka Actor is really easy. All you have to do is have a class extend the Actor trait. Akka also comes built-in with a logging utility for actors, and you can access it by simply adding the ActorLogging trait.
Inside our actor, the primary method we are interested in at the moment is the receive method. The receive method is the place where you instruct your actor which messages or protocols it is designed to react to. For our DonutInfoActor below, it will react to Info messages, where the actor will simply print the name property.
println("\nStep 3: Define DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {
import Tutorial_02_Tell_Pattern.DonutStoreProtocol._
def receive = {
case Info(name) =>
log.info(s"Found $name donut")
}
}
As of now, the DonutStoreActorSystem was empty. Let's go ahead and use the actorOf() method to create our DonutInfoActor.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
To send a message to our DonutInfoActor using the Akka Tell Pattern, you can use the bang operator ! as shown below.
println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")
Finally, we close our actor system by calling the terminate() method.
println("\nStep 6: close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/22/2018 15:40:33.443] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut
Ask Pattern
In this tutorial, we will show another interaction pattern for Akka actors, namely the Ask Pattern. This pattern allows you to send a message to an actor, and get a response back. A reminder from our previous tutorial that with the Akka Tell Pattern, you do not get back a reply from the actor.
We start by creating an ActorSystem and, similar to the previous examples, we'll name it DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
For now, we'll keep the message passing protocol unchanged from our previous examples. Our message is represented by the case class named Info, and it has a single property: name of type String.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
Similar to the Tell Pattern example, creating an Akka actor is as easy as extending the Actor trait, and implementing the receive method. Within the receive body, we will use the sender method to reply back to the source where the message was originated. In our simple example below, we return true, for cases when the name property of the Info message is vanilla, and we return false, for all other value of the name property.
println("\nStep 3: Create DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {
import Tutorial_03_Ask_Pattern.DonutStoreProtocol._
def receive = {
case Info(name) if name == "vanilla" =>
log.info(s"Found valid $name donut")
sender ! true
case Info(name) =>
log.info(s"$name donut is not supported")
sender ! false
}
}
To create the DonutInfoActor within our DonutStoreActorSystem, we use the actorOf() method as shown below.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
To use the Akka Ask Pattern, you have to make use of the ? operator. As a reminder, for the Akka Tell Pattern, we used the ! operator. The Ask Pattern will return a future, and for the purpose of this example, we will use a for comprehension to print back the response from the DonutInfoActor. If you are new to using Futures in Scala, feel free to review our Futures tutorial to learn about creating asynchronous non-blocking operations using futures.
println("\nStep 5: Akka Ask Pattern")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaDonutFound = donutInfoActor ? Info("vanilla")
for {
found <- vanillaDonutFound
} yield (println(s"Vanilla donut found = $found"))
val glazedDonutFound = donutInfoActor ? Info("glazed")
for {
found <- glazedDonutFound
} yield (println(s"Glazed donut found = $found"))
Thread.sleep(5000)
Finally, we call the terminate() method to shutdown our Actor System.
println("\nStep 6: Close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/22/2018 21:37:20.381] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found valid vanilla donut
Vanilla donut found = true
[INFO] [06/22/2018 21:37:20.391] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] glazed donut is not supported
Glazed donut found = false
Ask Pattern mapTo
This tutorial is an extension from our previous example on Akka's Ask Pattern. The actor system, protocol and actor remain identical to the code snippet we've used in the Ask Pattern tutorial. We will instead focus this tutorial on the Future.mapTo() method. It can be used to provide a type to the future returned by the Ask Pattern.
Similar to our previous examples, we create an ActorSystem named DonutStoreActorSystem which will hold our actors.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We keep our simple Info case class which represents the message that our actor will react to.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
The DonutInfoActor is unchanged from our previous Akka Ask Pattern example. The actor reacts to messages of type Info. It will send a Boolean reply value of true, when the name property is vanilla, and a value of false, for all other values of the name property.
println("\nStep 3: Create DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {
import Tutorial_04_Ask_Pattern_MapTo.DonutStoreProtocol._
def receive = {
case Info(name) if name == "vanilla" =>
log.info(s"Found valid $name donut")
sender ! true
case Info(name) =>
log.info(s"$name donut is not supported")
sender ! false
}
}
Using the actorOf() method, we create a DonutInfoActor into our actor system.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
Using the Future.mapTo() method, we can map the return type from the actor to a specific type. In our example, the return type is mapped to the Boolean type.
println("\nStep 5: Akka Ask Pattern and future mapTo()")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaDonutFound: Future[Boolean] = (donutInfoActor ? Info("vanilla")).mapTo[Boolean]
for {
found <- vanillaDonutFound
} yield println(s"Vanilla donut found = $found")
Thread.sleep(5000)
Finally, we call the terminate() method to shutdown our actor system.
println("\nStep 6: close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/22/2018 20:34:36.144] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found valid vanilla donut
Vanilla donut found = true
Ask Pattern pipeTo
We continue our discussion on the Akka Ask Pattern, and will show another handy utility named pipeTo(). It attaches to a Future operation by registering the Future andThen callback to allow you to easily send the result back to the sender. Additional details on using pipeTo() can be found in the official Akka documentation.
As per our previous example, we create an actor system named DonutStoreActorSystem which will hold our Akka actors.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
In this step, we will augment our message passing protocol by defining another feature named CheckStock. In a real-life application, you could imagine that the CheckStock message would check the stock quantity for a given donut.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
}
We create a DonutStockActor which will react to a CheckStock message. It will then delegate the call to a future operation named findStock(), which does the actual stock lookup. In order to send the result back to the sender, we use the handy pipeTo() method as shown below.
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
import Tutorial_05_Ask_Pattern_pipeTo.DonutStoreProtocol._
def receive = {
case CheckStock(name) =>
log.info(s"Checking stock for $name donut")
findStock(name).pipeTo(sender)
}
def findStock(name: String): Future[Int] = Future {
// assume a long running database operation to find stock for the given donut
100
}
}
We create the DonutStockActor into our DonutStoreActorSystem by calling the actorOf() method.
println("\nStep 4: Create DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
Similar to the previous mapTo() example, we use the Akka Ask Pattern to check the donut stock. Note that with the mapTo() method, we are mapping the return type from the actor to a specific type (in this case, an Int type).
println("\nStep 5: Akka Ask Pattern using mapTo() method")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaDonutStock: Future[Int] = (donutStockActor ? CheckStock("vanilla")).mapTo[Int]
for {
found <- vanillaDonutStock
} yield println(s"Vanilla donut stock = $found")
Thread.sleep(5000)
Finally, we close our actor system by calling the terminate() method.
println("\nStep 6: Close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/26/2018 20:47:55.499] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
Vanilla donut stock = 100
Actor Hierarchy
In this tutorial, we will re-use the Tell Pattern example, and discuss a bit further Akka Actor Hierarchy. As a reminder, the Actor Model favours a design principle of using actor-to-actor message passing, as we've seen in the previous code snippets. As such, it is important to note that the actors within our Actor System exist in a hierarchy.
The diagram below from the official Akka documentation provides a good visual overview of the actor hierarchy as supported by Akka.
At the top of the hierarchy is the root guardian. You can think of it as the "top-most" actor which monitors the entire actor system. Next is the system guardian, which is the "top-level" actor in charge of any system level actors. On the left hand side, you will notice the user guardian, and this is the "top-level" actor hierarchy for actors that we create in our actor system. Note also that you can nest hierarchies as shown by the someChild actor.
For now, let's focus on the user guardian. If you recall from the Tell Pattern example, the DonutInfoActor was logging the statement below when it received an Info message:
[INFO] [05/09/2018 11:36:47.095] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut
From the above log statement, you can see the hierarchy for DonutInfoActor: akka://DonutStoreActorSystem/user/DonutInfoActor. For lack of a better comparison, think of the path to DonutInfoActor similar to a path or location for some file in a file system. As such, DonutInfoActor is under the user guardian, and our actor system is also displayed by the name: DonutStoreActorSystem (which is the name we used when creating the actor system).
Understanding actor hierarchies is important to help us design systems which follow the Fail-Fast principle. In upcoming tutorials, we will show how to design actor systems to isolate failures, and plan ahead with recovery options. What we've learned on actor hierarchies will be the foundation to design resilient actor systems that can deal with failures.
Actor Lookup
In this tutorial, we will continue our discussion on Actor Hierarchy, and show how we can use an actor's path to find it within an actor system. For the purpose of this example, we will reuse the code snippets from our Tell Pattern tutorial. You can find additional information on Actor paths from the official Akka documentation on Actor Paths and Addressing.
By now, you should be familiar with creating an Actor System as shown below:
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
Next, we define a simple protocol for message passing with our actor.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
In this step, we create a DonutInfoActor by extending the akka.actor trait.
println("\nStep 3: Define DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {
def receive = {
case Info(name) =>
log.info(s"Found $name donut")
}
}
With our DonutInfoActor defined from Step 3, we create it in our actor system by using actorOf() method.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
In the Tell Pattern example, we used the reference to the DonutInfoActor to send an Info message to it using the bang operator !.
println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")
In the previous tutorial on Actor Hierarchy, we explained that the actors we create in our actor system reside under the /user/ hierarchy. As such, we should be able to find the DonutInfoActor by its path: /user/DonutInfoActor. Similar to an ActorRef, we can then use the bang operator ! to send a message to it using the Tell Pattern.
println("\nStep 6: Find Actor using actorSelection() method")
system.actorSelection("/user/DonutInfoActor") ! Info("chocolate")
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/26/2018 20:09:54.763] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found chocolate donut
As another example, you can also use the actorSelection() method to send a message to all the actors in an actor system by making use of the wildcard: /user/*
system.actorSelection("/user/*") ! Info("vanilla and chocolate")
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/26/2018 20:09:54.763] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla and chocolate donut
Perhaps, one could ask why not simply make use of the ActorRef, which we've seen so far when creating the Actor using system.actorOf() method. Another feature that Akka provides is that actor systems can be local within a JVM, or remotely hosted on a JVM. The message passing between actors is transparent to the end user, and Akka handles the underlying transport. In upcoming tutorials, we will see examples of remote actors.
Finally, we use the terminate() method to close our actor system.
println("\nStep 7: close the actor system")
val isTerminated = system.terminate()
Child Actors
In this tutorial, we will continue our journey into better understanding Actor hierarchies and paths. More precisely, we will introduce child actors, i.e. an actor which is spawn (and potentially managed and supervised) from within an actor. For now, let's not focus on the supervision, as we will discuss Akka Actor supervision strategies in upcoming tutorials.
As the usual first step, we create an actor system named DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We then define the protocol for message passing with our actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
In this step, we will create a new actor named BakingActor by extending the usual akka.actor trait. This actor will be a child actor of DonutInfoActor below.
println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {
def receive = {
case Info(name) =>
log.info(s"BakingActor baking $name donut")
}
}
Creating DonutInfoActor is similar to previous examples, i.e. we extend the akka.actor trait. Note however that we need to make use of the actor context value, and will use its actorOf() method to create a BakingActor. By using the actor context value, the BakingActor will be a child actor of DonutInfoActor.
From a hierarchy point of view, we can assume that DonutInfoActor will reside under the /user hierarchy: /user/DonutInfoActor. This would mean that the child BakingActor's hierarchy and path would be /user/DonutInfoActor/BakingActor. To keep this example simple, inside the receive method of the DonutInfoActor, we will simply forward the same Info message to the BakingActor.
class DonutInfoActor extends Actor with ActorLogging {
val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")
def receive = {
case msg @ Info(name) =>
log.info(s"Found $name donut")
bakingActor forward msg
}
}
We only need to create the DonutInfoActor as it will internally be responsible for the creation of the child actor BakingActor.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
Sending an Info message to the DonutInfoActor using the Tell Pattern (fire-and-forget message passing style), will internally forward the message to the child BakingActor.
println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")
Thread.sleep(3000)
Finally, we close the actor system by using the terminate() method.
println("\nStep 6: close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/29/2018 20:46:10.626] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut
[INFO] [06/29/2018 20:46:10.627] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut
Actor Lifecycle
So far, we've gone through various examples on creating actors, understanding actor hierarchies and paths, and creating child actors. It is perhaps a good place to introduce Actor Lifecycles. Every Akka actor within our actor system follows a lifecycle, which represents the main events of an Actor from creation to deletion. For that matter, knowing when an Actor is about to start is perhaps important in a real-life application, where you may need to open a connection to a database session. Similarly, you should close the database session if the actor is stopped or crashed.
The diagram below from the official Akka documentation provides a good visual description of Actor lifecycle.
We create a DonutStoreActorSystem similar to the previous tutorials.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We define the protocol for message passing with our Actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
In the BackingActor, we override the Actor lifecycle events namely preStart(), postStop(), preRestart() and postRestart() methods.
println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {
override def preStart(): Unit = log.info("prestart")
override def postStop(): Unit = log.info("postStop")
override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")
override def postRestart(reason: Throwable): Unit = log.info("postRestart")
def receive = {
case Info(name) =>
log.info(s"BakingActor baking $name donut")
}
}
Similar to the BackingActor, we override the Actor lifecycle events, and will simply log a message to know which event is being triggered during the life-time of our actor.
class DonutInfoActor extends Actor with ActorLogging {
override def preStart(): Unit = log.info("prestart")
override def postStop(): Unit = log.info("postStop")
override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")
override def postRestart(reason: Throwable): Unit = log.info("postRestart")
val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")
def receive = {
case msg @ Info(name) =>
log.info(s"Found $name donut")
bakingActor forward msg
}
}
We create the DonutInfoActor by using the system.actorOf() method.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
Using the Tell Pattern, we send an Info message to the DonutInfoActor.
println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")
Thread.sleep(5000)
Finally, we close the actor system by calling the terminate() method.
println("\nStep 6: close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [06/29/2018 21:26:19.880] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] prestart
[INFO] [06/29/2018 21:26:19.880] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] prestart
[INFO] [06/29/2018 21:26:19.882] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut
[INFO] [06/29/2018 21:26:19.883] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut
[INFO] [06/29/2018 21:26:24.885] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] postStop
[INFO] [06/29/2018 21:26:24.886] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutInfoActor] postStop
Actor PoisonPill
We continue our discussion from the previous Actor Lifecycle tutorial. In the last example, we showed how you can override actor events such as the preStart() or postStop() methods. In this tutorial, we will show how you can use akka.actor.PoisonPill, which is a special message that you send to terminate or stop an actor. When the PoisonPill message has been received by the actor, you will be able to see the actor stop event being triggered from our log messages.
We create a DonutStoreActorSystem similar to previous tutorials.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
Next, we define the protocol for message passing with our Actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
}
In the BackingActor, we override the Actor lifecycle events namely the preStart(), postStop(), preRestart() and postRestart() methods.
println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {
override def preStart(): Unit = log.info("prestart")
override def postStop(): Unit = log.info("postStop")
override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")
override def postRestart(reason: Throwable): Unit = log.info("postRestart")
def receive = {
case Info(name) =>
log.info(s"BakingActor baking $name donut")
}
}
Similar to the BackingActor, we override the Actor lifecycle events, and will simply log a message to know which event is being triggered during the life-time of our actor.
class DonutInfoActor extends Actor with ActorLogging {
override def preStart(): Unit = log.info("prestart")
override def postStop(): Unit = log.info("postStop")
override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")
override def postRestart(reason: Throwable): Unit = log.info("postRestart")
val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")
def receive = {
case msg @ Info(name) =>
log.info(s"Found $name donut")
bakingActor forward msg
}
}
We create the DonutInfoActor by using the system.actorOf() method.
println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")
Using the Tell Pattern, we send an Info message to the DonutInfoActor, and then we will send a PoisonPill message to the actor. Following the PosionPill message, we will try and send another Info message to the actor.
println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")
Next, we send the PoisonPill message to terminate the DonutInfoActor.
donutInfoActor ! PoisonPill
donutInfoActor ! Info("plain")
Thread.sleep(5000)
Since DonutInfoActor should no longer exist within our actor system, the Info message will not be consumed. It will instead be logged into a separate dead-letters hierarchy, which is a placeholder for any message that cannot be delivered within an actor system. You can find additional information on Akka dead-letters from the official Akka documentation.
Finally, we close the actor system by calling the terminate() method.
println("\nStep 6: close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [07/03/2018 20:29:44.475] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] prestart
[INFO] [07/03/2018 20:29:44.475] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] prestart
[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut
[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut
[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] postStop
[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] postStop
[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] Message [com.nb.actors.Tutorial_10_Kill_Actor_Using_PoisonPill$DonutStoreProtocol$Info] without sender to Actor[akka://DonutStoreActorSystem/user/DonutInfoActor#342625185] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Error Kernel Supervision
This tutorial will be a good segment to our previous discussions on Actor Hierarchy, Actor Lifecycle, and stopping actors using Actor PoisonPill. We introduce the Error Kernel Pattern, which mandates that failures are isolated and localized, as opposed to crashing an entire system. You can find additional details on the Error Kernel Approach from the official Akka documentation.
As a reminder, Akka Actors form a hierarchy, and we will use that feature to have Actors supervise and react to failures of their child actors. We will expand our previous example of DonutStockActor and have it forward work to a child actor named DonutStockWorkerActor. In addition, Since DonutStockWorkerActor is a child actor of DonutStockActor, it will be supervised by DonutStockActor.
This is the initial step of creating an Actor System named DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We expand our DonutStoreProtocol and, add a CheckStock() case class, in addition to the previous Info() case class, which we'll use for message passing with our Actors. Moreover, we'll create a custom exception (WorkerFailedException), that we will use to show how to isolate failures by following the Error Kernel Approach.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
case class WorkerFailedException(error: String) extends Exception(error)
}
When our DonutStockActor receive messages of type CheckStock, it will forward the request to a child actor for the actual processing of the message. The child actor is of type DonutStockWorkerActor, which we will define in Step 4. Following on the Error Kernel Approach, DonutStockActor will supervise the child actor DonutStockWorkerActor. It does so by providing a SupervisorStrategy. In this simple example, our actor will react to exceptions of type WorkerFailedException, and attempt to restart the child actor DonutStockWorkerActor. For all other exceptions, we assume that DonutStockActor is unable to handle those and, in turn, it will escalate those exceptions up the actor hierarchy.
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 seconds) {
case _: WorkerFailedException =>
log.error("Worker failed exception, will restart.")
Restart
case _: Exception =>
log.error("Worker failed, will need to escalate up the hierarchy")
Escalate
}
val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")
def receive = {
case checkStock @ CheckStock(name) =>
log.info(s"Checking stock for $name donut")
workerActor forward checkStock
}
}
We tap into the Actor Lifecycle, and override the postRestart() method in order to know when the DonutStockWorkerActor is restarted. When the DonutStockWorkerActor receive messages of type CheckStock, it will internally delegate the work to its findStock() method. For the sake of this example, after procssing the CheckStock message, the DonutStockWorkerActor will stop processing any other messages as we're stopping it by calling context.stop(self).
Inside the findStock() method, you could assume that the operation simulates accessing an external resource, such as a database to find the stock quantity for a particular donut. In our example, we're simply returning an arbitrary Int value of 100.
println("\nStep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {
@throws[Exception](classOf[Exception])
override def postRestart(reason: Throwable): Unit = {
log.info(s"restarting ${self.path.name} because of $reason")
}
def receive = {
case CheckStock(name) =>
findStock(name)
context.stop(self)
}
def findStock(name: String): Int = {
log.info(s"Finding stock for donut = $name")
100
// throw new IllegalStateException("boom") // Will Escalate the exception up the hierarchy
// throw new WorkerFailedException("boom") // Will Restart DonutStockWorkerActor
}
}
Create a DonutStockActor similar to our previous tutorials.
println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
We send a CheckStock message to the DonutStockActor using the Akka Ask Pattern.
println("\nStep 6: Akka Ask Pattern")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaDonutStock: Future[Int] = (donutStockActor ? CheckStock("vanilla")).mapTo[Int]
for {
found <- vanillaDonutStock
} yield (println(s"Vanilla donut stock = $found"))
Thread.sleep(5000)
Finally, we call the terminate() method to close our Actor System.
println("\nStep 7: Close the actor system")
val isTerminated = system.terminate()
You can simulate failures by uncommenting throw new IllegalStateException("boom") or throw new WorkerFailedException("boom"). As per our SupervisorStrategy from Step 3, if DonutStockWorkerActors throws an IllegalStateException, the supervisor Actor DonutStockActor will simply escalate the exception up the Actor Hierarchy. When you run the program, you should see the following statements showing the escalation:
[INFO] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] Finding stock for donut = vanilla
[ERROR] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Worker failed, will need to escalate up the hierarchy
[ERROR] [05/15/2018 20:35:47.178] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] boom
For exceptions of type WorkerFailedException, you will notice that the SupervisorStrategy will attempt to restart the child actor DonutStockWorkerActor as shown below:
[INFO] [05/15/2018 20:37:13.503] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [05/15/2018 20:37:13.503] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] Finding stock for donut = vanilla
[ERROR] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor] Worker failed exception, will restart.
[ERROR] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] boom
[INFO] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] restarting DonutStockWorkerActor because of com.nb.actors.Tutorial_11_ErrorKernel$DonutStoreProtocol$WorkerFailedException: boom
RoundRobinPool
In this tutorial, we introduce Akka Routers. So far, we've seen how to send messages to an Akka Actor. Routing is a handy feature that Akka provides, and it allows you to send messages to a predefined set of routees. If you are not familiar with Akka routers, that's OKAY! We will provide step by step example of the various routers which Akka provides.
We expand our previous example and, instead of having DonutStockActor send messages to DonutStockWorkerActor, it will send messages to a pool of DonutStockWorkerActor. Akka also provides various routing strategies, and these determine the flow of messages from a router to its routees. In the example below, we will use the RoundRobinPool logic and, as it name implies, messages will be sent in a RoundRobin fashion among the routees.
Akka provides various other routing logic such as RandomRoutingLogic, ScatterGatherFirstCompletedRoutingLogic, TailChoppingRoutingLogic, SmallestMailboxRoutingLogic, BroadcastRoutingLogic and ConsistentHashingRoutingLogic. You can find additional details on these routing logic from the official Akka Routing Documentation.
This step should be familiar to you by now, and we create an ActorSystem named DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We define our message passing protocols to our actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
case class WorkerFailedException(error: String) extends Exception(error)
}
We create a DonutStockActor, which extends the akka.actor.Actor trait. DonutStockActor will forward messages it receives to DonutStockWorkerActor, which we'll define in Step 4. It, however, will not forward the messages directly to a single DonutStockWorkerActor as we've seen in previous examples where we've defined DonutStockWorkerActor as a child actor: val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor") Instead, we define a pool of worker actors of type DonutStockWorkerActor. As a strategy for the actor pool, we're using the RoundRobinPool as shown below.
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
case _: WorkerFailedException =>
log.error("Worker failed exception, will restart.")
Restart
case _: Exception =>
log.error("Worker failed, will need to escalate up the hierarchy")
Escalate
}
// We will not create one worker actor.
// val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")
// We are using a resizable RoundRobinPool.
val resizer = DefaultResizer(lowerBound = 5, upperBound = 10)
val props = RoundRobinPool(5, Some(resizer), supervisorStrategy = supervisorStrategy)
.props(Props[DonutStockWorkerActor])
val donutStockWorkerRouterPool: ActorRef = context.actorOf(props, "DonutStockWorkerRouter")
def receive = {
case checkStock @ CheckStock(name) =>
log.info(s"Checking stock for $name donut")
donutStockWorkerRouterPool forward checkStock
}
}
The DonutStockWorkerActor internally carries out the findStock() operation when it receives a message of type CheckStock.
println("\ntep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {
override def postRestart(reason: Throwable): Unit = {
log.info(s"restarting ${self.path.name} because of $reason")
}
def receive = {
case CheckStock(name) =>
sender ! findStock(name)
}
def findStock(name: String): Int = {
log.info(s"Finding stock for donut = $name, thread = ${Thread.currentThread().getId}")
100
}
}
We use the system.actorOf() method to create a DonutStockActor withing our ActorSystem.
println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
We fire a bunch of requests to the DonutStockActor to check the stock for vanilla donut. Once DonutStockActor receives the CheckStock message, it will forward the message to one of the DonutStockWorkerActor within the actor pool which we defined in Step 3. As a reminder, we are using an actor pool with the RoundRobinPool strategy.
println("\nStep 6: Use Akka Ask Pattern and send a bunch of requests to DonutStockActor")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaStockRequests = (1 to 10).map(i => (donutStockActor ? CheckStock("vanilla")).mapTo[Int])
for {
results <- Future.sequence(vanillaStockRequests)
} yield println(s"vanilla stock results = $results")
Thread.sleep(5000)
Finally, we close our actor system using the usual system.terminate() method.
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [07/06/2018 20:26:35.785] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.787] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$a] Finding stock for donut = vanilla, thread = 17
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$b] Finding stock for donut = vanilla, thread = 18
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$c] Finding stock for donut = vanilla, thread = 19
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 18
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 13
[INFO] [07/06/2018 20:26:35.788] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$b] Finding stock for donut = vanilla, thread = 13
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$c] Finding stock for donut = vanilla, thread = 18
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 13
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$a] Finding stock for donut = vanilla, thread = 17
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 13
[INFO] [07/06/2018 20:26:35.789] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
vanilla stock results = Vector(100, 100, 100, 100, 100, 100, 100, 100, 100, 100)
ScatterGatherFirstCompletedPool
This tutorial is a continuation from our previous discussion on Akka Routers RoundRobinPool, and we introduce a new routing strategy which Akka provides: ScatterGatherFirstCompletedPool. As the name implies, messages are scattered to a bunch of routees, and only the first completed operation will be accepted. In a real life scenario, perhaps you may fan-out a request from an application to a farm of homogeneous services. As a reminder, you can find out more details from the Official Akka documentation.
We start by creating our ActorSystem named DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
We define the message passing protocols to our Actors by creating case classes Info and CheckStock.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
case class WorkerFailedException(error: String) extends Exception(error)
}
Similar to our previous examples, we create a DonutStockActor which extends the akka.actor.Actor trait. Instead of DonutStockActor forwarding messages to a single DonutStockWorkActor, it will use a ScatterGatherFirstCompletedPool to fan-out the requests to a bunch of DonutStockWorkerActor. It will, however, only accept the reply from the first DonutStockWorkerActor (whichever first completes the operation).
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
case _: WorkerFailedException =>
log.error("Worker failed exception, will restart.")
Restart
case _: Exception =>
log.error("Worker failed, will need to escalate up the hierarchy")
Escalate
}
// We will not create a single worker actor.
// val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")
// We are using a resizable ScatterGatherFirstCompletedPool.
val workerName = "DonutStockWorkerActor"
val resizer = DefaultResizer(lowerBound = 5, upperBound = 10)
val props = ScatterGatherFirstCompletedPool(
nrOfInstances = 5,
resizer = Some(resizer),
supervisorStrategy = supervisorStrategy,
within = 5 seconds
).props(Props[DonutStockWorkerActor])
val donutStockWorkerRouterPool: ActorRef = context.actorOf(props, "DonutStockWorkerRouter")
def receive = {
case checkStock @ CheckStock(name) =>
log.info(s"Checking stock for $name donut")
// We forward any work to be carried by worker actors within the pool
donutStockWorkerRouterPool forward checkStock
}
}
Our DonutStockWorkerActor is unchanged from our previous examples, as it simply runs the findStock() operation for messages of type CheckStock.
println("\ntep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {
override def postRestart(reason: Throwable): Unit = {
log.info(s"restarting ${self.path.name} because of $reason")
}
def receive = {
case CheckStock(name) =>
sender ! findStock(name)
}
def findStock(name: String): Int = {
log.info(s"Finding stock for donut = $name, thread = ${Thread.currentThread().getId}")
100
}
}
We create a DonutStockActor within our ActorySystem by calling the system.actorOf() method.
println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
We fire a bunch of requests to the DonutStockActor with messages of type CheckStock. Since DonutStockActor is using the ScatterGatherFirstCompletedPool from Step 3, it will fan-out the request to a bunch of DonutStockWorkerActor, but only accept the first completed operation.
println("\nStep 6: Use Akka Ask Pattern and send a bunch of requests to DonutStockActor")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaStockRequests = (1 to 2).map(i => (donutStockActor ? CheckStock("vanilla")).mapTo[Int])
for {
results <- Future.sequence(vanillaStockRequests)
} yield (println(s"vanilla stock results = $results"))
Thread.sleep(5000)
Finally, we close our actor system by calling the terminate() method.
println("\nStep 7: Close the actor system")
val isTerminated = system.terminate()
You will notice that for each "Checking stock for vanilla donut" request, 5 requests (given the lowerBound = 5 for the ScatterGatherFirstCompletedPool) for "Finding stock for donut ..." are fired. Only the first result from whichever of these requests are "first completed" will be returned back.
[INFO] [07/06/2018 20:08:11.863] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:08:11.868] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$a] Finding stock for donut = vanilla, thread = 17
[INFO] [07/06/2018 20:08:11.868] [DonutStoreActorSystem-akka.actor.default-dispatcher-7] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$b] Finding stock for donut = vanilla, thread = 19
[INFO] [07/06/2018 20:08:11.868] [DonutStoreActorSystem-akka.actor.default-dispatcher-9] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$c] Finding stock for donut = vanilla, thread = 21
[INFO] [07/06/2018 20:08:11.868] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 18
[INFO] [07/06/2018 20:08:11.868] [DonutStoreActorSystem-akka.actor.default-dispatcher-9] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 21
[INFO] [07/06/2018 20:08:11.871] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:08:11.871] [DonutStoreActorSystem-akka.actor.default-dispatcher-9] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$a] Finding stock for donut = vanilla, thread = 21
[INFO] [07/06/2018 20:08:11.871] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$b] Finding stock for donut = vanilla, thread = 18
[INFO] [07/06/2018 20:08:11.872] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$c] Finding stock for donut = vanilla, thread = 14
[INFO] [07/06/2018 20:08:11.872] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 17
[INFO] [07/06/2018 20:08:11.872] [DonutStoreActorSystem-akka.actor.default-dispatcher-6] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 18
vanilla stock results = Vector(100, 100)
TailChoppingPool
We continue with our Akka Routers tutorials, and introduce the TailChoppingPool router. This router is fairly similar to our previous ScatterGatherFirstCompletedPool router. It fans-out requests to a bunch of routees, and only accepts the first completed operation. So far, this would be a similar behaviour to the ScatterGatherFirstCompletedPool. However, the TailChoppingPool fans-out request to additional routees incrementally according to the configuration you specify.
You can review the official Akka documentation for additional details on the TailChoppingPool router.
Our first step is to create the DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
Next, we define the message passing protocols for our Actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
case class WorkerFailedException(error: String) extends Exception(error)
}
We then create the DonutStockActor, which internally defines a TailChoppingPool router. On receipt of CheckStock messages, the DonutStockActor will forward the request to a TailChoppingPool of DonutStockWorkerActors. This means that DonutStockActor will incrementally fan-out CheckStock messages to DonutStockWorkerActors, as per the configuration intervals of the TailChoppingPool. As a reminder, only the first request will be accepted, and all others will be discarded.
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
case _: WorkerFailedException =>
log.error("Worker failed exception, will restart.")
Restart
case _: Exception =>
log.error("Worker failed, will need to escalate up the hierarchy")
Escalate
}
// We will not create a single worker actor.
// val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")
// We are using a resizable TailChoppingPool.
val workerName = "DonutStockWorkerActor"
val resizer = DefaultResizer(lowerBound = 5, upperBound = 10)
val props = TailChoppingPool(
nrOfInstances = 5,
resizer = Some(resizer),
within = 5 seconds,
interval = 10 millis,
supervisorStrategy = supervisorStrategy
).props(Props[DonutStockWorkerActor])
val donutStockWorkerRouterPool: ActorRef = context.actorOf(props, "DonutStockWorkerRouter")
def receive = {
case checkStock @ CheckStock(name) =>
log.info(s"Checking stock for $name donut")
donutStockWorkerRouterPool forward checkStock
}
}
Similar to our previous router examples, DonutStockWorkerActor calls the findStock() operation when it receives a CheckStock message.
println("\ntep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {
override def postRestart(reason: Throwable): Unit = {
log.info(s"restarting ${self.path.name} because of $reason")
}
def receive = {
case CheckStock(name) =>
sender ! findStock(name)
}
def findStock(name: String): Int = {
log.info(s"Finding stock for donut = $name, thread = ${Thread.currentThread().getId}")
100
}
}
By calling the system.actorOf() method, we create a DonutStockActor.
println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
We fire a bunch of CheckStock messages to DonutStockActor. The TailChoppingPool router of DonutStockActor will incrementally fan-out additional requests to DonutStockWorkerActor as defined by the configurations in Step 3.
println("\nStep 6: Use Akka Ask Pattern and send a bunch of requests to DonutStockActor")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
val vanillaStockRequests = (1 to 2).map(i => (donutStockActor ? CheckStock("vanilla")).mapTo[Int])
for {
results <- Future.sequence(vanillaStockRequests)
} yield println(s"vanilla stock results = $results")
Thread.sleep(5000)
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [07/06/2018 20:39:18.151] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:39:18.160] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 15
[INFO] [07/06/2018 20:39:18.161] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 13
[INFO] [07/06/2018 20:39:18.162] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:39:18.162] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 14
vanilla stock results = Vector(100, 100)
BroadcastPool
In our previous Akka Routers tutorials, we've shown the RoundRobinPool, the ScatterGatherFirstCompletedPool, and the TailChoppingPool. We continue our Akka Routers series, and introduce the BroadcastPool router. As its name implies, this router will broadcast or fan-out messages to all of its routees. Perhaps, in a real life scenario, the Akka BroadcastPool router can be very handy if you are implementing some Gossip Protocol, where messages are spread throughout a network. You can find additional information on the Akka BroadcastPool router from the official Akka documentation.
By now, you should be familiar with our first step where we create the ActorSystem: DonutStoreActorSystem.
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
For Step 2, we define the message passing protocols for our Actors.
println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
case class Info(name: String)
case class CheckStock(name: String)
case class WorkerFailedException(error: String) extends Exception(error)
}
We create the DonutStockActor by extending the usual akka.actor.Actor trait. When DonutStockActor receives messages of type CheckStock, it will use a BroadcastPool to fan-out the request to the underlying DonutStockWorkerActor in the pool.
println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
case _: WorkerFailedException =>
log.error("Worker failed exception, will restart.")
Restart
case _: Exception =>
log.error("Worker failed, will need to escalate up the hierarchy")
Escalate
}
// We will not create a single worker actor.
// val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")
// We are using a resizable BroadcastPool.
val workerName = "DonutStockWorkerActor"
val resizer = DefaultResizer(lowerBound = 5, upperBound = 10)
val props = BroadcastPool(
nrOfInstances = 5,
resizer = None,
supervisorStrategy = supervisorStrategy
).props(Props[DonutStockWorkerActor])
val donutStockWorkerRouterPool: ActorRef = context.actorOf(props, "DonutStockWorkerRouter")
def receive = {
case checkStock @ CheckStock(name) =>
log.info(s"Checking stock for $name donut")
donutStockWorkerRouterPool ! checkStock
}
}
DonutStockWorkerActor simply calls the findStock() operation when it receives a CheckStock message.
println("\ntep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {
override def postRestart(reason: Throwable): Unit = {
log.info(s"restarting ${self.path.name} because of $reason")
}
def receive = {
case CheckStock(name) =>
sender ! findStock(name)
}
def findStock(name: String): Int = {
log.info(s"Finding stock for donut = $name, thread = ${Thread.currentThread().getId}")
100
}
}
We create a DonutStockActor in our ActorSystem from Step 1 by calling the system.actorOf() method.
println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")
We use the Akka Tell Pattern, to fire a single CheckStock request to DonutStockActor. Note, however, since DonutStockActor is using a BroadcastPool actor with nrOfInstances defined in Step 3 as 5, we would expect 5 messages to be fanned-out to DonutStockWorkerActors.
println("\nStep 6: Use Akka Tell Pattern and send a single request to DonutStockActor")
import DonutStoreProtocol._
donutStockActor ! CheckStock("vanilla")
Thread.sleep(5000)
Finally, we close our ActorSystem by calling the terminate() method.
println("\nStep 7: Close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [07/06/2018 20:06:42.886] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [07/06/2018 20:06:42.891] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$b] Finding stock for donut = vanilla, thread = 14
[INFO] [07/06/2018 20:06:42.891] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$c] Finding stock for donut = vanilla, thread = 15
[INFO] [07/06/2018 20:06:42.891] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$d] Finding stock for donut = vanilla, thread = 14
[INFO] [07/06/2018 20:06:42.891] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$a] Finding stock for donut = vanilla, thread = 17
[INFO] [07/06/2018 20:06:42.892] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerRouter/$e] Finding stock for donut = vanilla, thread = 15
Akka default dispatcher
In this section, we'll go over Akka dispatchers. By now, you should be familiar that an ActorSystem is designed from the ground up to be asynchronous and non-blocking, and we've interfaced with our actor systems using the Tell Pattern or the Ask Pattern.
At the heart of the actor system is Akka's dispatcher which, to quote the official Akka documentation, is what makes Akka Actors “tick”. The dispatcher, though, requires an executor to provide it with threads necessary to run operations in a non-blocking fashion. By default, if you do not specify a particular executor, Akka will make use of the fork-join-executor, and as per the official documentation, this type of executor will provide excellent performance for most cases.
Thread tuning is, however, an important part of any actor system, and as we will see later on in this section, Akka provides you with all the required hooks to tune your dispatcher, executor and threading constructs to meet your particular use cases. For now, we'll get familiar with Akka's default dispatcher, and how you can peek into its configuration. We start off by creating our usual Actor System:
println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")
To peek into Akka's default dispatcher configuration, you need to access the akka.actor.default-dispatcher config.
println("\nStep 2: Akka default dispatcher config")
val defaultDispatcherConfig = system.settings.config.getConfig("akka.actor.default-dispatcher")
println(s"akka.actor.default-dispatcher = $defaultDispatcherConfig")
The default dispatcher in itself holds various parameters, and below we show a snapshot of some of the most common ones, namely type, throughput, fork-join-executor.parallelism-min and fork-join.executor.parallelism.max.
println("\nStep 3: Akka default dispatcher type")
val dispatcherType = defaultDispatcherConfig.getString("type")
println(s"$dispatcherType")
println("\nStep 4: Akka default dispatcher throughput")
val dispatcherThroughput = defaultDispatcherConfig.getString("throughput")
println(s"$dispatcherThroughput")
println("\nStep 5: Akka default dispatcher minimum parallelism")
val dispatcherParallelismMin = defaultDispatcherConfig.getInt("fork-join-executor.parallelism-min")
println(s"$dispatcherParallelismMin")
println("\nStep 6: Akka default dispatcher maximum parallelism")
val dispatcherParallelismMax = defaultDispatcherConfig.getInt("fork-join-executor.parallelism-max")
println(s"$dispatcherParallelismMax")
Finally, we close our actor system by calling the usual system.terminate().
println("\nStep 7: Close the actor system")
val isTerminated = system.terminate()
You should see the following output when you run your Scala application in IntelliJ:
Step 1: Create an actor system
Step 2: Akka default dispatcher config
akka.actor.default-dispatcher = Config(SimpleConfigObject({"affinity-pool-executor":{"fair-work-distribution":{"threshold":128},"idle-cpu-level":5,"parallelism-factor":0.8,"parallelism-max":64,"parallelism-min":4,"queue-selector":"akka.dispatch.affinity.FairDistributionHashCache","rejection-handler":"akka.dispatch.affinity.ThrowOnOverflowRejectionHandler","task-queue-size":512},"attempt-teamwork":"on","default-executor":{"fallback":"fork-join-executor"},"executor":"default-executor","fork-join-executor":{"parallelism-factor":3,"parallelism-max":64,"parallelism-min":8,"task-peeking-mode":"FIFO"},"mailbox-requirement":"","shutdown-timeout":"1s","thread-pool-executor":{"allow-core-timeout":"on","core-pool-size-factor":3,"core-pool-size-max":64,"core-pool-size-min":8,"fixed-pool-size":"off","keep-alive-time":"60s","max-pool-size-factor":3,"max-pool-size-max":64,"max-pool-size-min":8,"task-queue-size":-1,"task-queue-type":"linked"},"throughput":5,"throughput-deadline-time":"0ms","type":"Dispatcher"}))
Step 3: Akka default dispatcher type
Dispatcher
Step 4: Akka default dispatcher throughput
5
Step 5: Akka default dispatcher minimum parallelism
8
Step 6: Akka default dispatcher maximum parallelism
64
Step 7: Close the actor system
NOTE:
- Depending on your operating systems and hardware configurations, you may see different results with respect to some of the default parameters for Akka's default executor.
Akka Lookup Dispatcher
In the previous section on Akka Default Dispatcher, we had a glimpse of the default dispatcher and executor that will be provided to an Actor System. Akka, however, allows you to customise your own configurations in order to meet your system's threading and tuning requirements. These configurations are typically stored in the application.conf file under the resources folder.
For the purpose of this example, we will create a custom dispatcher and it will be named fixed-thread-pool. With regards to the threading heuristics, our dispatcher will use a fixed pool size, which we've currently set to be 5 threads only.
fixed-thread-pool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 5
}
throughput = 2
}
To use the above custom dispatcher, we obviously start by creating our ActorSystem. We then have to use system.dispatchers.lookup() method, and pass through as parameter the name of our customised dispatcher: fixed-thread-pool. Note also that we should mark the custom dispatcher as implicit in order to lift it into the relevant context of our Actor System and code execution.
val system = ActorSystem("DonutStoreActorSystem")
implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")
system.terminate()
In the next section, we will interface with our Actor System and make use of our custom dispatcher.
Fixed Thread Pool Dispatcher
In the previous section, we showed how to create a custom thread pool dispatcher. More precisely, we created a dispatcher name fixed-thread-pool in application.conf, and showed how to wire the dispatcher with the Actor System. In this example, we will in fact use this custom fixed thread pool dispatcher, and interact with it using an Akka actor.
As usual, we start by creating an ActorSystem.
println("Step 1: Create actor system")
val system = ActorSystem("DonutStoreActorSystem")
A reminder that we created a custom dispatcher using a fixed number of threads in application.conf.
fixed-thread-pool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 5
}
throughput = 2
}
Next, we wire our custom dispatcher using system.dispatchers.lookup(...) method.
println("Step 3: Lookup our fixed-thread-pool dispatcher from application.conf")
import DonutStoreProtocol._
import akka.pattern._
implicit val timeout = Timeout(1, TimeUnit.MINUTES)
implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")
We create a simple protocol for capturing a request to check for a donut inventory.
println("\nStep 4: Create protocol")
object DonutStoreProtocol {
case class StockRequest(name: String, clientId: Int)
trait Result
case class DonutStockRequest(quantity: Int) extends Result
case class DonutFailure(msg: String) extends Result
}
We create a new Akka Actor named DonutStockRequestactor and we are using Thread.sleep(...) to simulate looking up a donut stock quantity. It certainly goes without saying that you should not be using Thread.sleep(...) in real-world application code.
println("\nStep 5: Create DonutStockRequestActor")
class DonutStockRequestActor extends Actor with ActorLogging {
val randomStock = scala.util.Random
def receive = {
case StockRequest(name, clientId) =>
log.info(s"CHECKING: donut stock for name = $name, clientId = $clientId")
Thread.sleep(5000)
log.info(s"FINISHED: donut stock for name = $name, clientId = $clientId")
sender() ! DonutStockRequest(randomStock.nextInt(100))
}
}
We create 10 requests using Akka's Ask Pattern. Note that we are referencing our fixed-thread-pool dispatcher using the withDispatcher(...) method.
pintln("\nStep 6: Create 10 requests using pool-size = 10")
val clientRequests = (1 to 10).map(i => StockRequest(s"vanilla donut", i))
val futures = clientRequests.map{ stock =>
val actorRef = system
.actorOf(Props[DonutStockRequestActor]
.withDispatcher("fixed-thread-pool"))
(actorRef ? stock).mapTo[DonutStockRequest]
}
val results = Await.result(Future.sequence(futures), 1 minute)
results.foreach(println(_))
NOTE:
- To keep this example simple, we are blocking on all the results using Await.result(...). Do not block in real production code!
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [10/19/2018 16:12:05.816] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] CHECKING: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/19/2018 16:12:05.816] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] CHECKING: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/19/2018 16:12:05.816] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/19/2018 16:12:05.816] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/19/2018 16:12:05.816] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] CHECKING: donut stock for name = vanilla donut, clientId = 5
[INFO] [10/19/2018 16:12:10.819] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] FINISHED: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/19/2018 16:12:10.819] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/19/2018 16:12:10.819] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] FINISHED: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/19/2018 16:12:10.819] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/19/2018 16:12:10.819] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] FINISHED: donut stock for name = vanilla donut, clientId = 5
NOTE:
- Since our fixed-thread-pool configuration only has 5 threads, you should see the above 5 requests start executing. When they complete, the remaining 5 requests will execute as shown below.
[INFO] [10/19/2018 16:12:10.820] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$h] CHECKING: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/19/2018 16:12:10.820] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$i] CHECKING: donut stock for name = vanilla donut, clientId = 9
[INFO] [10/19/2018 16:12:10.820] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$f] CHECKING: donut stock for name = vanilla donut, clientId = 6
[INFO] [10/19/2018 16:12:10.820] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$g] CHECKING: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/19/2018 16:12:10.822] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$j] CHECKING: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/19/2018 16:12:15.825] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$g] FINISHED: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/19/2018 16:12:15.825] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$h] FINISHED: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/19/2018 16:12:15.825] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$j] FINISHED: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/19/2018 16:12:15.825] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$f] FINISHED: donut stock for name = vanilla donut, clientId = 6
[INFO] [10/19/2018 16:12:15.825] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$i] FINISHED: donut stock for name = vanilla donut, clientId = 9
Let us update our custom dispatcher in application.conf to have 10 threads as opposed to 5.
fixed-thread-pool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 10
}
throughput = 2
}
When you rerun the code, you should see all 10 requests executing simultaneously since our custom dispatcher now has enough threads to run all of them at the same time.
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-13] [akka://DonutStoreActorSystem/user/$i] CHECKING: donut stock for name = vanilla donut, clientId = 9
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] CHECKING: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] CHECKING: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-10] [akka://DonutStoreActorSystem/user/$f] CHECKING: donut stock for name = vanilla donut, clientId = 6
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] CHECKING: donut stock for name = vanilla donut, clientId = 5
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-14] [akka://DonutStoreActorSystem/user/$j] CHECKING: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-11] [akka://DonutStoreActorSystem/user/$g] CHECKING: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-12] [akka://DonutStoreActorSystem/user/$h] CHECKING: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/19/2018 16:15:57.801] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-11] [akka://DonutStoreActorSystem/user/$g] FINISHED: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] FINISHED: donut stock for name = vanilla donut, clientId = 5
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-12] [akka://DonutStoreActorSystem/user/$h] FINISHED: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] FINISHED: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-14] [akka://DonutStoreActorSystem/user/$j] FINISHED: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-13] [akka://DonutStoreActorSystem/user/$i] FINISHED: donut stock for name = vanilla donut, clientId = 9
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] FINISHED: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/19/2018 16:16:02.803] [DonutStoreActorSystem-fixed-thread-pool-10] [akka://DonutStoreActorSystem/user/$f] FINISHED: donut stock for name = vanilla donut, clientId = 6
As usual, don't forget to close the ActorSystem using system.terminate().
println("\nStep 8: Close actor system")
system.terminate()
Resizable Thread Pool
We continue with our discussion on custom dispatcher that has tunable thread pooling capabilities, and introduce a resizable thread pool dispatcher. As its name implies, this dispatcher will dynamically increase its number of threads, as needed and as per its definition in application.conf.
In Step 1, we create our ActorSystem similar to the previous examples.
println("Step 1: Create actor system")
val system = ActorSystem("DonutStoreActorSystem")
Next, we create the configurations for the resizable thread pool in application.conf. We name the dispatcher resizable-thread-pool and, for the purpose of this tutorial, we'll have the number of threads start at 5 by setting the core-pool-size-min property. Our thread pooling capabilities will also be limited to only 20 threads as per the core-pool-size-max property.
resizable-thread-pool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-factor = 2.0
core-pool-size-max = 20
}
throughput = 5
}
To use the custom dispatcher, we make use of the system.dispatchers.lookup(...) method.
println("Step 3: Lookup our resizable-thread-pool dispatcher from application.conf")
import DonutStoreProtocol._
import akka.pattern._
implicit val timeout = Timeout(1, TimeUnit.MINUTES)
implicit val executionContext = system.dispatchers.lookup("resizable-thread-pool")
We reuse our protocol for simulating a request to check for donut inventory.
println("\nStep 4: Create protocol")
object DonutStoreProtocol {
case class StockRequest(name: String, clientId: Int)
trait Result
case class DonutStockRequest(quantity: Int) extends Result
case class DonutFailure(msg: String) extends Result
}
We create a DonutStockRequestActor which simply sends some random stock quantity. As a reminder, do not use a Thread.sleep(...) in your production code!
println("\nStep 5: Create DonutStockRequestActor")
class DonutStockRequestActor extends Actor with ActorLogging {
val randomStock = scala.util.Random
def receive = {
case StockRequest(name, clientId) =>
log.info(s"CHECKING: donut stock for name = $name, clientId = $clientId")
Thread.sleep(5000)
log.info(s"FINISHED: donut stock for name = $name, clientId = $clientId")
sender() ! DonutStockRequest(randomStock.nextInt(100))
}
}
We send 10 simultaneous requests to the DonutStockRequestActor using Akka's Ask Pattern. Once again, in production code, do not block using Await.result(...).
println("\nStep 6: Create 10 requests using pool-size = 10")
val clientRequests = (1 to 10).map(i => StockRequest(s"vanilla donut", i))
val futures = clientRequests.map{ stock =>
val actorRef = system
.actorOf(Props[DonutStockRequestActor]
.withDispatcher("resizable-thread-pool"))
(actorRef ? stock).mapTo[DonutStockRequest]
}
val results = Await.result(Future.sequence(futures), Duration.Inf)
results.foreach(println(_))
Finally, we close our Actor System using the familiar system.terminate().
println("\nStep 8: Close actor system")
system.terminate()
You should see the following output when you run your Scala application in IntelliJ. Note that unlike the fixed thread pool example above, all 10 requests are able to be processed in parallel - this is thanks to our resizable thread pool heuristics as defined in the application.conf.
[INFO] [10/22/2018 15:48:20.133] [DonutStoreActorSystem-resizable-thread-pool-11] [akka://DonutStoreActorSystem/user/$g] CHECKING: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/22/2018 15:48:20.134] [DonutStoreActorSystem-resizable-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] CHECKING: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/22/2018 15:48:20.134] [DonutStoreActorSystem-resizable-thread-pool-14] [akka://DonutStoreActorSystem/user/$i] CHECKING: donut stock for name = vanilla donut, clientId = 9
[INFO] [10/22/2018 15:48:20.134] [DonutStoreActorSystem-resizable-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/22/2018 15:48:20.134] [DonutStoreActorSystem-resizable-thread-pool-10] [akka://DonutStoreActorSystem/user/$f] CHECKING: donut stock for name = vanilla donut, clientId = 6
[INFO] [10/22/2018 15:48:20.133] [DonutStoreActorSystem-resizable-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] CHECKING: donut stock for name = vanilla donut, clientId = 5
[INFO] [10/22/2018 15:48:20.133] [DonutStoreActorSystem-resizable-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] CHECKING: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/22/2018 15:48:20.133] [DonutStoreActorSystem-resizable-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/22/2018 15:48:20.133] [DonutStoreActorSystem-resizable-thread-pool-12] [akka://DonutStoreActorSystem/user/$h] CHECKING: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/22/2018 15:48:20.134] [DonutStoreActorSystem-resizable-thread-pool-15] [akka://DonutStoreActorSystem/user/$j] CHECKING: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-12] [akka://DonutStoreActorSystem/user/$h] FINISHED: donut stock for name = vanilla donut, clientId = 8
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-15] [akka://DonutStoreActorSystem/user/$j] FINISHED: donut stock for name = vanilla donut, clientId = 10
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-10] [akka://DonutStoreActorSystem/user/$f] FINISHED: donut stock for name = vanilla donut, clientId = 6
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-11] [akka://DonutStoreActorSystem/user/$g] FINISHED: donut stock for name = vanilla donut, clientId = 7
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-9] [akka://DonutStoreActorSystem/user/$e] FINISHED: donut stock for name = vanilla donut, clientId = 5
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-5] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-8] [akka://DonutStoreActorSystem/user/$d] FINISHED: donut stock for name = vanilla donut, clientId = 4
[INFO] [10/22/2018 15:48:25.136] [DonutStoreActorSystem-resizable-thread-pool-7] [akka://DonutStoreActorSystem/user/$c] FINISHED: donut stock for name = vanilla donut, clientId = 3
[INFO] [10/22/2018 15:48:25.137] [DonutStoreActorSystem-resizable-thread-pool-6] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/22/2018 15:48:25.137] [DonutStoreActorSystem-resizable-thread-pool-14] [akka://DonutStoreActorSystem/user/$i] FINISHED: donut stock for name = vanilla donut, clientId = 9
Pinned Thread Pool Dispatcher
So far, we've seen Akka's default, fixed and resizable thread pool dispatchers. There may be times though, when you need to enforce a certain ordering of events such as processing of some streams in a First In First Out basis.
Sequencing these events into actors which have a single thread pool would be nice in such circumstances. Fortunately, Akka provides just this - a Pinned Thread Pool Dispatcher. As per the Akka documentation, there will be a unique thread for each actor using a pinned dispatcher, and each one will have its own thread pool consisting of a single thread.
Similar to the previous examples, we start by creating our ActorSystem.
println("Step 1: Create actor system")
val system = ActorSystem("DonutStoreActorSystem")
We then define a new dispatcher in application.conf which will be of type PinnedDispatcher.
pinned-thread-pool {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
We reference our newly defined Pinned Dispatcher by using the familiar system.dispatchers.lookup(...) method.
println("Step 3: Lookup our pinned-thread-pool dispatcher from application.conf")
import DonutStoreProtocol._
import akka.pattern._
implicit val timeout = Timeout(1, TimeUnit.MINUTES)
implicit val executionContext = system.dispatchers.lookup("pinned-thread-pool")
We then reuse our protocol for simulating a request to check for donut inventory.
println("\nStep 4: Create protocol")
object DonutStoreProtocol {
case class StockRequest(name: String, clientId: Int)
trait Result
case class DonutStockRequest(quantity: Int) extends Result
case class DonutFailure(msg: String) extends Result
}
We also reuse our DonutStockRequestActor which simply sends some random stock quantity. As a reminder, do not use a Thread.sleep(...) in your production code!
println("\nStep 5: Create DonutStockRequestActor")
class DonutStockRequestActor extends Actor with ActorLogging {
val randomStock = scala.util.Random
def receive = {
case StockRequest(name, clientId) =>
log.info(s"CHECKING: donut stock for name = $name, clientId = $clientId")
Thread.sleep(5000)
log.info(s"FINISHED: donut stock for name = $name, clientId = $clientId")
sender() ! DonutStockRequest(randomStock.nextInt(100))
}
}
For this example, we'll send 2 simultaneous requests to the DonutStockRequestActor using Akka's Ask Pattern so that it is easier to follow the logging statements. Once again, in production code, do not block using Await.result(...).
println("\nStep 6: Create 2 requests")
val clientRequests = (1 to 2).map(i => StockRequest(s"vanilla donut", i))
val futures = clientRequests.map{ stock =>
val actorRef = system
.actorOf(Props[DonutStockRequestActor]
.withDispatcher("pinned-thread-pool"))
(actorRef ? stock).mapTo[DonutStockRequest]
// (actorRef ? stock).mapTo[DonutStockRequest]
}
val results = Await.result(Future.sequence(futures), 1 minute)
results.foreach(println(_))
As usual, we call the system.terminate() to close our ActorSystem.
println("\nStep 7: Close actor system")
system.terminate()
Notice that we've also commented out // (actorRef ? stock).mapTo[DonutStockRequest] a second request to the same actor within our map() closure. When you first run the code in IntelliJ, you should see two requests being processed simultaneously. That is because of the map() closure where we created two actors.
You should see the following output when you run your Scala application in IntelliJ:
[INFO] [10/24/2018 16:34:16.652] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/24/2018 16:34:16.652] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:21.657] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:21.657] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
However, if you uncomment the second request to the same actor within the map() closure ((actorRef ? stock).mapTo[DonutStockRequest]), you will notice that the second request gets queued and will also be processed by the same thread for each of the corresponding actor.
Step 6: Create 2 requests
[INFO] [10/24/2018 16:34:16.652] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/24/2018 16:34:16.652] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:21.657] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:21.657] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/24/2018 16:34:21.658] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] CHECKING: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:21.658] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] CHECKING: donut stock for name = vanilla donut, clientId = 2
[INFO] [10/24/2018 16:34:26.660] [DonutStoreActorSystem-pinned-thread-pool-6] [akka://DonutStoreActorSystem/user/$a] FINISHED: donut stock for name = vanilla donut, clientId = 1
[INFO] [10/24/2018 16:34:26.660] [DonutStoreActorSystem-pinned-thread-pool-7] [akka://DonutStoreActorSystem/user/$b] FINISHED: donut stock for name = vanilla donut, clientId = 2
Actor FSM become()
So far, our Akka actors have allowed us to process and Pattern Match on various events within the def receive() handler. Akka further refines the receive() method by providing Finite State Machine semantics (short for FSM).
The FSM above from Wikipedia shows two states namely opened and closed. Each state reacts to events such as open and close. These events naturally translate to transitions between the states, where each transition also triggers certain actions such as close door or open door. The Akka documentation simplifies the relationships among states, events and actions as follows:
State(S) x Event(E) -> Actions (A), State(S’)
We start off with a small Akka FSM example which makes use of the become() method. We will evolve this example in the next code snippets to showcase additional refinements in representing Finite State Machines using Akka Actors.
Without any surprises, our very first step is to create an ActorSystem.
println("Step 1: Create ActorSystem")
val system = ActorSystem("ActorStateBecome")
In Step 2, we create a DonutBakingActor by mixing in the usual Actor and ActorLogging traits. In the receive() method, we'll Pattern Match on a BakeDonut event. For now, we'll use the become() method to implicitly simulate a BakeDonut state as well. Within this BakeDonut state, the only allowed events will be BakeVanilla and BakePlain. Note that you need to use import context._ in order to bring the become() method into scope.
println("\nStep 2: Define DonutBakingActor with become() states")
class DonutBakingActor extends Actor with ActorLogging {
import context._
def receive = {
case "BakeDonut" =>
log.info("Becoming BakeDonut state")
become {
case "BakeVanilla" =>
log.info("baking vanilla")
case "BakePlain" =>
log.info("baking plain")
case event @ _ =>
log.info(s"Allowed events [BakeVanilla, BakePlain]], event = $event")
}
case event @ _ =>
log.info(s"Allowed events [BakeDonut], events = $event")
}
}
In Step 3, we get a reference to the DonutBakingActor using the familiar system.actorOf() method. We then send some events to the actor using Akka's Tell Pattern, and observe the various transitions and state changes below.
When we send an invalid event named "boom", we notice in IntelliJ's console that the event is invalid for our finite state machine.
bakingActor ! "boom" // not valid
Thread.sleep(2000)
Step 4: Send events to actor to switch states and process events
[INFO] [10/30/2018 16:09:56.528] [ActorStateBecome-akka.actor.default-dispatcher-3] [akka://ActorStateBecome/user/donut-baking-actor] Allowed events [BakeDonut], events = boom
Next, we send a "BakeDonut" event and notice that DonutBakingActor transitions to the BakeDonut state.
[INFO] [10/30/2018 16:11:36.030] [ActorStateBecome-akka.actor.default-dispatcher-3] [akka://ActorStateBecome/user/donut-baking-actor] Becoming BakeDonut state
Once in the BakeDonut state, sending "BakePlain" or "BakeVanilla" events are perfectly valid, and the DonutBakingActor reacts accordingly.
[INFO] [10/30/2018 16:12:35.097] [ActorStateBecome-akka.actor.default-dispatcher-4] [akka://ActorStateBecome/user/donut-baking-actor] baking plain
[INFO] [10/30/2018 16:13:43.367] [ActorStateBecome-akka.actor.default-dispatcher-2] [akka://ActorStateBecome/user/donut-baking-actor] baking vanilla
If, instead, we send a "BakeCholocate" event while the DonutBakingActor is in the implicit BakeDonut state, we'll see our wildcard pattern match captures the unallowed event.
[INFO] [10/30/2018 16:13:45.434] [ActorStateBecome-akka.actor.default-dispatcher-2] [akka://ActorStateBecome/user/donut-baking-actor] Allowed events [BakeVanilla, BakePlain], event = Bake Chocolate
Actor FSM unbecome()
In the previous section, we introduced Akka's Finite State Machine feature by lifting an actor's def receive() handler using the become() method. While the become() method simulates entering a certain state, the unbecome() method allows the actor to exit that state. We'll extend the previous Finite State Machine example and show how to make use of the unbecome() method.
We start by the usual creation of an ActorSystem.
println("Step 1. Create ActorSystem")
val system = ActorSystem("ActorState")
We reuse the DonutBakingActor which will switch to an implicit BakeDonut state when it receives a BakeDonut event. While in the BakeDonut state, the only allowed events will be BakeVanilla, BakePlain and StopBaking events. Note that the StopBaking event is a new event which we introduce in this example. The processing of the StopBaking event will trigger the unbecome() method to signal the actor to switch back to its previous state.
println("\nStep 2: Define DonutBakingActor with become() and unbecome() states")
class DonutBakingActor extends Actor with ActorLogging {
import context._
def receive = {
case "BakeDonut" =>
log.info("becoming bake state")
become {
case "BakeVanilla" =>
log.info("baking vanilla")
case "BakePlain" =>
log.info("baking plain")
case "StopBaking" =>
log.info("stopping to bake")
unbecome()
case event @ _ =>
log.info(s"Allowed events [BakeVanilla, BakePlain, StopBaking], event = $event")
}
case event @ _ =>
log.info(s"Allowed events [BakeDonut], event = $event")
}
}
We then create an instance of the DonutBakingActor and send events to the actor using Akka's Tell Pattern. We start off by sending an invalid event named "boom", and naturally see that the only allowed initial event is a BakeDonut event which will trigger the initial BakeDonut state.
println("\nStep 3: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
bakingActor ! "boom" // not valid
Thread.sleep(2000)
Step 3: Create DonutBakingActor
[INFO] [11/02/2018 22:08:48.281] [ActorState-akka.actor.default-dispatcher-2] [akka://ActorState/user/donut-baking-actor] Allowed events [BakeDonut], event = boom
We send a "BakeDonut" event to trigger the become() method and swtich the actor into the BakeDonut state. Within this state, we send a "BakePlain", a "BakeVanilla" and a "Bake Chocolate" event. Within the BakeDonut state, the "BakePlain" and "BakeVanilla" events are valid, while the "BakeChocolate" event is not.
println("\nStep 4: Send events to actor to switch states and process events")
bakingActor ! "BakeDonut"
Thread.sleep(2000)
bakingActor ! "BakePlain"
Thread.sleep(2000)
bakingActor ! "BakeVanilla"
Thread.sleep(2000)
bakingActor ! "Bake Chocolate"
Thread.sleep(2000)
Step 4: Send events to actor to switch states and process events
[INFO] [11/02/2018 22:08:50.689] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] becoming bake state
[INFO] [11/02/2018 22:08:57.812] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] baking plain
[INFO] [11/02/2018 22:08:59.813] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] baking vanilla
[INFO] [11/02/2018 22:09:01.816] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] Allowed events [BakeVanilla, BakePlain, StopBaking], event = Bake Chocolate
Sending a "StopBaking" event triggers the unbecome() method, and switches the actor back to its initial state. At which point, sending a "BakeVanilla" event will no longer be valid as the actor is no longer in the "BakeDonut" state.
bakingActor ! "StopBaking"
Thread.sleep(2000)
bakingActor ! "BakeVanilla"
Thread.sleep(2000)
[INFO] [11/02/2018 22:09:03.818] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] stopping to bake
[INFO] [11/02/2018 22:09:48.535] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] Allowed events [BakeDonut], event = BakeVanilla
Finally, we close our ActorSystem using the familiar system.terminate() method.
println("\nStep 5: Shutdown actor system")
system.terminate()
Actor FSM Protocol
We continue to improve on the previous Akka Actor become and unbecome examples, and refine the message passing events via an explicit protocol. In addition, we further split the receive function into more meaningful functions which could otherwise end up into a spaghetti code!
As usual, we start by creating our ActorSystem.
println("Step 1. Create ActorSystem")
val system = ActorSystem("ActorState")
Next, we define an explicit message passing protocol by extending from a base sealed trait.
println("\nStep 2: Define message passing protocol using sealed trait")
sealed trait DonutProtocol
case object BakeDonut extends DonutProtocol
case object BakeVanilla extends DonutProtocol
case object BakePlain extends DonutProtocol
case object StopBaking extends DonutProtocol
We rewrite the DonutBakingActor's receive method by chaining the call to a new startBaking function. Upon receipt of a BakeDonut event, the actor will change state by making use of the become method, which in turn delegates the call to a new bake method. Then bake method listens to the BakeVanilla, BakePlain and StopBaking events. The unbecome method is triggered if a StopBaking event is received, and the actor will switch back to its initial state.
println("\nStep 3: Define DonutBakingActor with become() and unbecome() event")
class DonutBakingActor extends Actor {
import context._
def startBaking: Receive = {
case BakeDonut =>
println("becoming bake state")
become(bake)
case event @ _ =>
println(s"Allowed event [$BakeDonut], event = $event")
}
def bake: Receive = {
case BakeVanilla =>
println("baking vanilla")
case BakePlain =>
println("baking plain")
case StopBaking =>
println("stopping to bake")
unbecome()
case event @ _ =>
println(s"Allowed event [$BakeVanilla, $BakePlain, $StopBaking], event = $event")
}
def receive = startBaking
}
To see the actor's state changes and event processing, we create an instance using system.actorOf.
println("\nStep 4: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
bakingActor ! "boom" // not valid
Thread.sleep(2000)
Sending a "boom" event is obviously not valid, and we see the message below:
Allowed event [BakeDonut], event = boom
Next, we send a BakeDonut event which is a subtype of our new DonutProtocol trait. This naturally triggers the bake state as captured by our new bake function closure.
println("\nStep 5: Send events to actor to switch states and process events")
bakingActor ! BakeDonut
Thread.sleep(2000)
Step 5: Send events to actor to switch states and process events
becoming bake state
Once in the bake state, the actor accepts the valid BakeDonut and BakeVanilla events. The actor also rejects a "Bake Chocolote" event.
bakingActor ! BakePlain
Thread.sleep(2000)
bakingActor ! BakeVanilla
Thread.sleep(2000)
bakingActor ! "Bake Chocolate"
Thread.sleep(2000)
baking plain
baking vanilla
Allowed event [BakeVanilla, BakePlain, StopBaking], event = Bake Chocolate
After receiving the StopBaking event, the unbecome method is triggered, and the actor switches back to its original state. As a result, it no longer accepts the BakeVanilla event.
bakingActor ! StopBaking
Thread.sleep(2000)
bakingActor ! BakeVanilla
Thread.sleep(2000)
stopping to bake
Allowed event [BakeDonut], event = BakeVanilla
Similar to our previous examples, we close the actor system using the system.terminate() method.
println("\nStep 6: Shutdown actor system")
system.terminate()
Actor FSM using LoggingFSM
Using Akka's become() and unbecome() methods, you can lift Akka actors into having Finite State Machine semantics. Yet, Akka provides further refinements around closures for states, events and internal data via the LoggingFSM trait.
The added benefits and abstractions gained when using the LoggingFSM trait (which we will cover in this tutorial and refine in upcoming ones), present a strong case for using Akka Actors in complex Finite State Machines systems. These could range from Network Gaming Systems to a cluster of actors as part of a Machine Learning pipeline, for say application of Reinforcement Learning model. If you are new to Machine Learning, that's OK as we'll cover a wide range of topics under our Data Science section. For now, you can think of Reinforcement models as emulating real world systems by encapsulating their complex semantics into a black-box (this could be an Akka ActorSystem). Interactions with the black-box happens via events, which trigger various state changes, as well as capturing mutable data - sounds very familiar to an Akka FSM system!
We start by creating our usual ActorSystem.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
We will split the protocol from the previous example into explicit events, states and mutable data.
println("\nStep 2: Defining events")
sealed trait BakingEvents
final case object BakeDonut extends BakingEvents
final case object AddTopping extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
Instead of extending the Actor trait as we've done thus far in the previous examples, we'll extend a LoggingFSM trait. This new trait expects two type parameters, namely one for states, and the other for mutable data. We then kick off the Actor's initial state and data by calling the startWith(...) method. Rather than making use of the become() and unbecome() methods, the LoggingFSM trait provides a more refined when() closure. More precisely, when(state)(partial function) is a Curried Function, where the first parameter group is for the state, and the second parameter expects a Partial Function to react to the various events for the given state. Don't forget to call the initialize() method which will setup the FSM actor into the current state and data as defined by startWith(...).
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
when(Start) {
case _ => throw new IllegalStateException("stop")
}
initialize()
}
We create an instance of DonutBakingActor by calling the usual system.actorOf(...) method.
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
Sending an AddTopping event using Akka's Tell Pattern will display an error in Intellij's console because the current state is Stop.
println("\nStep 7: Send events to actor to switch states and process events")
bakingActor ! AddTopping
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
[ERROR] [11/08/2018 16:34:49.111] [DonutActorFSM-akka.actor.default-dispatcher-3] [akka://DonutActorFSM/user/donut-baking-actor] Next state Stop does not exist
Current state is [Stop], you first need to move to [BakeDonut]
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
Sending a BakeDonut event while the current state is Stop is valid, and will switch the actor to the Start state.
bakingActor ! BakeDonut
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Current state is [Stop], switching to [Start]
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
Finally, we close the actor system using the system.terminate() method.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor FSM LoggingFSM Part Two
In the previous example, we showed the enhanced semantics provided when using Akka's LoggingFSM for designing Finite State Machines. LoggingFSM provides much better closures around events, states, and mutable data. We extend the previous state machine, and implement the events to be handled while in the start state and, as a result, we add another event named StopBaking.
We create an ActorSystem named DonutActorFSM.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
The events, states and mutable data have clear separations as mandated by the LoggingFSM trait. We add a new StopBaking event to the generalized BakingEvents.
println("\nStep 2: Defining events")
sealed trait BakingEvents
case object BakeDonut extends BakingEvents
case object AddTopping extends BakingEvents
case object StopBaking extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
Similar to the previous code snippet, we are no longer extending the Actor trait. As an alternative, we extend the LoggingFSM trait, and pass-through its required type parameters namely BakingStates and BakingData. We implement the when(start)(partial function) Curried Function and, on receipt of a StopBaking event, we'll switch back to the Stop state using goto(Stop). Don't forget to call the initialize() method so that the initial state and mutable data are correct as defined by the startWith(...) method.
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
when(Start) {
case Event(StopBaking, _) =>
println(s"Event StopBaking, current donut quantity = ${stateData.qty}")
goto(Stop)
}
initialize()
}
We instantiate a DonutBakingActory using system.actorOf(...) method.
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
As per the startWith(...) method, the initial state is Stop. We can test this by sending an AddTopping event which will trigger an error.
println("\nStep 7: Send events to actor to switch states and process events")
bakingActor ! AddTopping
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Step 7: Send events to actor to switch states and process events
[ERROR] [11/13/2018 16:17:15.825] [DonutActorFSM-akka.actor.default-dispatcher-3] [akka://DonutActorFSM/user/donut-baking-actor] Next state Stop does not exist
Current state is [Stop], you first need to move to [BakeDonut]
Sending a BakeDonut event using Akka's Tell Pattern will flip the bakingActor to its Start state.
bakingActor ! BakeDonut
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Current state is [Stop], switching to [Start]
Sending a StopBaking event will flip back the bakingActor to its Stop state as defined by the new Partial Function with when(Start).
bakingActor ! StopBaking
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Event StopBaking, current donut quantity = 1
To prove that we are in the Stop state, let's send an AddTopping event. We indeed see the warning from being in the Stop state: Current state is [Stop], you first need to move to [BakeDonut]
bakingActor ! AddTopping
Thread.sleep(2000)
As always, we call system.terminate() to close our actor system.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor LoggingFSM Part Three
When it comes to Finite State Machines, there are basically two types - deterministic versus non-deterministic. As its name imply, in a deterministic finite state machine, each event leads to a known transition and state change. Conversely, non-deterministic finite state machines may react to a particular event which triggers one or multiple state(s) and transition(s) and, as a result, lead to various arbitrary paths. Regardless of the either types of automata, you would almost certainly want to capture or log events that cannot be consumed or processed. Akka LoggingFSM provides a nice closure over unhandled event using whenUnhandled.
We create an ActorSystem named DonutActorFSM.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
We reuse the events, states and mutable data definitions from the previous example.
println("\nStep 2: Defining events")
sealed trait BakingEvents
case object BakeDonut extends BakingEvents
case object AddTopping extends BakingEvents
case object StopBaking extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
We create DonutBakingActor by extending the LoggingFSM trait, and filling in its required states and mutable data type parameters. By doing so, we inherit the whenUnhandled hook, and provide a Partial Function to log events that cannot be consumed. For the purpose of this example, we'll also flip back to the Stop state for unhandled events.
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
when(Start) {
case Event(StopBaking, _) =>
println(s"Event StopBaking, current donut quantity = ${stateData.qty}")
goto(Stop)
}
whenUnhandled {
case Event(event, stateData) =>
println(s"We've received an unhandled event = [$event] for the state data = [$stateData]")
goto(Stop)
}
initialize()
}
We create the DonutBakingActor by calling system.actorOf() method.
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
Recall from the previous code examples that the initial state of our finite state automata is the Stop state. Sending a BakeDonut event using Akka's Tell Pattern will transition DonutBakingActor into the Start state.
bakingActor ! BakeDonut
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Step 7: Send events to actor to switch states and process events
Current state is [Stop], switching to [Start]
How would our actor react if we were to send some random event which is not part of its deterministic path? Since we've added the whenUnhandled hook, we should in fact expect our automata to flip back to the Stop state.
bakingActor ! "some random event"
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
We've received an unhandled event = [some random event] for the state data = [BakingData(1)]
As always, we call system.terminate() to shutdown our actor system.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor LoggingFSM Part Four
Thus far, we've used Akka's LoggingFSM trait to provide better closures around events, states and mutable data within our Finite State Machines. Consequently, we've implemented when(Stop) and when(Start), as well as capturing events that cannot be processed through the whenUnhandled() partial function.
Let's face it, though, finite state machines can be pretty boring! But, understanding a particular transition can be equivalent to a signal or a hint as part of a Machine Learning model. These signals are actually quite critical to capture especially when you are designing a large scale model-to-model machine learning interaction. For instance, an e-commerce website would want to get insights as to why certain customers are bouncing off from their shopping cart basket? What if we could represent navigating within the e-commerce website as a Finite State Machine, and feed the transitions of user interactions to a machine learning model? Fortunately, with Akka's LoggingFSM trait, our actor also inherits an onTransition handler. To continue with our DonutBakingActor's Finite State Machine, we'll implement the onTransition closure, and log an event each time the actor switches from Stop to Start or Start to Stop states.
As always, we start off by creating an ActorSystem.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
The events, states and mutable data remain unchanged from the previous example.
println("\nStep 2: Defining events")
sealed trait BakingEvents
case object BakeDonut extends BakingEvents
case object AddTopping extends BakingEvents
case object StopBaking extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
Similar to before, we extend the LoggingFSM trait and pass-through its required states and mutable data type parameters: class DonutBakingActor extends LoggingFSM[BakingStates, BakingData]. We implement the onTransition closure, and will log an event when the actor switches from case Stop -> Start or case Start -> Stop.
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
when(Start) {
case Event(StopBaking, _) =>
println(s"Event StopBaking, current donut quantity = ${stateData.qty}")
goto(Stop)
}
whenUnhandled {
case Event(event, stateData) =>
println(s"We've received an unhandled event = [$event] for the state data = [$stateData]")
goto(Stop)
}
onTransition {
case Stop -> Start => println("Switching state from [Stop -> Start]")
case Start -> Stop => println("Switching state from [Start -> Stop]")
}
initialize()
}
We create an instance of DonutBakingaActor by calling system.actorOf(...).
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
Sending a BakeDonut event will naturally switch our actor to the Start state. Thanks to our new onTransition closure, we'll log an event as our actor transitioned from case Stop -> Start.
println("\nStep 7: Send events to actor to switch states and process events")
bakingActor ! BakeDonut
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Step 7: Send events to actor to switch states and process events
Current state is [Stop], switching to [Start]
Switching state from [Stop -> Start]
Likewise, sending a StopBaking event using Akka's Tell Pattern will flip back our actor to its initial Stop state. Once again, from the onTransition closure, we'll log an event as our actor transitioned from case Start -> Stop.
bakingActor ! StopBaking
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Event StopBaking, current donut quantity = 1
Switching state from [Start -> Stop]
Finally, we call system.terminate() to close our ActorSystem.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor LoggingFSM Part Five
With Finite State Machines, a certain actor is expected to transition from a given state to another one. Without question, this process will repeat itself along various paths of the automata. What would happen, however, if our finite state machine is stuck or deadlocked on a particular state? Lack of logging or diagnostic tooling, such a situation would be very painful to detect let alone to resolve. Luckily, with Akka's LoggingFSM trait, you can add a specific timeout by which an actor should react to a certain event.
We instantiate an ActorSystem named DonutActorFSM.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
For the purpose of illustrating handling of timeouts, our previous event, states and mutable data remain unchanged.
println("\nStep 2: Defining events")
sealed trait BakingEvents
case object BakeDonut extends BakingEvents
case object AddTopping extends BakingEvents
case object StopBaking extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
With regards to DonutBakingActor, we start off by extending Akka's LoggingFSM trait, and pass-through the states and mutable data parameter types. We expand the previous when(Start) handler, and add an explicit timeout of 5 seconds when(Start, stateTimeout = 5 second). As a result, that would mean that event processing while the finite state machine is in the Start state, should be accomplished within 5 seconds. Lack of which, a StateTimeout event will be automatically triggered. For such a case, we are also allowed to provide a case Event(StateTimeout, _) within the when(Start, stateTimeout = 5 second) closure. Note that we also need to import scala.concurrent.duration._ in order to inherit the timeout features.
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
import scala.concurrent.duration._
when(Start, stateTimeout = 5 second) {
case Event(StopBaking, _) =>
println(s"Event StopBaking, current donut quantity = ${stateData.qty}")
goto(Stop)
case Event(StateTimeout, _) =>
println("Timing out state = [Start], switching to state = [Stop]")
goto(Stop)
}
whenUnhandled {
case Event(event, stateData) =>
println(s"We've received an unhandled event = [$event] for the state data = [$stateData]")
goto(Stop)
}
onTransition {
case Stop -> Start => println("Switching state from [Stop -> Start]")
case Start -> Stop => println("Switching state from [Start -> Stop]")
}
initialize()
}
We make use of the system.actoOf() method to create an instance of DonutBakingActor.
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
We send a BakeDonut event to the DonutBakingActor, but follow through by increasing the Thread.sleep() to 10 seconds. Given our timeout of 5 seconds, the actor enters the case Event(StateTimeout, _) closure, and we see the output Timing out state = [Start], switching to state = [Stop]. Notice that as part of the timeout, we've also instructed the actor to switch back to the Stop state - goto(Stop).
println("\nStep 7: Send events to actor to switch states and process events")
bakingActor ! BakeDonut
Thread.sleep(10000)
You should see the following output when you run your Scala application in IntelliJ:
Step 7: Send events to actor to switch states and process events
Current state is [Stop], switching to [Start]
Switching state from [Stop -> Start]
Timing out state = [Start], switching to state = [Stop]
Switching state from [Start -> Stop]
To prove that our actor did in fact flip to its original Stop state, sending an AddTopping event should output Current state is [Stop], you first need to move to [BakeDonut]
You should see the following output when you run your Scala application in IntelliJ:
bakingActor ! AddTopping
Thread.sleep(2000)
As always, we close our actor system by calling the system.terminate() method.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor LoggingFSM Part Six
One of the main reasons behind using Actors and Actor Systems is without question the "Let it crash" philosophy of Erlang. Akka obviously follows the "Let it crash" approach, as we have seen through our previous tutorials on using Akka Actors. It would have been unfair to deviate from such a core principle with respect to Akka's Finite State Machines. Rest assured, though, that Akka's LoggingFSM trait provides a very convenient onTermination handler to allow you to understand the reason behind failures that may occur within your automata.
As per our previous tutorials, we start by creating an Akka ActorSystem named DonutActorFSM.
println("Step 1: Create ActorSystem")
val system = ActorSystem("DonutActorFSM")
Our events, states and mutable data definitions remain unchanged for this tutorial.
println("\nStep 2: Defining events")
sealed trait BakingEvents
case object BakeDonut extends BakingEvents
case object AddTopping extends BakingEvents
case object StopBaking extends BakingEvents
println("\nStep 3: Defining states")
sealed trait BakingStates
case object Start extends BakingStates
case object Stop extends BakingStates
println("\nStep 4: Defining mutatable data")
case class BakingData(qty: Int) {
def addOneDonut = copy(qty + 1)
}
object BakingData {
def initialQuantity = BakingData(0)
}
We enhance our DonutBakingActor, which still extends Akka's LoggingFSM trait, but we now also provide the necessary closure for the onTermination handler. During a crash of our automata, Akka will capture the relevant StopEvent, namely, Normal, Shutdown or Failure. In addition, the StopEvent allows us to also inspect the current state and mutable data for our automata at the time of the crash. Note that all these stop events are represented under akka.actor.FSM, which requires us to add the import statement import akka.actor.FSM.
println("\nStep 5: Define DonutBakingActor using LoggingFSM trait")
class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] {
startWith(Stop, BakingData.initialQuantity)
when(Stop) {
case Event(BakeDonut, _) =>
println("Current state is [Stop], switching to [Start]")
goto(Start).using(stateData.addOneDonut)
case Event(AddTopping, _) =>
println("Current state is [Stop], you first need to move to [BakeDonut]")
stay
}
import scala.concurrent.duration._
when(Start, stateTimeout = 5 second) {
case Event(StopBaking, _) =>
println(s"Event StopBaking, current donut quantity = ${stateData.qty}")
goto(Stop)
case Event(StateTimeout, _) =>
println("Timing out state = [Start], switching to state = [Stop]")
goto(Stop)
}
whenUnhandled {
case Event(event, stateData) =>
println(s"We've received an unhandled event = [$event] for the state data = [$stateData]")
goto(Stop)
}
onTransition {
case Stop -> Start => println("Switching state from [Stop -> Start]")
case Start -> Stop => println("Switching state from [Start -> Stop]")
}
import akka.actor.FSM._
onTermination {
case StopEvent(Normal, state, data) =>
log.info(s"Actor onTermination, event = Normal, state = $state, data = $data")
case StopEvent(Shutdown, state, data) =>
log.info(s"Actor onTermination, event = Shutdown, state = $state, data = $data")
case StopEvent(Failure(cause), state, data) =>
log.error(s"Actor onTermination, event = Failure, cause = $cause, state = $state, data = $data")
}
initialize()
}
We create an instance of DonutBakingActor by calling the familiar system.actorOf() method.
println("\nStep 6: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
For the purpose of illustration, we'll simulate an intentional crash by sending Akka's PoisonPill message, which will shutdown our ActorSystem. As a result of having added the onTermination handler in our actor, we are able to capture the reason for the shutdown of our Akka Finite State Machine.
println("\nStep 7: Send events to actor to switch states and process events")
bakingActor ! PoisonPill
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Step 7: Send events to actor to switch states and process events
[INFO] [11/27/2018 21:05:32.020] [DonutActorFSM-akka.actor.default-dispatcher-3] [akka://DonutActorFSM/user/donut-baking-actor] Actor onTermination, event = Shutdown, state = Stop, data = BakingData(0)
For completeness, we'll still call system.terminate() method to shutdown our ActorSystem.
println("\nStep 8: Shutdown actor system")
system.terminate()
Actor FSM Scheduler
I hope that the previous examples have presented a good case for using Akka when working with Finite State Machines. Whether it be deterministic or non-deterministic Finite State Machines, state machines have a very fundamental place in problem solving and, in general, within computing.
Regardless of being deterministic or not, Finite State Machines are most certainly bound to have some level of cycles, such as a Pumping Lemma automaton. Explaining a Pumping Lemma finite state machine falls outside our current discussion on Actor FSM Scheduler, but the important point relevant to our discussion is all about repeat steps within your automaton? Akka provides us with a scheduler, which you can then use to construct repeat steps for your particular automaton algorithm. It is worth noting that Akka's scheduler is not restricted to just Finite State Machines, but to underlying Akka Actors.
We start with defining the usual ActorSystem.
println("Step 1. Create ActorSystem")
val system = ActorSystem("ActorState")
We'll reuse one of the previous FSM Actor from the become() and unbecome() code snippet. Upon receipt of a "BakeDonut" event, the actor will switch to an assumed baking state, whereby it accepts "BakeVanilla", "BakePlain" or "StopBaking" events. For a "StopBaking" event, the unbecome() method will be called, to then place the actor back to its initial state.
println("\nStep 2: Define DonutBakingActor with become() and unbecome() states")
class DonutBakingActor extends Actor with ActorLogging {
import context._
def receive = {
case "BakeDonut" =>
log.info("becoming bake state")
become {
case "BakeVanilla" =>
log.info("baking vanilla")
case "BakePlain" =>
log.info("baking plain")
case "StopBaking" =>
log.info("stopping to bake")
unbecome()
case event @ _ =>
log.info(s"Allowed events [BakeVanilla, BakePlain, StopBaking], event = $event")
}
case event @ _ =>
log.info(s"Allowed events [BakeDonut], event = $event")
}
}
We go ahead and initialise an instance of DonutBakingActor using the system.actorOf() method.
println("\nStep 3: Create DonutBakingActor")
val bakingActor = system.actorOf(Props[DonutBakingActor], "donut-baking-actor")
We flip the FSM actor into its implicit baking state by sending a "BakeDonut" event using Akka's Tell Pattern.
println("\nStep 4: Send events to actor to switch states and process events")
bakingActor ! "BakeDonut"
Thread.sleep(2000)
You should see the following output when you run your Scala application in IntelliJ:
Step 4: Send events to actor to switch states and process events
[INFO] [11/29/2018 22:18:01.917] [ActorState-akka.actor.default-dispatcher-4] [akka://ActorState/user/donut-baking-actor] becoming bake state
We then make use of Akka's convenient system.scheduler.schedule() method to repeat sending "BakeVanilla" events every 2 seconds. Note that we also had to import system.dispatcher, and import scala.concurrent.duration._. For the purpose of illustration, we will block the main thread in order to see the scheduled events in IntelliJ's console. It certainly goes without saying that you should never block your main thread in a production system.
println("\nStep 5: Using system.scheduler to send periodic events to actor")
import system.dispatcher
import scala.concurrent.duration._
// initial delay of 1 seconds, and interval of 2
system.scheduler.schedule(1.seconds, 2.seconds) {
bakingActor ! "BakeVanilla"
Thread.sleep(1000)
}
StdIn.readLine()
You should see the following output when you run your Scala application in IntelliJ:
Step 5: Using system.scheduler to send periodic events to actor
[INFO] [11/29/2018 22:18:04.933] [ActorState-akka.actor.default-dispatcher-2] [akka://ActorState/user/donut-baking-actor] baking vanilla
[INFO] [11/29/2018 22:18:06.933] [ActorState-akka.actor.default-dispatcher-2] [akka://ActorState/user/donut-baking-actor] baking vanilla
[INFO] [11/29/2018 22:18:08.932] [ActorState-akka.actor.default-dispatcher-3] [akka://ActorState/user/donut-baking-actor] baking vanilla
[INFO] [11/29/2018 22:18:10.931] [ActorState-akka.actor.default-dispatcher-3] [akka://ActorState/user/donut-baking-actor] baking vanilla
[INFO] [11/29/2018 22:18:12.932] [ActorState-akka.actor.default-dispatcher-3] [akka://ActorState/user/donut-baking-actor] baking vanilla
Test Actor FSM
When writing any software, especially code that will be used in a production context, you should aim to be a responsible developer and provide a bare minimum of software testing such as Unit Tests. Most real world applications, though, will have further testing requirements from Integration Testing, Benchmark Testing to Behaviour Driven Testing (BDD). We are fortunate that the Akka team has spent a lot of effort in facilitating the tasks involved when testing Actor Systems. Thus, in this section, we will provide various examples of testing our Actor System.
To use the Akka features for testing, you will need to import Akka's akka-testkit in your build.sbt file: "com.typesafe.akka" %% "akka-testkit" % "2.5.12" % Test. For added convenience, the akka-testkit artifact interplays nicely with the popular ScalaTest library and, as as result, you will also need to import ScalaTest in your build.sbt file: "org.scalatest" %% "scalatest" % "3.0.1" % Test.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",
"com.typesafe.akka" %% "akka-testkit" % "2.5.12" % Test,
"org.scalatest" %% "scalatest" % "3.0.1" % Test
)
We start off by showing how to test an Akka Finite State Machine (FSM) and, for the purpose of illustration, we will provide tests for the class DonutBakingActor extends LoggingFSM[BakingStates, BakingData] from the previous examples on Akka Finite State Machines.
We create a test class named DonutBakingActorFSMTests and it extends the convenient TestKit class from akka-testkit. Unsurprisingly, the TestKit class takes an ActorSystem as its constructor parameter: extends TestKit(ActorSystem("DonutActorFSM")). In addition, we mixin the ImplicitSender and DefaultTimeout traits, which are once again provided to us from the akka-testkit artifact. Since akka-testkit works well with ScalaTest, we also mixin the WordSpecLike, BeforeAndAfterAll and Matchers traits from the scalatest artifact.
class DonutBakingActorFSMTests
extends TestKit(ActorSystem("DonutActorFSM"))
with ImplicitSender
with DefaultTimeout
with WordSpecLike
with BeforeAndAfterAll
with Matchers {
Testing of an Akka Actor Finite State Machine is without question an inherently mutable process of verifying the various states, events, and mutable data of an Akka FSM Actor. For that reason, we will make use of a var donutBakingActorFSM ... , but note that we are making sure that a mutable var is obviously kept local by using the private keyword. As a reminder, we will be testing the DonutBakingActor from the previous Actor FSM tutorial, and the DonutBakingActor was also extending Akka's LoggingFSM trait.
private var donutBakingActorFSM: TestFSMRef[BakingStates, BakingData, DonutBakingActor] = _
We override the beforeAll method for our unit test, and initialize an instance of DonutBakingActor. It is good to point out that the instance of DonutBakingActor is passed-through to a top level closure via the TestFSMRef from the akka-testkit artifact.
Using this TestFSMRef type, we benefit from various features for testing Finite State Machines such as accessing the internal state for our FSM actor using the stateName method. The stateName is useful for testing the initial state for DonutBakingActor, which should equal to the Stop state.
"DonutBakingActor" should {
"have initial state of BakingStates.Stop" in {
donutBakingActorFSM.stateName shouldEqual Stop
}
}
Finally, we also override the afterAll method, during which we gracefully shutdown our actor system by calling: TestKit.shutdownActorSystem(system). For now, the DonutBakingActorFSMTests test class looks as follows, and we will provide additional examples on testing Actor FSM soon. Stay tuned!
class DonutBakingActorFSMTests
extends TestKit(ActorSystem("DonutActorFSM"))
with ImplicitSender
with DefaultTimeout
with WordSpecLike
with BeforeAndAfterAll
with Matchers {
private var donutBakingActorFSM: TestFSMRef[BakingStates, BakingData, DonutBakingActor] = _
override protected def beforeAll(): Unit = {
donutBakingActorFSM = TestFSMRef(new DonutBakingActor())
}
"DonutBakingActor" should {
"have initial state of BakingStates.Stop" in {
donutBakingActorFSM.stateName shouldEqual Stop
}
}
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
}
We continue with our series on using akka-testkit for testing Finite State Machines that were built using Akka's LoggingFSM trait. In this example, we'll show how you can test the internal state of your automaton by making use of the handy awaitCond() method from akka-testkit. What is more, you are not expected to modify in any way, the interaction with your state machine in order to test its internal state.
For the purpose of illustration, we'll send a BakeDonut event using Akka's Tell Pattern: donutBakingActorFSM ! BakeDonut. As a result, our DonutBakingActor should switch to the BakingStates.Start state. Thus, we can use the awaitCond() method to verify that the internal state did in fact change to the Start state: awaitCond(donutBakingActorFSM.stateName == Start, 2 second, 1 second).
As a reminder, we had initialised the DonutBakingActor in the beforeAll() handler at the beginning of our test. DonutBakingActor had, however, a top-level closure named TestFSMRef, which is provided to us by akka-testkit: donutBakingActorFSM = TestFSMRef(new DonutBakingActor()). It is through the TestFSMRef closure that we are able to access our automaton's internal state using the stateName method.
"DonutBakingActor" should {
"process BakeDonut event and switch to the BakingStates.Start state" in {
// send BakeDonut event using Tell Pattern
donutBakingActorFSM ! BakeDonut
// test the state using awaitCond
awaitCond(donutBakingActorFSM.stateName == Start, 2 second, 1 second)
}
}
We follow on from the previous example in using awaitCond() as a helper method for verifying the internal state of an automaton. When writing tests for Finite State Machines, the reification process for lineage becomes quite primordial. Lineage in state machines represents a series of steps (so to speak), during which the behaviour and output of one automaton passes through to the next automaton.
Thanks to the TestFSMRef closure from akka-testkit, each of our test cases can be stacked in such a way to mirror the particular steps within our automata. Hence, testing of lineage for Finite State Machines using akka-testkit is as simple as writing another test case!
To illustrate this idea, we create another test, which sends a StopBaking event using the familiar Akka Tell Pattern. This test relies on the previous StartBaking event to have been reified within our automata. Similar to the previous code snippet, we use the handy awaitCond(), and the stateName property from TestFSMRef, to verify that the state has subsequently been flipped to the BakingStates.Stop state.
"DonutBakingActor" should {
"process StopBaking event and switch to BakingStates.Stop state" in {
donutBakingActorFSM ! StopBaking
awaitCond(donutBakingActorFSM.stateName == Stop, 2 second, 1 second)
}
}
Now that we've covered how to test Lineage in an Akka Finite State Machine, we should perhaps show how to verify an FSM's mutable data. As a matter of fact, testing the mutable data is super easy, by virtue of the TestFSMRef closure. In addition to the stateName field, TestFSMRef exposes a stateData field. As its name implies, stateData represents the mutable data for the given FSM Actor.
With FSM Lineage still in mind, the events processed by our FSM Actor so far in the test harness should leave us with a qty of 1, as the current state of the mutable data. As a reminder, the type for the mutable data for our FSM Actor was characterized by the case class BakingData(qty: Int).
"DonutBakingActor current donut quantity" should {
"equal to 1 after the StopBaking event" in {
donutBakingActorFSM.stateData.qty shouldEqual 1
}
}
Testing Actor
Similar to testing Finite State Machine Actors using akka-testkit, we now show some examples on testing Akka Actors. In particular, we will focus on tests with regards to the common patterns and behaviours when using Actors in an Actor System. For instance, we will provide tests for Akka's Tell Pattern, Ask Pattern, and testing of a dummy let-it-crash behaviour.
Our build.sbt dependencies file for the artifacts of akka-testkit remain unchanged from the above testing of Actor FSM.
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",
"com.typesafe.akka" %% "akka-testkit" % "2.5.12" % Test,
"org.scalatest" %% "scalatest" % "3.0.1" % Test
)
For the purpose of illustration, we'll reuse the DonutInfoActor from the previous examples on Akka's Tell and Ask patterns. Moreover, we will add an IllegalStateException to showcase testing of expected exceptions in our Actor's behaviour, as a cut-down version of the let-it-crash protocol.
object DonutStoreProtocol {
case class Info(name: String)
}
class DonutInfoActor extends Actor with ActorLogging {
import DonutStoreProtocol._
def receive = {
case Info(name) if name == "vanilla" =>
log.info(s"Found valid $name donut")
sender ! true
case Info(name) =>
log.info(s"$name donut is not supported")
sender ! false
case event @ _ =>
log.info(s"Event $event is not allowed.")
throw new IllegalStateException(s"Event $event is not allowed")
}
}
The setting up of our test case is very similar to the testing of Actor FSM above. We extend the TestKit class and mixin the traits ImplicitSender and DefaultTimeout from akka-testkit. Thereafter, we mixin the traits WordSpecLike, BeforeAndAfterAll and Matchers from scalatest. To complete our general structure for testing Akka Actors, we implement the afterAll() method to gracefully close our Actor System by calling TestKit.shutdownActorSystem() method.
class DonutInfoActorTests
extends TestKit(ActorSystem("DonutActorTests"))
with ImplicitSender
with DefaultTimeout
with WordSpecLike
with BeforeAndAfterAll
with Matchers {
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
With the above basic structure in place, we are now ready to test some of the principal interactions with Akka Actors. For the first example, we show how you can easily test the reaction from a particular Actor following receipt of a Tell Pattern message passing protocol.
Similar to the TestFSMRef type provided by akka-testkit for interacting with an FSM Actor, akka-testkit supplies a TestActorRef as a top level closure for an Akka Actor. As a result, we create an instance of DonutInfoActor using the TestActorRef type: val testActor = TestActorRef[DonutInfoActor]. Passing through a message to this test actor using Akka's Tell Pattern is certainly no different to interfacing with any other Akka Actor. You notably have to make use of the bang operator: testActor ! Info("vanilla").
For a message of type Info("vanilla"), we know that DonutInfoActor should reply back with a Boolean message of true. How would we then test for a value of true being supplied by the test DonutInfoActor? All the heavy lifting is actually being handled for us thanks to a handy expectMsg() method from akka-testkit. Back to our example in testing Akka's Tell Pattern, we accordingly make use of the expectMsg() method as follows: expectMsg(true).
"Sending Tell Pattern Info(vanilla) message to DonutInfoActor" should {
"reply back with true" in {
val testActor = TestActorRef[DonutInfoActor]
testActor ! Info("vanilla")
expectMsg(true)
}
}
The previous code snippet clearly shows how you can easily test for responses from a given actor, where the message was sent using the Tell Pattern. If you recall earlier on with regards to Akka and the use of actors in general, actors are all about being proactive in the event of failures. For instance, a particular actor in an actor system might have a Service Level Agreement (SLA) to provide a response within a given time frame. Any response that happens after that time limit should be considered a failure. This is especially true when it comes to orchestration of actors.
Explaining synchronicity and coordination of actors will take more space that we have here, but the point relevant to our discussion is that a failure in response by an actor within a time limit should be testable! Thanks again to akka.testkit, you can wrap your test actor code inside a within block. In the example below, we are expecting the DonutInfoActor to reply back within 100 milliseconds, but have added a Thread.sleep(500). As a result, this particular test will fail.
import scala.concurrent.duration._
"DonutInfoActor" should {
"respond back within 100 millis" in {
within(100 millis) {
val testActor = TestActorRef[DonutInfoActor]
testActor ! Info("vanilla")
Thread.sleep(500)
expectMsg(true)
}
}
}
So far, we've seen how to write tests for Akka's Tell Pattern, as well as testing actor replies within a given time limit. With regards to message passing from actors, Akka provides the Ask Pattern, which should obviously be testable. Without any surprises, akka-testkit supports the testing of the Ask Pattern by making use of the TestActorRef type. The code below sends an Info("plain") message to the test actor of type TestActorRef and, using scalatest's matchers, we indeed verify that the expected response from the actor is a value of false.
"Sending Ask Pattern Info(plain) message to DonutInfoActor" should {
"reply back with false" in {
import akka.pattern._
val testActor = TestActorRef[DonutInfoActor]
val result = testActor ? Info("plain")
val Success(reply: Boolean) = result.value.get
reply shouldBe false
}
}
By now, you should be familiar with one of the core principales of actors: the let-it-crash behaviour. Generally speaking, we want to design actor systems that are responsive and, as a result, actors should never block, or become unresponsive, in case of failures. On the contrary, actors should fail-fast, and even plan ahead and react to various failure scenarios. The code below makes use of akka-testkit to intercept the expected IllegalStateException, should the DonutInfoActor receive some "Random Donut" message.
"Sending a Random Donut message to DonutInfoActor" should {
"throw IllegalStateException" in {
val testActor = TestActorRef[DonutInfoActor]
intercept[IllegalStateException] {
testActor.receive("Random Donut")
}
}
}
Following on from the previous code snippet on testing known exceptions, we also show how to conveniently mixin the features from akka-testkit and scalatest. For instance, although we previously showed how to verify that a java.lang.IllegalStateException is thrown by the DonutInfoActor upon receipt of a "Random Donut" message, what if you also had to verify the internal message of the exception?
This is easily achieved by surrounding the TestActorRef of type DonutInfoActor with the the and the thrownBy closures from scalatest. In doing so, we are able to capture a resulting IllegalStateException type and, thereafter, we verify the getMessage method by using the should be constructs from scalatest.
"The exception message when sending a Random Donut to DonutInfoActor" should {
"include the words: is not allowed" in {
val testActor = TestActorRef[DonutInfoActor]
val exception = the [IllegalStateException] thrownBy {
testActor.receive("Random Donut")
}
exception.getClass shouldEqual classOf[java.lang.IllegalStateException]
exception.getMessage should be ("Event Random Donut is not allowed")
}
}
Testing Akka HTTP POST
In the Http client POST JSON section, we showed how you can leverage Akka HTTP to send an HTTP client POST request. On the other hand, that is, on the server side, we naturally showed the coding up of the necessary Routing DSL that handles the incoming JSON payload from an HTTP POST request. In this section, we of course illustrate how to use akka-testkit and ScalaTest to write tests that validate the above-mentioned Akka HTTP functionality that is exposed via the /create-donut path.
To begin with, we create a test class named CreateDonutTest, and it extends the WordSpec class and the Matchers trait from the ScalaTest artifact. In order to benefit from the necessary built-in constructs that are designed specifically for the testing of Akka HTTP Routes, we also mixin the ScalatestRouteTest trait from the akka.http.scaladsl.testkit package.
class CreateDonutTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
}
Next, we create an instance for our Akka HTTP Routes, which are encapsulated within the class DonutRoutes and exposed via its route() method. That is why you should primarily aim to enclose your relevant Akka HTTP APIs or Routes using, say, a class or a trait, such that it is easier to bring them into scope in your production code or for testing purposes.
class CreateDonutTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
}
The tests that we'll provide with respect to Akka HTTP will make use of the WordSpec style. In this format, you typically group and enclose related tests with a top-level descriptor, such as the "Donut API" should { ... }, which acts as a closure fo all the nested tests related to our Donut API. Next, we define a test to validate the posting of a JSON payload to the /create-donut path.
class CreateDonutTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Donut API" should {
"Create a valid Donut when posting JSON to /create-donut path" in {
}
}
}
Similar to the Akka HTTP client POST JSON code snippet, we define an HttpRequest in order to specify the http://localhost:8080/create-donut URI, the HTTP POST method, and the Media Type of application/json.
class CreateDonutTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Donut API" should {
"Create a valid Donut when posting JSON to /create-donut path" in {
val jsonDonutInput = ByteString("""{"name":"plain donut", "price":1.50}""")
val httpPostCreateDonut = HttpRequest(
uri = "http://localhost:8080/create-donut",
method = HttpMethods.POST,
entity = HttpEntity(MediaTypes.`application/json`, jsonDonutInput))
}
}
}
If you recall from Akka HTTP client example, we ended up making use of a for comprehension as shown below:
val createDonutF = for {
response <- Http().singleRequest(httpPostCreateDonut)
_ = println(s"Akka HTTP request status = ${response.status}")
if response.status.isSuccess()
output <- Unmarshal(response).to[String]
} yield println(s"HTTP POST request output = $output")
Likewise, we use the the convenient Tilde Arrow or ~>, which is inherited from the ScalatestRouteTest trait. More precisely, we chain the httpPostCreateDonut request to the donutRoutes, which is then forwarded to the check method from akka.http.scaladsl.testkit. Within the check closure, we'll verify the StatusCode from the resulting HttpResponse, and this is exposed via the status method as shown below.
class CreateDonutTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Donut API" should {
"Create a valid Donut when posting JSON to /create-donut path" in {
val jsonDonutInput = ByteString("""{"name":"plain donut", "price":1.50}""")
val httpPostCreateDonut = HttpRequest(
uri = "http://localhost:8080/create-donut",
method = HttpMethods.POST,
entity = HttpEntity(MediaTypes.`application/json`, jsonDonutInput))
httpPostCreateDonut ~> donutRoutes ~> check {
status.isSuccess() shouldEqual true
status.intValue() shouldEqual 201
status.reason shouldEqual "Created"
}
}
}
}
This test is self contained, and you do not need to fire up your Akka HTTP server. Instead, you can simply run it as any other test classes written in ScalaTest. In IntelliJ, it is as simple as Right Click withtin your test class and selecting the Run ... menu item.
Testing Query Parameter
In the Query Parameter section, we showed how to use Akka HTTP's Routing DSL to define and accept String literals that would be passed through to a donutName query parameter. Thereafter, we appended the ?donutName=some donut name notation when navigating to the http://localhost:8080/donut/prices path within, say, a web browser such as Chrome or FireFox. If all was fine at the server side for processing the donutName query parameter, we should then expect the output of: Received parameter: donutName=Plain Donut
Building on from the previous section using akka-testkit and ScalaTest, we'll demonstrate how to write a test case for the above-mentioned scenario. We, therefore, start by defining a class DonutQueryParametersTest, which extends the WordSpec class from ScalaTest, as well as mixin the Matchers trait from ScalaTest. Let's not also forget that the akka-testkit artifact also provides the ScalatestRouteTest trait that is evidently designed to facilitate the testing of the features from Akka HTTP.
class DonutQueryParametersTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
}
Next, we put in scope a value for the routes that are exposed by our Akka HTTP server, and also define a top-level test definition in the style of ScalaTest's WordSpec.
class DonutQueryParametersTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Query Parameters Tests" should {
}
}
From the above, we are now ready to create a test definition, which we define as "match the output for the URL /donut/prices?donutName" in {...}. This naturally reads in plain text that we are expecting that this test should match the output for the URL /donut/prices with the given ?donutName query parameter. We then use the Get(...) construct from the akka.http.scaladsl.client package, which is essentially a shortcut to explicitly use the RequestBuilder class as follows: val Get = new RequestBuilder(GET). Likewise to the Testing Akka HTTP POST code snippet, we chain the Get() definition to our donutRoutes using ~>, that is, the Tilde Arrow notation, and then forward this chain to akka-testkit's check method.
class DonutQueryParametersTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Query Parameters Tests" should {
"match the output for the URL /donut/prices?donutName" in {
Get("/donut/prices?donutName=plain%20donut") ~> donutRoutes ~> check {
}
}
}
}
Finally, within the check closure, we match the resulting output String, besides validating the StatusCodes.OK.
class DonutQueryParametersTest
extends WordSpec
with Matchers
with ScalatestRouteTest {
val donutRoutes = new DonutRoutes().route()
"Query Parameters Tests" should {
"match the output for the URL /donut/prices?donutName" in {
Get("/donut/prices?donutName=plain%20donut") ~> donutRoutes ~> check {
responseAs[String] shouldEqual "Received parameter: donutName=plain donut"
status shouldEqual StatusCodes.OK
}
}
}
}
To run this self contained test, simply run it as any other test classes written with ScalaTest. In IntelliJ, it is just a Right Click within your test class, and then selecting the Run ... menu item.
Testing Required Query Parameter
In the previous section, we wrote a Unit Test to verify the output of the URL - http://localhost:8080/donut/prices?donutName=some donut name. While the test is certainly OK, we should perhaps remind ourselves that the particular donutName query parameter is in fact a required one. This therefore presents further opportunity to refine the testing behaviour by making sure that somewhere within the Akka HTTP Routing DSL, or path definition, the donutName query parameter is not changed down the line to an optional query parameter. If that were to happen, you would of course modify your test, as the latter would certainly fail, accordingly.
To this end, we modify our class DonutQueryParametersTest and add another test definition: "Check for required donutName query parameter at /donut/prices" in { ... }. Within this closure, we initiate an HTTP GET method - Get("/donut/prices?") - but as you may have observed, we are not passing through the donutName query parameter. Next, we chain this particular HTTP request to our donutRoutes, and then forward the resulting closure to akka-testkit's check method. This HTTP chaining, followed by the forwarding construct, was naturally made possible using Akka HTTP's ~> notation, also known as the Tilde Arrow.
Unlike the preceding code snippet though, we've provided a top-level closure for the donutRoutes using the Route.seal(...) method. This is necessary because as with most things with Akka's ecosystem, exceptions will bubble up through your call hierarchy. By not providing the required donutName query parameter, an exception will be thrown as one would expect. It is this distinct exception that we will build upon and test for within the check method's closure as shown below. Without the Route.seal(...) closure, the exception that we are about to test for will not be passed-through to the check method. As far as the actual test is concerned, we are simply verifying that the output from the exception matches the following String literal: "Request is missing required query parameter 'donutName'". In addition, we also verify that the subsequent status code should be StatusCodes.NotFound.
"Check for required donutName query parameter at /donut/prices" in {
Get("/donut/prices?") ~> Route.seal(donutRoutes) ~> check {
responseAs[String] shouldEqual "Request is missing required query parameter 'donutName'"
status shouldEqual StatusCodes.NotFound
}
}
To run this test within IntelliJ is similar to the previous ones by simply Right click somewhere within the class DonutQueryParametersTest and selecting the Run ... menu item.
Testing Optional Query Parameter
The previous tests primarily focused on required query parameters. Yet, the Akka HTTP Routes for our so-called donut routes API had some added combinations of required and optional query parameters. Without any surprise, we will follow a similar pattern as per the preceding code snippets, and write tests to verify the behaviour from the passing through of required and optional query parameters within our Akka HTTP API. For the purpose of this illustration, we will use the /donut/bake path as the test candidate as its required donutName query parameter, followed by its optional topping query parameter, naturally fit the above-mentioned test scenario.
We of course modify the class DonutQueryParametersTest and add a new test definition named: "Validate the pass-through of required and optional parameters in /donut/bake" in { ... }. We then initiate an HTTP GET request with the following URL: "/donut/bake?donutName=plain%20donut&topping=chocolate". As you must have observed, this particular test will in fact pass-through the optional topping query parameter, namely, topping=chocolate. The testing approach is comparable to the previous examples by chaining the HTTP GET request to the donutRoutes using the Tilde Arrow notation, and then forwarding this specific request to akka-testkit's check method. It is worth mentioning that this particular test is our happy path, so to speak, such that the output shouldEqual: "Received parameters: donutName=plain donut and topping=chocolate".
"Validate the pass-through of required and optional parameters in /donut/bake" in {
Get("/donut/bake?donutName=plain%20donut&topping=chocolate") ~> donutRoutes ~> check {
responseAs[String] shouldEqual "Received parameters: donutName=plain donut and topping=chocolate"
status shouldEqual StatusCodes.OK
}
}
Next, we create yet another test definition, but we will omit the optional topping query parameter within the HTTP GET request - Get("/donut/bake?donutName=plain%20donut"). As per our Route DSL in class DonutRoutes, the topping's value should default to the "sprinkles" String literal. As you would expect, we verify that the subsequent output does match the previously mentioned behaviour using: responseAs[String] shouldEqual "Received parameters: donutName=plain donut and topping=sprinkles".
"Verify the optional parameter topping for /donut/bake" in {
Get("/donut/bake?donutName=plain%20donut") ~> donutRoutes ~> check {
responseAs[String] shouldEqual "Received parameters: donutName=plain donut and topping=sprinkles"
status shouldEqual StatusCodes.OK
}
}
Within IntelliJ, simply Right Click somewhere within the class DonutQueryParametersTest to run all the above-mentioned test scenarios.
Testing Typed Query Parameter
The writing of Unit Tests not only has some therapeutic effects on us, but it undeniably helps us gain confidence with the software applications that we are building. Let us therefore consider yet another example to further improve the test coverage of for our Donut API. In particular, we'll use the /ingredients path, which had the following query parameters: ?donutName=some donut name&priceLevel=1.50. While both of these query parameters were defined as being required ones within the Routing DSL, let us remind ourselves that we were also leveraging Akka HTTP's built-in support for matching the values from the query parameters to their corresponding types.
As a result, we would expect a String value to be passed-through to the donutName query parameter, and a Double value for the priceLevel query parameter, accordingly. A Unit Test for the above-mentioned happy path using akka-testkit would look as follows:
"Verify typed parameters for /ingredients" in {
Get("/ingredients?donutName=plain%20donut&priceLevel=1.50") ~> donutRoutes ~> check {
responseAs[String] shouldEqual "Received parameters: donutName=plain donut, priceLevel=1.5"
status shouldEqual StatusCodes.OK
}
}
What if a value of the wrong type was passed-through to, say, the priceLevel query parameter? For instance, a String value with the "cheap" literal is being passed-through to the priceLevel query parameter below. Without any surprise, this would trigger an exception that will bubble up our given Akka HTTP hierarchy. In order to verify the appropriate exception, we first need to make use of the Route.seal method for the donutRoutes, such that the given exception can be forwarded to akka-testkit's check method. In addition, we verify that the subsequent StatusCode will be that of a BadRequest.
"Check for wrong types being passed through to the priceLevel query param at /ingredients" in {
Get("/ingredients?donutName=plain%20donut&priceLevel=cheap") ~> Route.seal(donutRoutes) ~> check {
responseAs[String] shouldEqual """The query parameter 'priceLevel' was malformed:
|'cheap' is not a valid 64-bit floating point value""".stripMargin
status shouldEqual StatusCodes.BadRequest
}
}
To run all the above-mentioned test scenarios, simply Right Click somewhere within the class DonutQueryParametersTest in IntelliJ, and select the Run ... menu option.
Akka HTTP project setup build.sbt
So far in our Akka tutorial series, we've covered the basics of an Actor System, provided various examples on using Actors and their life-cycle, differentiated and showed the importance of routers and dispatchers, made available step-by-step examples of using Akka for developing Finite State Machines and, showed beyond doubt, how easy it is to test Akka systems using akka-testkit and scalatest. You'd be glad to know that our Akka journey is not over just yet! Akka is also a great favorite for developing reactive HTTP middle-tier using no other than Akka HTTP.
Much of the Akka HTTP code base, though, is inspired by the now-decommissioned high-performance Spray HTTP middle tier. Without much of a surprise, Spray was naturally built using Scala and Akka. Akka HTTP retains the actor model and asynchronous nature of Spray, but is designed from the ground up on stream processing based on Akka Stream. If you have used Spray in the past, you would certainly not feel lost as Akka HTTP cherished the elegant and, yet very flexible, Spray Domain-specific language (DSL) when defining and using HTTP routes.
Testing CSV Query Parameters
Surely we've provided a variety of sample code snippets for using akka-testkit in combination with Akka HTTP and ScalaTest for writing tests that cover query parameters. For completeness and/or for the fun of it :), we'll go ahead and add one more - that is, to illustrate the wiring of an HTTP GET which contains a CSV query parameter.
For illustration purposes, we'll use the /bake-donuts path as it indeed expects a query parameter named ingredients whose values are comma separated. By now, you should be familiar with the basic steps for the write up of the test: (1) create the ScalaTest test definition, (2) call the convenient Get() method from the akka.http.scaladsl.client package, (3) use the Tilde Arrow notation ~> to pass-through the HTTP GET to your given Akka HTTP Route, (4) use the Tilde Arrow notation ~> once more to forward the request to akka-testkit's check method, and (5) finally add your necessary validations by of course making use of the Matchers provided by ScalaTest.
"Verify CSV parameters for /bake-donuts" in {
Get("/bake-donuts?ingredients=flour,sugar,vanilla") ~> donutRoutes ~> check {
responseAs[String] shouldEqual "Received CSV parameter: ingredients=List(flour, sugar, vanilla)"
status shouldEqual StatusCodes.OK
}
}
As usual, this test is self-contained. In IntelliJ, you can Right Click with the class DonutQueryParametersTest and select the Run ... menu option.
Akka HTTP setup build.sbt
To start with, we'll need to update our build.sbt artifacts to include the following relevant Akka HTTP dependencies.
Akka Http core artifacts:
- akka-http: The core module for akka-http
- akka-http-spray-json: For marshalling and unmarshalling json payloads
- akka-stream: akka-http is designed from the ground up on streams
Akka Http test artifacts:
- akka-http-testkit: To easily spin up tests for akka-http. This plugs in nicely with scalatest and akka-testkit.
Logging:
- scala-logging and its logback-classic dependency to easily log useful details about our Akka HTTP server and when processing HTTP requests.
To sum up, our final build.sbt now has the following dependencies:
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.19",
"com.typesafe.akka" %% "akka-stream" % "2.5.19",
"com.typesafe.akka" %% "akka-http" % "10.1.6",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.6",
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.6" % "test",
"com.typesafe.akka" %% "akka-testkit" % "2.5.19" % Test,
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scalatest" %% "scalatest" % "3.0.1" % Test
)
Start Akka HTTP server
From the previous section, you should now be ready with all the required artifacts for setting up Akka HTTP in your build.sbt file. Our goal, for now, is to kick off a simple HTTP server on localhost or 127.0.0.1 at port 8080. More precisely, when we open up http://localhost:8080 in a browser such as Chrome or Firefox, we'll see the following message as plain text: Akka HTTP Server is UP.
With the above in mind, let's start by creating a Scala object named AkkaHttpServer, which extends the App trait and adds logging capabilities thanks to the LazyLogging trait from scala-logging.
object AkkaHttpServer extends App with LazyLogging {
}
Since we are dealing with actors, it should come as no surprise that we'll need to define an ActorSystem. With regards to the asynchronous nature of Akka Actors, we'll need some ExecutionContext, for which we'll use Akka's default dispatcher: system.dispatcher. Since Akka HTTP is based on Akka Stream, we'll also need to provide an ActorMaterializer().
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
}
To start our Akka HTTP server, we'll need to define some Route, which is a fundamental concept when working with Akka HTTP. For our basic example, we'll use the get directive, which naturally maps to an HTTP GET request. With regards to the body of our get directive, we'll simply output the plain text message: Akka HTTP Server is UP, using the complete() method. It is worth mentioning that you will also need to add the following import statement:import akka.http.scaladsl.server.Directives._, in order to use the get, complete, and other methods, when defining Routes.
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
val routes: Route =
get {
complete("Akka HTTP Server is UP.")
}
}
Finally, to start our Akka HTTP server, we need to make use of Http().bindAndHandle(...). In addition, we'll also borrow some handy closures from the Official Akka HTTP documentation with regards to gracefully shutting down our Akka HTTP server. Firing up our Akka HTTP server from within IntelliJ is no different than running any other Scala application - we simply right-click and select the "Run AkkaHttpServer" option.
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
val routes: Route =
get {
complete("Akka HTTP Server is UP.")
}
val httpServerFuture = Http().bindAndHandle(routes, host, port)
}
You should see the following output when you run your Scala application in IntelliJ:
16:37:33.167 [akka-http-rest-server-akka.actor.default-dispatcher-4] INFO com.allaboutscala.learn.akka.http.AkkaHttpServer$ - Akka Http Server is UP and is bound to /127.0.0.1:8080
At this point, we can open a web browser such as Chrome or FireFox, and navigate to http://localhost:8080 and see the output: Akka HTTP Server is UP.. In the next section, we will further improve on our Akka HTTP Route.
To sum up, our complete code is as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
val routes: Route =
get {
complete("Akka HTTP Server is UP.")
}
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
HTTP GET plain text
So far, we are able to spin up an Akka HTTP server and, we've provided a basic Route when you access localhost at port 8080; its output in a web browser is: Akka HTTP Server is UP. For this Akka HTTP code sample, we'll start to illustrate some of the flexibility of Route Directives in Akka HTTP. If you have used the now deprecated Spray server in the past, you would most certainly be familiar with the fluency of routes in **Spray**, which has evidently been carried forward as part of Akka HTTP.
Our goal is to provide a **plain text** Media Type HTTP response when we access the URL: http://localhost:8080/server-version. Since we will further build on this example in the coming tutorials, let's provide a simple class closure for defining this new route.
class ServerVersion {
def route(): Route = {
path("server-version") {
get {
val serverVersion = "1.0.0.0"
complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, serverVersion))
}
}
}
}
Notice that we have a method named route() which returns a Route type. This is necessary in order to chain from and/or to additional routes. We then make use of a path("server-version") directive, which obviously is a closure for the behaviour we'd like to provide for the path at http://localhost:8080/server-versionget(...) directive which, without any surprises, is yet another closure for serving HTTP GET requests. Within the body of the get(...) directive, we make use of the now familiar complete(...) method. Note, however, that we are using one of its method overload to be explicit about the Media Type for the response which, in our case, is a text/plain. Similar to our previous Akka HTTP examples, you will need to import akka.http.scaladsl.server.Directives._, in order to have the path, get and complete methods in scope.
Back to the AkkaHttpServer code, we'll refactor our previous route definitions to include an instance of the class ServerVersion. With this in mind, we simply capture an immutable value named val serverVersionRoute for its route() method. Chaining the previous serverUpRoute with this new serverVersionRoute is achieved without any difficulty using the ~ notation. The ~ symbol is typically not in scope, and also requires the import akka.http.scaladsl.server.Directives._ to be added.
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val routes: Route = serverVersionRoute ~ serverUpRoute
We are now in a position to start our Akka HTTP server, and navigate to the http://localhost:8080/server-version URL. You should see the following output when you run your Scala application in IntelliJ:
1.0.0.0
Our modified AkkaHttpServer code is now as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
// routes
val serverUpRoute: Route = get {
complete("Akka HTTP Server is UP.")
}
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val routes: Route = serverVersionRoute ~ serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
HTTP GET json response
While being able to output some plain text as per the previous example is a great start, JSON format is no doubt popular in real-world enterprise applications. Without any further ado, we'll create another path, such that a JSON response is output when we access: http://localhost:8080/server-version-json.
To this end, we'll add another method named routeAsJson() to the ServerVersion class. Its return type will naturally be of a Route type from the akka.http.scaladsl package. Within the body of the method, we follow a similar pattern to the previous example. More precisely, we nest the following Akka HTTP directives: path, get and complete. With regards to the complete method, we pass-through an HttpEntity with ContentTypes of application/json. It is worth mentioning that for the purpose of this illustration, we are merely focusing on sending back a JSON. For that reason, we are manually creating a sample JSON String as captured by val jsonResponse. As may be expected, though, you would most certainly encode domain objects as JSON in real-world systems, as opposed to using a String.
class ServerVersion {
def routeAsJson(): Route = {
path("server-version-json") {
get {
val jsonResponse =
"""
|{
| "app": "Akka HTTP REST Server",
| "version": "1.0.0.0"
|}
"""
.stripMargin
complete(HttpEntity(ContentTypes.`application/json`, jsonResponse))
}
}
}
}
What's left to do is to update our AkkaHttpServer code by chaining this new route using the familiar ~ notation:
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val routes: Route = serverVersionRoute ~ serverVersionRouteAsJson ~ serverUpRoute
After starting our Akka HTTP server, you will see the following JSON response, when accessing http://localhost:8080/server-version-json.
{
"app": "Akka HTTP REST Server",
"version": "1.0.0.0"
}
The updated AkkaHTTPServer code is now as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
// routes
val serverUpRoute: Route = get {
complete("Akka HTTP Server is UP.")
}
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val routes: Route = serverVersionRoute ~ serverVersionRouteAsJson ~ serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
JSON Encoding
Being able to send a JSON String as output to an HTTP GET request is great, but perhaps in real-world enterprise applications, you would most certainly have to encode domain objects into JSON. Without any further ado, this is precisely what we will show in this section. We create a new Route at http://localhost:8080/server-version-json-encoding, such that we'll replicate the previous JSON String example, by deriving the JSON output from an Abstract Data Type.
To get started, we'll create a simple ADT using a case class named AkkaHttpRestServer, which has two properties, app and version of type String, respectively.
final case class AkkaHttpRestServer(app: String, version: String)
Encoding the AkkaHttpRestServer ADT is a no brainer thanks to the rich JSON feature set which was inherited from the now deprecated Spray server. All you have to do is to provide a closure with the akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport and spray.json.DefaultJsonProtocol in scope. In our case, we've created a new trait named JsonSupport, which extends the SprayJsonSupport trait, and then mixin the DefaultJsonProtocol trait. Next, we place an implicit jsonFormat for our domain object in scope. This will esentially signal Akka HTTP to produce a JSON output from our ADT.
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
}
We can now extend our JsonSupport trait from the ServerVersion class, and provide a new path to react to an HTTP GET request at http://localhost:8080/server-version-json-encoding. Within the body of the get directive, we create an instance of the AkkaHttpRestServer domain object, and pass it through to the complete directive. We do not have to do any additional work to produce a JSON output from the domain object since we've already provided an implicit jsonFormat in scope from the JsonSupport trait.
class ServerVersion extends JsonSupport {
...
def routeAsJsonEncoding(): Route = {
path("server-version-json-encoding") {
get {
val server = AkkaHttpRestServer("Akka HTTP REST Server", "1.0.0.0")
complete(server)
}
}
}
}
Finally, we hook in this new Route to our existing Route definitions using the ~ notation.
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val routes: Route = serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding ~ serverUpRoute
After starting our Akka HTTP server, you will see the following JSON response, when accessing http://localhost:8080/server-version-json-encoding.
{"app":"Akka HTTP REST Server","version":"1.0.0.0"}
The updated AkkaHTTPServer code is now as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
// routes
val serverUpRoute: Route = get {
complete("Akka HTTP Server is UP.")
}
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val routes: Route = serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding ~ serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
JSON pretty print
In the previous example, we showed how Akka HTTP facilitates JSON encoding of domain objects out-of-the-box!. As a result, we were able to encode our AkkaHttpRestServer Abstract Data Type (ADT) to produce the following JSON output:
{"app":"Akka HTTP REST Server","version":"1.0.0.0"}
The above JSON string is, without any question, a valid one. But, what if our ADT consisted of multiple properties or fields, along with nested types and/or multi-value properties? For such a situation and in addition to providing a friendly and easily readable JSON string, you sometimes have requirements for indenting your JSON properties. You can, of course, be eager to craft your own custom JSON indentation or pull in another third-party library to achieve this. Once again, though, Akka HTTP provides JSON pretty printing out-of-the-box!
What is more, enabling JSON pretty printing is as easy as importing spray.json._ and exposing an implicit val printer = PrettyPrinter. We add these to the JsonSupport trait which we created in the previous tutorial as follows:
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import spray.json._
implicit val printer = PrettyPrinter
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
}
Our Route remains unchanged for path("server-version-json-encoding"):
class ServerVersion extends JsonSupport {
...
def routeAsJsonEncoding(): Route = {
path("server-version-json-encoding") {
get {
val server = AkkaHttpRestServer("Akka HTTP REST Server", "1.0.0.0")
complete(server)
}
}
}
}
Similarly, the bootstrapping code for our Akka HTTP server remains unchanged as per the previous tutorial:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
// routes
val serverUpRoute: Route = get {
complete("Akka HTTP Server is UP.")
}
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val routes: Route = serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding ~ serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
After starting our Akka HTTP server, you will see the JSON response being nicely formatted and indented when accessing http://localhost:8080/server-version-json-encoding.
{
"app": "Akka HTTP REST Server",
"version": "1.0.0.0"
}
HTTP POST JSON payload
Until now, we have seen various examples for setting up an Akka HTTP Route in order to respond to some HTTP GET request. Moreover, we've also introduced the built-in support for JSON, such as producing valid JSON output from business or domain objects, as well as formatting JSON output in a more readable fashion with appropriate indentation. It is perhaps time that we demonstrate how to react to an HTTP POST request when using Akka HTTP.
An HTTP POST request is typically used to submit some data, such as a form data from a webpage, to a server. For the purpose of illustration, we will send a JSON String over the HTTP POST method to our Akka HTTP server which, in turn, will convert the String message into a corresponding Abstract Data Type (ADT).
To get started, we will define another business or domain ADT, called Donut, alongside the previous AkkaHttpRestServer ADT.
final case class AkkaHttpRestServer(app: String, version: String)
final case class Donut(name: String, price: Double)
Next, we'll provide an implicit val of jsonFormat2 for the Donut type, in the JsonSupport trait. In doing so, Akka HTTP will be aware to convert the JSON String, which we will send via an HTTP POST request, into the above Donut type.
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import spray.json._
implicit val printer = PrettyPrinter
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
implicit val donutFormat = jsonFormat2(Donut)
}
We'll also go ahead and create a class DonutRoutes in which we provide a handler for a path("create-donut"), that will enclose a post{ ... } Akka HTTP directive. To convert our JSON String payload that we are sending via the HTTP POST request to the corresponding Donut type, we make use of the convenient entity(as[Donut]) { donut => ...} closure. As a reminder, don't forget to import akka.http.scaladsl.server.Directives._, in order to get access to the path, post, entity and complete directives.
class DonutRoutes extends JsonSupport with LazyLogging {
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
}
}
}
}
Finally, we create an instance of the class DonutRoutes, and append its Route to our existing chain of Routes using the familiar ~ notation.
val donutRoutes = new DonutRoutes().route()
val routes: Route = donutRoutes ~ serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding~
serverUpRoute
The updated AkkaHTTPServer code is now as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
// routes
val serverUpRoute: Route = get {
complete("Akka HTTP Server is UP.")
}
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val donutRoutes = new DonutRoutes().route()
val routes: Route = donutRoutes ~ serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding~
serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
We can now start our Akka HTTP server but, unlike the previous examples, we will not access the http://localhost:8080/create-donut path from a web browser such as Chrome or FireFox. Instead, we will use the handy curl command line to send an HTTP POST request of type application/json, and with the following payload: {"name":"plain donut", "price":1.50}.
curl --header "Content-Type: application/json" \
--request POST \
--data '{"name":"plain donut", "price":1.50}' \
http://localhost:8080/create-donut
If our Akka HTTP server is able to sucessfully convert the String payload into the Donut type, we should see the following response following the HTTP POST request: Created donut = Donut(plain donut,1.5).
Could not find implicit value
In the last example, we revealed how to unmarshall an HTTP POST request, where the payload was of a JSON format, into its corresponding data type. More precisely, we had created a new Abstract Data Type (ADT) named, Donut, using a case class for its definition.
In doing so, we also had to put in the implicit value of jsonFormat2 for the Donut type within the JsonSupport trait. This was necessary so that Akka HTTP could automatically take care of the unmarshalling task from the JSON payload into the Donut type. A very common mistake, though, when working with Akka HTTP is to somehow forget to add the implicit value for new types. What would happen if we were to comment out the implicit value for the Donut type, in order to simulate having overlooked this requirement?
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import spray.json._
implicit val printer = PrettyPrinter
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
// implicit val donutFormat = jsonFormat2(Donut)
}
When compiling our code in IntelliJ, you will notice some exceptions along the following lines: could not find implicit value for parameter um: akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller... or not enough arguments for method as: implicit um: akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller.... If you are new to Akka HTTP, or have not used Spray, or perhaps you are new to Scala in general and the use of implicits, these sort of messages can somehow feel a bit cryptic!
Rest assured that we've actually added this section to precisely help you get comfortable with these types of exceptions when using Akka HTTP. As we've mentioned in the previous section, a lot of the features from Spray have naturally made their way into the Akka HTTP codebase. The built-in support for automatic unmarshalling of JSON String to a given type, as well as the Routing architecture and the Directives DSL, are very much relying on Spray's Magnet Pattern. This is especially useful in providing the nice high-level Routing API but, without question, at the cost of throwing some implicit-like exceptions. We cover the Magnet Pattern in much more detail in our upcoming Hands-on Scala book, which is just couple of months away! For now, though, I hope that this section will remind you that such implicit-like exceptions are just hints that you've missed wiring the implicit val ... = jsonFormat for your given domain object.
HTTP DELETE restriction
Up to this point, we've covered the two popular HTTP methods, namely, an HTTP GET and an HTTP POST, using Akka HTTP. Of course, the HTTP Protocol makes provision for additional operations, such as DELETE, TRACE, PATCH, and others, accordingly. For that matter, you should bear in mind of the other HTTP methods that clients could potentially use and send as part of their interactions with your Akka HTTP server.
Let us consider what would happen if we send an HTTP DELETE request to the http://localhost:8080/create-donut path, for which we had previously provisioned to react to an HTTP POST request. Similar to the HTTP POST code snippet above, we will use the handy curl command line utility to send the --request DELETE as shown below.
curl --header "Content-Type: application/json" \
--request DELETE \
--data '{"name":"plain donut", "price":1.50}' \
http://localhost:8080/create-donut
Fortunately for us, Akka HTTP is smart enough to recognize that, although the http://localhost:8080/create-donut path is in fact valid, but the HTTP method being used is not one that we had defined as part of our Route definitions. As a result, you should see the following error message:
A request was made of a resource using a request method not supported by that resource
While the above message may be fine for, say, a fairly simple Akka HTTP application, you would most likely have to provide custom or friendlier or business oriented messages for an enterprise application. By now, you should already be familiar with the fluent Routing API and, as a result, we can conveniently chain an additional ~ delete { ... } directive with our custom message for the path("create-donut").
class DonutRoutes extends JsonSupport with LazyLogging {
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
}
}
}
With this new delete closure in place, our end users will naturally see the custom message when sending an HTTP DELETE method, as opposed to an HTTP POST method for the http://localhost:8080/create-donut path.
curl --header "Content-Type: application/json" \
--request DELETE \
--data '{"name":"plain donut", "price":1.50}' \
http://localhost:8080/create-donut
The HTTP DELETE operation is not allowed for the create-donut path
Moreover, it goes without saying that sending an HTTP POST request to the http://localhost:8080/create-donut path is not impacted with this additional handler, and work as expected.
curl --header "Content-Type: application/json" \
--request POST \
--data '{"name":"plain donut", "price":1.50}' \
http://localhost:8080/create-donut
Created donut = Donut(plain donut,1.5)
Future onSuccess
Surely, when you are using Akka HTTP, you should write code that does not block. Hence, in this section, we will illustrate how to complete an HTTP response using the onSuccess directive. For this example, we will create a new path http://localhost:8080/donuts, which will output a JSON array of Donut items. To this end, let's start by defining a new case class Donuts:
final case class Donuts(donuts: Seq[Donut])
In order to benefit from the automatic marshalling or serialization of the Donuts domain object into JSON format, do not forget to add an implicit val ... jsonFormat1:
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import spray.json._
implicit val printer = PrettyPrinter
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
implicit val donutFormat = jsonFormat2(Donut)
implicit val donutsJsonFormat = jsonFormat1(Donuts)
}
Next, we'll create a Data Access Object (DAO) named, DonutDao, which will have a method whose return type is a Future of type Donuts, Future[Donuts]. For simplicity, we'll also use a Vector to hold a bunch of Donut objects to simulate data from a database or from some service.
class DonutDao {
import scala.concurrent.ExecutionContext.Implicits.global
val donutsFromDb = Vector(
Donut("Plain Donut", 1.50),
Donut("Chocolate Donut", 2),
Donut("Glazed Donut", 2.50)
)
def fetchDonuts(): Future[Donuts] = Future {
Donuts(donutsFromDb)
}
}
Afterwards, we define a new path("donuts") as an HTTP GET for http://localhost:8080/donuts. Within the get { ... } directive, we call the onSuccess() method by passing through a call to the Future method: donutDao.fetchDonuts(). If this call is successful, you will get access to a donuts value that you can use to feed the complete directive.
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
}
}
}
After starting our Akka HTTP server, you will see the following JSON response, when accessing http://localhost:8080/donuts.
{
"donuts": [{
"name": "Plain Donut",
"price": 1.5
}, {
"name": "Chocolate Donut",
"price": 2.0
}, {
"name": "Glazed Donut",
"price": 2.5
}]
}
The onSuccess directive is great for handling a happy scenario, but what would happen if the fetchDonuts() method were to throw an exception? We will start looking at exception handling in the next sections but, for now, it is worth keeping in mind that exceptions bubble-up in Akka HTTP similar to those that occur within an Akka Actor System. As a result, exceptions should be either localized or handled via some global handler up the hierarchy.
Future onComplete
Let us extend the previous example and, instead of using the onSuccess directive when having to handle a Future operation, we'll use a similar onComplete method. If you already have a good knowledge of asynchronous programming using Scala Futures, the onComplete method should feel very familiar. More precisely, the onComplete method takes into consideration that a Future operation could be either successful or fail.
On the above grounds, we will create a new path("donuts-with-future-success-failure") that will use the onComplete directive as a closure for the donutDao.fetchDonuts() method. Consequently, you will be able to provide a case Success ... and a case Failure ... , to react to either the success or the failure, accordingly, that may result from the donutDao.fetchDonuts() method. Do note that for each of the two outcomes, you would still need to return something back to the client by completing the HTTP response using the habitual complete directive.
~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
}
The modified DonutRoutes following the addition of the path("donuts-with-future-success-failure") is as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
}
}
}
The JSON output when accessing http://localhost:8080/donuts-with-future-success-failure should be identical to the previous code snippet. Nonetheless, by using the onComplete directive, as opposed to onSuccess, you were also able to provide a custom response that will be sent back to the client in the event of a failure.
{
"donuts": [{
"name": "Plain Donut",
"price": 1.5
}, {
"name": "Chocolate Donut",
"price": 2.0
}, {
"name": "Glazed Donut",
"price": 2.5
}]
}
Complete with an HttpResponse
Before we proceed further with yet another approach for handling error or failure situations as in the previous examples, let us introduce the HttpResponse type. By now, we have seen various ways to handle and complete an HTTP request at the server side in Akka HTTP. Likewise, Akka HTTP provides a nice abstraction for what makes up an HTTP Response, using a general purpose HttpResponse type.
It is defined in the akka.http.scaladsl.model and, if you look closely at its apply() method, you will notice that you can specify additional properties to be appended to an HTTP Response. More precisely, you can specify a status code, custom headers, an Http Protocol and, of course, the actual output to be sent back to a client as encapsulated by a ResponseEntity object.
object HttpResponse {
def apply(
status: StatusCode = StatusCodes.OK,
headers: immutable.Seq[HttpHeader] = Nil,
entity: ResponseEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`) = new HttpResponse(status, headers, entity, protocol)
}
To illustrate the use of the HttpResponse object, we will create a new path("complete-with-http-response") as shown below. For added simplicity, we'll output the String "Using an HttpResponse object" with regards to the ResponseEntity.
~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
}
The modified code for the DonutRoutes class is as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
}
}
}
Without any surprise, you should see the following output after booting up the AkkaHttpServer, and navigating to the URL: http://localhost:8080/complete-with-http-response.
Using an HttpResponse object
As you can see, an HttpResponse object is esentially a general closure for sending back an HTTP Response back to a client. With this in mind and, from the previous examples we've seen thus far with regards to the complete directive, an HttpResponse makes it possible to write general purpose functions for outputing results. In light of this, we can also use it to handle error or failure scenarios when paired with a scala.util.Try. We will, of course, demonstrate this behaviour in the next code snippet.
Try failure using an HttpResponse
Previously, we introduced the HttpResponse type that is defined in the akka.http.scaladsl.model package. In this section, we will extend our preceding discussion, and emphasize thinking ahead in terms of dealing with failure scenarios. As a result, we will use the HttpResponse type as a general purpose return type for a given method, and pairing it with a scala.util.Try. This Try top-level closure is necessary, such that we benefit from the handy getOrElse method, as a means to providing an alternative response path, should a particular method fail.
To this end, let us add a new method named tryFetchDonuts() in our DonutDao class, but notice that its return type is a Try[HttpResponse] - that is, a Try of an HttpResponse. For simplicity, we throw an IllegalStateException to force a particular failure.
def tryFetchDonuts(): Try[HttpResponse] = Try {
throw new IllegalStateException("Boom!")
}
Next, we define another method named defaultResponse, whose return type is an HttpResponse. We will make a call to this method as part of the getOrElse block and, for that reason, we'll plainly output the following message: "An unexpected error occurred. Please try again.".
def defaultResponse(): HttpResponse =
HttpResponse(
status = StatusCodes.NotFound,
entity = "An unexpected error occurred. Please try again.")
The modified class DonutDao is now as follows:
class DonutDao {
import scala.concurrent.ExecutionContext.Implicits.global
val donutsFromDb = Vector(
Donut("Plain Donut", 1.50),
Donut("Chocolate Donut", 2),
Donut("Glazed Donut", 2.50)
)
def fetchDonuts(): Future[Donuts] = Future {
Donuts(donutsFromDb)
}
def tryFetchDonuts(): Try[HttpResponse] = Try {
throw new IllegalStateException("Boom!")
}
def defaultResponse(): HttpResponse =
HttpResponse(
status = StatusCodes.NotFound,
entity = "An unexpected error occurred. Please try again.")
}
Likewise to our Akka HTTP code examples, we'll define a new path("donut-with-try-httpresponse"), which will react to a given HTTP GET request. It goes without saying that when calling our potentially unsafe tryFetchDonuts method, we now benefit from the getOrElse method, thanks to the method's return type of Try[HttpResponse]. Instead of letting our Akka Http server crash unexpectedly from a failure following a call to the tryFetchDonuts method, we are able to provide a default or custom message back to the client - in our case, its the output from the defaultResponse() method.
~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
}
}
}
After starting our Akka Http server and navigating to the http://localhost:8080/donut-with-try-httpresponse URL, you should see the following message.
An unexpected error occurred. Please try again.
Global rejection handler
Continuing with our discussion on error handling in Akka HTTP, let us introduce a global catch-all closure to trap any exceptions that may bubble up within our server. Of course, one could argue that exceptions should be localized and not allowed to propagate throughout the Akka HTTP server. Nonetheless, there is no harm to plan ahead for unseen errors, and react accordingly in order to provide a better user experience, as opposed to tolerating unfriendly stacktraces following a particular failure situation.
In Akka HTTP, the RejectionHandler type from the akka.http.scaladsl.server package represents the global catch-all closure. Setting it up is actually as easy as declaring an implicit val, and using its builder DSL, which facilitates the chaining of various handlers.
implicit val globalRejectionHandler =
RejectionHandler.newBuilder()
.handle { ... }
.handleNotFound { ... }
.result()
It is worth pointing out that this RejectionHandler should be defined prior to binding the server using Http().bindAndHandle(...). For the purpose of illustration, we will configure the first handle{ ... } closure, which accepts a Partial Function to catch exceptions such as IllegalArgumentExceptions. We then wire the second handleNotFound{ ... } closure and output the message "The path is not supported", for any URLs or paths which have not been accounted for in our Akka HTTP Route definition. As a reminder, do not forget to call the result() method after having chained all your respective handlers.
implicit val globalRejectionHandler =
RejectionHandler.newBuilder()
.handle { case ValidationRejection(msg, route) =>
complete(StatusCodes.InternalServerError, s"The operation is not supported, error = $msg")
}
.handleNotFound {
complete(StatusCodes.NotFound, "The path is not supported.")
}
.result()
The modified object AkkaHttpServer is now as follows. Note also that we've disabled the serverUpRoute in order to enable the handleNotFound global catch-all for unsupported URLs or paths. Should you require to provide a URL or path as part of, say, a general smoke test to verify that your Akka HTTP server is up and running, you can of course provide a specific ~ path(...), similar to any other Route definition.
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
implicit val globalRejectionHandler =
RejectionHandler.newBuilder()
.handle { case ValidationRejection(msg, route) =>
complete(StatusCodes.InternalServerError, s"The operation is not supported, error = $msg")
}
.handleNotFound {
complete(StatusCodes.NotFound, "The path is not supported.")
}
.result()
// routes
// val serverUpRoute: Route = get {
// complete("Akka HTTP Server is UP.")
// }
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val donutRoutes = new DonutRoutes().route()
val routes: Route = donutRoutes ~ serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding
//~serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
After starting up the AkkaHttpServer, and navigating to an undefined URL or path, such as http://localhost:8080/boom, you should see the following output.
The path is not supported.
Global exception handler
The previous use of the RejectionHandler type represents a convenient generalization for intercepting exceptions or errors that may arise during the Route execution. But, what about exceptions or errors that may be thrown during the normal execution of your Akka HTTP server? Since exceptions or errors bubble up in an Akka HTTP hierarchy, similar to that of an Akka Actor System hierarchy, it would most certainly be wise to trap these as well.
In such a situation, Akka HTTP provides an ExceptionHandler type, likewise to the RejectionHandler type. As a matter of fact, both the ExceptionHandler and the RejectionHandler types reside within the akka.http.scaladsl.server package. What is more, the wiring up of an ExceptionHandler is also done through the use of an implicit val = ... , that needs to be defined prior to bootstrapping the Akka HTTP server with the Http().bindAndHandle() method.
To illustrate the use of the ExceptionHandler as a global catch-all for Runtime exceptions, and the likes, we'll define another path("akka-http-failWith"). Within its get directive, we call the failWith method, and pass-through a new RuntimeException("Boom").
~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
}
The modified DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
}
}
}
We then define an implicit val for the ExceptionHandler, and pass-through a Partial Function to trap any exceptions of type RuntimeException. It is worth pointing out that the Partial Function expects a Throwable as input to produce a particular Route as its output - PartialFunction[Throwable, Route]. For the purpose of demonstration, we are simply completing the HTTP response with the message from the RuntimException. It goes without saying that in a real-world application, you would most likely log this message, and output a user friendly or business message back to the client.
implicit val globalExceptionHandler = ExceptionHandler {
case e: RuntimeException => complete(s"A runtime exception occurred with, msg = ${e.getMessage}")
}
The modified AkkaHttpServer code is now as follows:
object AkkaHttpServer extends App with LazyLogging {
implicit val system = ActorSystem("akka-http-rest-server")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// these should ideally be in some configuration file
val host = "127.0.0.1"
val port = 8080
implicit val globalRejectionHandler =
RejectionHandler.newBuilder()
.handle { case ValidationRejection(msg, route) =>
complete(StatusCodes.InternalServerError, s"The operation is not supported, error = $msg")
}
.handleNotFound {
complete(StatusCodes.NotFound, "The path is not supported.")
}
.result()
implicit val globalExceptionHandler = ExceptionHandler {
case e: RuntimeException => complete(s"A runtime exception occurred with, msg = ${e.getMessage}")
}
// // routes
// val serverUpRoute: Route = get {
// complete("Akka HTTP Server is UP.")
// }
val serverVersion = new ServerVersion()
val serverVersionRoute = serverVersion.route()
val serverVersionRouteAsJson = serverVersion.routeAsJson()
val serverVersionJsonEncoding = serverVersion.routeAsJsonEncoding()
val donutRoutes = new DonutRoutes().route()
val routes: Route = donutRoutes ~ serverVersionRoute ~ serverVersionRouteAsJson ~ serverVersionJsonEncoding//
// ~ serverUpRoute
val httpServerFuture = Http().bindAndHandle(routes, host, port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is UP and is bound to ${binding.localAddress}")
case Failure(e) =>
logger.error(s"Akka Http server failed to start", e)
system.terminate()
}
StdIn.readLine() // let it run until user presses return
httpServerFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
After starting our Akka HTTP server, and navigating to the http://localhost:8080/akka-http-failwith URL or path, you should see the following output:
A runtime exception occurred with, msg = Boom
Load HTML from resources
We have seen various ways of trapping exceptions and errors as well as displaying custom messages to end users, instead of the not so friendly exception stacktraces. In this same line of thought, you may be asked to redirect your clients to some custom HTML page.
We'll, therefore, take this opportunity to showcase how you can load an HTML page from the resources folder under: src/main/resources. As such, let's define a new path("akka-http-getresource"), which makes use of the getFromResource directive. This handy method resides under the akka.http.scaladsl.server.directives package, and it takes a parameter of type String for the particular resource name. The getFromResource method will then attempt to load the given resource by its name and, obviously, complete a client's GET request by serving the content of the resource.
~ path("akka-http-getresource") {
getFromResource("error-page.html")
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()){ donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
}
}
}
Next, we create a new HTML page named error-page.html under the src/main/resources folder. For the purpose of illustration, we also create a simple HTML page that essentially displays the message: This is some default error page. It goes without saying that in a real-world application, your error page will provide custom messages, logos, etc, as required by your business domain.
<HTML>
<HEAD>
<TITLE>Error Page</TITLE>
</HEAD>
<BODY BGCOLOR="FFFFFF">
<H1>This is some default error page</H1>
</BODY>
</HTML>
After starting our Akka HTTP server and navigating to the http://localhost:8080/akka-http-getresource URL or path, you should see the following output:
This is some default error page
RESTful URLs with segment
The previous tutorials on errors and exceptions no doubt showed that Akka HTTP was designed to natively and easily handle various failure scenarios. On the same line of thought, Akka HTTP's fluent Route DSL or API very much promotes creating RESTful HTTP service middle-tiers.
Let us consider the following URL: http://localhost:8080/donuts/Plain Donut. The localhost:8080 part, of course, relates to the location of the Akka HTTP endpoint. It also goes without saying that in most real-world applications, the server details would be hidden behind some proxy or CNAME, in order to provide a user-friendly URL, such as, www.google.com! The /donuts part perhaps refers to a logical categorization for your particular domain. Thereafter, you could allow a user to search or filter for a specific donut, such as a Plain Donut subsequent to the /donuts path as follows: http://localhost:8080/donuts/Plain Donut. Once again, Akka HTTP comes fully prepared to easily allow you to create such user-friendly URLs or APIs.
We kick off by first adding a new method named donutDetails to the class DonutDao. Its role will be to scan the simulated donuts database, donutsFromDb, and Pattern Match for that particular donut. On a side note, scanning a Vector data structure using the find method is obviously not an efficient operation for a large dataset.
def donutDetails(donutName: String): Future[String] = Future {
val someDonut = donutsFromDb.find(_.name == donutName)
someDonut match {
case Some(donut) => s"$donut"
case None => s"Donut = $donutName was not found."
}
}
The modified class DonutDao is now as follows:
class DonutDao {
import scala.concurrent.ExecutionContext.Implicits.global
val donutsFromDb = Vector(
Donut("Plain Donut", 1.50),
Donut("Chocolate Donut", 2),
Donut("Glazed Donut", 2.50)
)
def fetchDonuts(): Future[Donuts] = Future {
Donuts(donutsFromDb)
}
def tryFetchDonuts(): Try[HttpResponse] = Try {
throw new IllegalStateException("Boom!")
}
def defaultResponse(): HttpResponse =
HttpResponse(
status = StatusCodes.NotFound,
entity = "An unexpected error occurred. Please try again.")
def donutDetails(donutName: String): Future[String] = Future {
val someDonut = donutsFromDb.find(_.name == donutName)
someDonut match {
case Some(donut) => s"$donut"
case None => s"Donut = $donutName was not found."
}
}
}
Next up is where it gets interesting with creating a new path("donuts" / Segment), but notice the Segment part which correlates to http://localhost:8080/donuts/some user provided donut name. Akka HTTP's Route DSL feels very intuitive by allowing us to capture the donutName within the path(...) closure.
~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
}
}
}
After starting our Akka HTTP server and navigating to our RESTful donuts API http://localhost:8080/donuts/Plain%20Donut, you should see the following output:
Donut(Plain Donut,1.5)
Naturally, if you were to query for a particular donut that does not exist using, say, http://localhost:8080/donuts/Vanilla%20Donut, you should see the following output:
Donut = Vanilla Donut was not found.
RESTful URLs with regex
To continue our discussion on RESTful URLS with Akka HTTP, there may be times when you require further refinements or optimizations, such as, having to leverage the powerful features of Regular Expressions (short for regex). For instance, what if you wanted to validate and react to donut items which had a logical unique representation using the following format: donut_ABC123. As a result, a client could send a request to query or search for the above-mentioned donut id using the URL: http://localhost:8080/donuts/stock/donut_ABC123.
Thanks once more to the fluent Route DSL from Akka HTTP, you can easily pair up a given path(...) with a regex expression. Without any surprise, we go ahead and define a path("donuts" / "stock" / ...). Notice, however, that for the third part or Segment of the URL, we define a scala.util.matching.Regex. This regular expression validates the format for a particular donut item as represented by a "donut_" prefix, and then by an alpha-numerical literal as per the [a-zA-Z0-9\-]* regex.
~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
}
}
}
After starting our Akka HTTP server, and nagivating to the http://localhost:8080/donuts/stock/donut_ABC123 URL, you should see the following output:
Looking up donut stock by donutId = donut_ABC123
Query parameter
Following on from the previous tutorials, you should start noticing the fluency of Akka HTTP's Route DSL for helping you easily build RESTful HTTP APIs. Obviously, Akka HTTP provides full support for HTTP query parameters to be captured and passed-through to an Akka HTTP server. For instance, consider a donutName to be a required query parameter that a client needs to provide when sending a request for the given URL http://localhost:8080/donut/prices?donutName=some donut name.
Without any surprises, we start off by defining a new path("donut" / "prices") in the class DonutRoutes. Within the get { ... } directive, notice that we provide an additional parameter(...) directive in order to capture and process the donutName query parameter. For the purpose of illustration, we are simply producing a String with the user provided query parameter for donutName as output.
~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
}
}
}
After starting our Akka HTTP server, and navigating to the http://localhost:8080/donut/prices, you will be required to provide the donutName query parameter by appending the following notation to the HTTP URL: ?donutName=. In the example below, the literal Plain%20Donut represents the value for the donutName query parameter and, of course, the value is user driven. You should then see the following output:
Received parameter: donutName=Plain Donut
It is worth noting that based on our Route definition for the "donut/prices" path, the donutName query parameter is indeed a strict requirement. In other words, you will see the error below should you forget or omit the donutName query parameter by passing through, say, a URL as http://localhost:8080/donut/prices?.
Request is missing required query parameter 'donutName'
Optional query parameter
The preceeding section introduced the ease of use for query parameters in Akka HTTP, thanks to its fluent Route DSL. Query parameters in themselves are an integral part of real-world applications and, as a matter of fact, come in various flavours. It would be, for instance, absurd to assume that query parameters are always required to be passed-through by a client making an HTTP call for a certain URL. In other words, one of the nuances with query parameters is that they can also be optional.
Let us consider the case that a customer to our donut store will naturally provide a particular donut name as part of a given order to prepare or bake a donut. With regards to the topping, however, the customer may choose to provide his or her own choice. This essentially makes the topping query parameter optional. What is more, in the event that the topping query parameter is not provided by the customer, the donut store will default to a value of sprinkles.
To codify this particular scenario in Akka HTTP, we'll create a new path("donut" / "bake"). Within its get directive, we use the parameters diective which, as its name implies, allows us to handle multiple parameters. The first parameter donutName is a required one. But, notice that the topping parameter is in fact optinoal and has a default value of "sprinkles", using the notation 'topping ? "sprinkles".
~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
}
}
}
After starting our Akka HTTP server and navigating to the URL http://localhost:8080/donut/bake?donutName=plain%20donut&topping=chocolate%20chips, you should see the following output:
Received parameters: donutName=plain donut and topping=chocolate chips
What would happen if we were to omit the topping query parameter? Let's give it a try: http://localhost:8080/donut/bake?donutName=plain%20donut. You will, of course, observe that the optinal topping query parameter has a default value of sprinkles, as per our above-mentioned path("donut" / "bake") definition.
Received parameters: donutName=plain donut and topping=sprinkles
Typed query parameters
This section is also dedicated to the processing of query parameters in Akka HTTP. We've already covered the handling of multiple query parameters, and we'll now provide a nice bonus when using Akka HTTP's Routing DSL with respect to query parameters. More precisely, Akka HTTP will automatically deserialize your query parameters to their corresponding types, as specified by the .as[some type] notation. Therefore, you no longer need to maintain redundant parsing methods in your code base, such as, the parsing of a given String into an Int, Double, etc.
As per our previous code snippets, we'll go ahead and create a new path(...) with a get directive. In order to process multiple query parameters, we'll use the familiar parameters(...) directive. For the purpose of this illustration, we'll have two query parameters, namely, a donutName and a priceLevel, respectively. Using the .as[String] and the .as[Double] notations, you are able to automatically convert the incoming donutName and priceLevel query parameters to their corresponding String and Double types.
This fluency in Akka HTTP's Routing DSL is certainly very simple and easy to use. Yet, it presents numerous advantages to working directly with our expected types, such as, passing-through the incoming query parameters to some function or method, which naturally expects matching type parameters. Later on, we'll also showcase how you can create test cases with regards to your typed query parameters.
~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
}
}
}
After starting our Akka HTTP server and navigating to the http://localhost:8080/ingredients?donutName=Plain%20Donut&priceLevel=2.0 URL, you should see the following output.
Received parameters: donutName=Plain Donut, priceLevel=2.0
CSV query parameter
We soldier on with this mini series regarding HTTP query parameters when using Akka HTTP, and introduce the handling of Comma Separated Values, short for CSV, query parameters. As its name implies, a CSV query parameter is one which has multiple values and with each value partitioned by a **comma**. CSV query parameter is such a common use case that Akka HTTP provides a built-in unmarshaller, which will parse and unwrap the CSV values into a typed value.
As usual, for the purpose of illustration, we create a new path("bake-donuts") with a get directive. We then add a parameter directive to capture an ingredients query parameter. However, notice that we immediately **type** this parameter as being a CSV query parameter, using the **as(CsvSeq[String])** notation. Note also that you will need to explicitly import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq, which will place all the necessary implicit type conversions in scope. A common mistake is to forget the above-mentioned import statement leading to the following compile time error: not found: value CsvSeq. It is also worth pointing out that the resulting type for the ingredients CSV query parameter in the example below is a Seq[String], which you can of course pass-through to any function or method for your particular requirement.
~ path("bake-donuts") {
get {
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
parameter('ingredients.as(CsvSeq[String])) { ingredients =>
val output = s"Received CSV parameter: ingredients=$ingredients"
complete(StatusCodes.OK, output)
}
}
}
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
} ~ path("bake-donuts") {
get {
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
parameter('ingredients.as(CsvSeq[String])) { ingredients =>
val output = s"Received CSV parameter: ingredients=$ingredients"
complete(StatusCodes.OK, output)
}
}
}
}
}
After starting our Akka HTTP server and navigating to the http://localhost:8080/bake-donuts?ingredients=flour,sugar,vanilla URL, you should see the following output.
Received CSV parameter: ingredients=List(flour, sugar, vanilla)
Query parameter to Case Class
It is fair to say that by now we may be going overboard with regards to query parameters in Akka HTTP. Needless to say, when you get to develop real-world HTTP APIs, it is very much expected that you will have various flavors and use cases when it comes to exposing query parameters. As you have seen thus far, the Routing DSL of Akka HTTP supports query parameters, and we are certainly not done with its trick of the trade. This tutorial showcases the built-in support for turning your query parameters into equivalent case class. A case class is perhaps more valuable within a particular business context or domain, as opposed to the raw query parameters.
To illustrate this, we introduce yet another path("ingredients-to-case-class"). Within its get directive, we expect two parameters, namely, a donutName of type String, and a priceLevel of type Double, respectively. This is set forth using the familiar parameters('donutName.as[String], 'priceLevel.as[Double]) notation. What is more, we can chain yet another .as(), to essentially type these two parameters to a given case class. In the example below, it is a case class named Ingredient using the .as(Ingredient) notation.
~ path("ingredients-to-case-class") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]).as(Ingredient) { ingredient =>
val output = s"Encoded query parameters into case class, ingredient: $ingredient"
complete(StatusCodes.OK, output)
}
}
}
Of course, for your code to compile, you will need to have the case class Ingredient either defined within your code base, or imported as an external artifact via your build.sbt. Whichever the case, the Ingredient case class will need to be in scope.
final case class Ingredient(donutName: String, priceLevel: Double)
The modified class DonutRoutes is now as follows:
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
} ~ path("bake-donuts") {
get {
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
parameter('ingredients.as(CsvSeq[String])) { ingredients =>
val output = s"Received CSV parameter: ingredients=$ingredients"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients-to-case-class") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]).as(Ingredient) { ingredient =>
val output = s"Encoded query parameters into case class, ingredient: $ingredient"
complete(StatusCodes.OK, output)
}
}
}
}
}
After starting our Akka HTTP server and navigating to the http://localhost:8080/ingredients-to-case-class?donutName=plain%20donut&priceLevel=2.0 URL, you should see the output below. As you can observe, Akka HTTP nicely encoded the two incoming query parameters, donutName and priceLevel, into the corresponding case class Ingredient.
Encoded query parameters into case class, ingredient: Ingredient(plain donut,2.0)
HTTP request headers
Likewise to the previously discussed query parameters, you commonly have to inspect the HTTP Request Headers when developing real-world HTTP middle tier applications or APIs. The headers section from an HTTP payload is primarily represented as a bunch of key-value pairs, which provide additional meta data to a given HTTP request. For instance, an authenticated client request would perhaps pass-through a security token to an Akka HTTP server. Or, generally speaking, web browsers also capture and send across meta data along a particular HTTP request, such as, the User-Agent or Connection details.
With these in mind, it would come as no surprise that Akka HTTP naturally allows you to extract and inspect the key-value pairs that are passed-through upon a given HTTP request. It goes without saying that this feature is nicely embedded within the fluent Routing DSL as per the previous code snippets.
To this end, we create another path("request-with-headers"). Inside its get directive, we make use of the built-in extractRequest directive, which will provide you with an { httpRequest => ...}. With the httpRequest value in scope, you can access the HTTP headers section using the httpRequest.headers notation. In fact, this returns a Sequence of HttpHeader, that is, Seq[HttpHeader]. For the purpose of illustration, we simply produce a comma delimited String for all the HTTP headers using the mkString(", ") method.
~ path("request-with-headers") {
get {
extractRequest { httpRequest =>
val headers = httpRequest.headers.mkString(", ")
complete(StatusCodes.OK, s"headers = $headers")
}
}
}
The modified class DonutRoutes is now as follows:
final case class Ingredient(donutName: String, priceLevel: Double)
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
} ~ path("bake-donuts") {
get {
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
parameter('ingredients.as(CsvSeq[String])) { ingredients =>
val output = s"Received CSV parameter: ingredients=$ingredients"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients-to-case-class") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]).as(Ingredient) { ingredient =>
val output = s"Encoded query parameters into case class, ingredient: $ingredient"
complete(StatusCodes.OK, output)
}
}
} ~ path("request-with-headers") {
get {
extractRequest { httpRequest =>
val headers = httpRequest.headers.mkString(", ")
complete(StatusCodes.OK, s"headers = $headers")
}
}
}
}
}
After starting our Akka HTTP server and navigating to the http://localhost:8080/request-with-headers URL, you should see a similar output for the popular key-value HTTP header pairs that are sent across by your particular web browser.
headers = Host: localhost:8080, Connection: keep-alive, Upgrade-Insecure-Requests: 1, User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36, Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, image/apng, */*;q=0.8, application/signed-exchange; v=b3, Accept-Encoding: gzip, deflate, br, Accept-Language: en-US, en;q=0.9, fr;q=0.8, Timeout-Access:
RESTful URLs multiple segments
We've previously illustrated how to capture a RESTful URL using the Segment feature provided by Akka HTTP, or filtering specific URLs using a plain regex. Developing RESTful URLs can grow increasingly in complexity with multiple URL parts or subsections, such as, http://localhost:8080/partA/partB/partC/partC. This type of RESTful URL design is especially common in real-world applications that require various levels of implicit hierarchy or categorization similar to those found in certain Machine Learning pipelines. As expected, Akka HTTP's Routing DSL can easily capture such potential long trailing URL parts or subsections by using a Segements path matcher, as opposed to the previously discussed Segment matcher.
As usual, we'll define a new path("multiple-segments" / Segments). The resulting { segments => ... } value is typed to a convenient List[String], following which you can apply any further Pattern Matching to meet your specific business needs. For the purpose of this demo, we'll simply extract the three parts that we are expecting for the URL: http://localhost:8080/partA/partB/partC/partC.
~ path("multiple-segments" / Segments ) { segments =>
get {
val partA :: partB :: partC :: Nil = segments
val output =
s"""
|Received the following Segments = $segments, with
|partA = $partA
|partB = $partB
|partC = $partC
""".stripMargin
complete(StatusCodes.OK, output)
}
}
The modified class DonutRoutes is now as follows:
final case class Ingredient(donutName: String, priceLevel: Double)
class DonutRoutes extends JsonSupport with LazyLogging {
val donutDao = new DonutDao()
def route(): Route = {
path("create-donut") {
post {
entity(as[Donut]) { donut =>
logger.info(s"creating donut = $donut")
complete(StatusCodes.Created, s"Created donut = $donut")
}
} ~ delete {
complete(StatusCodes.MethodNotAllowed, "The HTTP DELETE operation is not allowed for the create-donut path.")
}
} ~ path("donuts") {
get {
onSuccess(donutDao.fetchDonuts()) { donuts =>
complete(StatusCodes.OK, donuts)
}
}
} ~ path("donuts-with-future-success-failure") {
get {
onComplete(donutDao.fetchDonuts()) {
case Success(donuts) => complete(StatusCodes.OK, donuts)
case Failure(ex) => complete(s"Failed to fetch donuts = ${ex.getMessage}")
}
}
} ~ path("complete-with-http-response") {
get {
complete(HttpResponse(status = StatusCodes.OK, entity = "Using an HttpResponse object"))
}
} ~ path("donut-with-try-httpresponse") {
get {
val result: HttpResponse = donutDao.tryFetchDonuts().getOrElse(donutDao.defaultResponse())
complete(result)
}
} ~ path("akka-http-failwith") {
get {
failWith(new RuntimeException("Boom"))
}
} ~ path("akka-http-getresource") {
getFromResource("error-page.html")
} ~ path("donuts" / Segment) { donutName =>
get {
val result = donutDao.donutDetails(donutName)
onSuccess(result) { donutDetail =>
complete(StatusCodes.OK, donutDetail)
}
}
} ~ path("donuts" / "stock" / new scala.util.matching.Regex("""donut_[a-zA-Z0-9\-]*""")) { donutId =>
get {
complete(StatusCodes.OK, s"Looking up donut stock by donutId = $donutId")
}
} ~ path("donut" / "prices") {
get {
parameter("donutName") { donutName =>
val output = s"Received parameter: donutName=$donutName"
complete(StatusCodes.OK, output)
}
}
} ~ path("donut" / "bake") {
get {
parameters('donutName, 'topping ? "sprinkles") { (donutName, topping) =>
val output = s"Received parameters: donutName=$donutName and topping=$topping"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]) { (donutName, priceLevel) =>
val output = s"Received parameters: donutName=$donutName, priceLevel=$priceLevel"
complete(StatusCodes.OK, output)
}
}
} ~ path("bake-donuts") {
get {
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
parameter('ingredients.as(CsvSeq[String])) { ingredients =>
val output = s"Received CSV parameter: ingredients=$ingredients"
complete(StatusCodes.OK, output)
}
}
} ~ path("ingredients-to-case-class") {
get {
parameters('donutName.as[String], 'priceLevel.as[Double]).as(Ingredient) { ingredient =>
val output = s"Encoded query parameters into case class, ingredient: $ingredient"
complete(StatusCodes.OK, output)
}
}
} ~ path("request-with-headers") {
get {
extractRequest { httpRequest =>
val headers = httpRequest.headers.mkString(", ")
complete(StatusCodes.OK, s"headers = $headers")
}
}
} ~ path("multiple-segments" / Segments ) { segments =>
get {
val partA :: partB :: partC :: Nil = segments
val output =
s"""
|Received the following Segments = $segments, with
|partA = $partA
|partB = $partB
|partC = $partC
""".stripMargin
complete(StatusCodes.OK, output)
}
}
}
}
After starting our Akka HTTP server and navigating to the http://localhost:8080/multiple-segments/segment-1/segment-2/segment-3 URL, you should see the following output.
Received the following Segments = List(segment-1, segment-2, segment-3), with
partA = segment-1
partB = segment-2
partC = segment-3
It is worth pointing out that the Segments matcher will fail to capture your indended URL if the latter ends with a trailing slash, such as, http://localhost:8080/multiple-segments/segment-1/segment-2/segment-3/. In such a situation, the unmatched URL will potentially fall-through to other nested directives as defined by your Routing DSL, or produce an error similar to The path is not supported..
HTTP client GET
We've primarily been focusing our attention on the server facilities offered by Akka HTTP. As a matter of fact, we've provided various code snippets in a step-by-step fashion so that you can gradually learn the features and versatility offered by the Routing DSL of Akka HTTP. Without question, these should help you confidently work with Akka HTTP for developing server-side HTTP endpoints, such as, exposing a RESTful API. What is more, and perhaps one of best kept secrets of the Akka ecosystem, is that it can equally assist you whenever you require to create client applications that interact over some HTTP protocol, such as, a RESTful API.
Thus far, we've connected to our Akka HTTP server using either the curl command line utility, or a web browser such as Chrome or FireFox. For instance, in the Future onSuccess section, our Akka HTTP server produced the following output when navigating to the http://localhost:8080/donuts URL.
http://localhost:8080/donuts
{
"donuts": [{
"name": "Plain Donut",
"price": 1.5
}, {
"name": "Chocolate Donut",
"price": 2.0
}, {
"name": "Glazed Donut",
"price": 2.5
}]
}
Likewise, we will create a client application using Akka HTTP's built-in support for connecting to, and sending HTTP requests, over the wire to an HTTP endpoint. As it happens, we will replicate the above-mentioned example, and to get started, we'll create a new runnable Scala application named AkkaHttpClient, which extends the familiar App trait.
object AkkaHttpClient extends App {
}
Next, we add the traditional components of an Akka Actor System, namely, the ActorSystem itself, followed by the ActorMaterializer, and the ExecutionContext or dispatcher.
object AkkaHttpClient extends App {
implicit val system = ActorSystem("akka-http-donuts-client")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
}
We then use the HttpRequest type from the akka.http.scaladsl.model package to act as a nice closure to represent an HTTP GET request to the http://localhost:8080/donuts endpoint.
object AkkaHttpClient extends App {
implicit val system = ActorSystem("akka-http-donuts-client")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val donutsUri = "http://localhost:8080/donuts"
val donutsHttpRequest = HttpRequest(
uri = donutsUri,
method = HttpMethods.GET
)
}
With the HttpRequest created, we can pass it through to the Http().singleRequest method. As its name implies, this method will kick off the HTTP call to our endpoint, and return a similar HttpResponse. Without any surprise, though, the Http().singleRequest method is asynchronous in nature and, as such, the HttpResponse is enclosed within a Future - that is, the type of donutReponse is Future[HttpResponse]. With this in mind, we can attach the familiar onComplete callback to react to either a Success or a Failure. For the purpose of this illustration, we are simply printing either the Success or Failure message, respectively.
object AkkaHttpClient extends App {
implicit val system = ActorSystem("akka-http-donuts-client")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val donutsUri = "http://localhost:8080/donuts"
val donutsHttpRequest = HttpRequest(
uri = donutsUri,
method = HttpMethods.GET
)
val donutsResponse = Http().singleRequest(donutsHttpRequest)
donutsResponse
.onComplete {
case Success(donutsResponse) => println(s"Raw HttpResponse = $donutsResponse")
case Failure(e) => println(s"Failed to HTTP GET $donutsUri, error = ${e.getMessage}")
}
Thread.sleep(3000)
system.terminate()
}
Before running this client application, first kick off our Akka HTTP server so that the http://localhost:8080/donuts is accessible. After that, you can fire off this new AkkaHttpClient application, and it should connect to the http://localhost:8080/donuts URL, and produce the following output.
Raw HttpResponse = HttpResponse(200 OK,List(Server: akka-http/10.1.6, Date: Thu, 18 Apr 2019 17:01:12 GMT),HttpEntity.Strict(application/json,{
"donuts": [{
"name": "Plain Donut",
"price": 1.5
}, {
"name": "Chocolate Donut",
"price": 2.0
}, {
"name": "Glazed Donut",
"price": 2.5
}]
}),HttpProtocol(HTTP/1.1))
Regarding the above output, you must have surely observed that there is a higher HttpResponse closure surrouding the JSON output produced from the http://localhost:8080/donuts URL. In most cases, it is less likely that you will be required to capture this raw output, and rather zero in on the actual JSON payload. We will in fact, go over the unmarshalling of this JSON message in the next section.
Unmarshal HttpResponse to Case Class
To accompany the previous discussion on using Akka Http for sending an Http GET client request, we'll expand the code snippet to showcase the unmarshalling of an HttpResponse to a more convenient Case Class. As a matter of fact, if you've followed our Akka tutorial series from the start, we've somewhat already covered the mechanics for automatic conversion from/to a Case Class.
More precisely, you will recall that we created a trait JsonSupport, which extended trait SprayJsonSupport from the akka.http.scaladsl.marshallers.sprayjson package, in addition to mixin the trait DefaultJsonProtocol from the spray.json package. In short, this trait JsonSupport acted as a nice closure to lift our domain types, namely, AkkaHttpRestServer, Donut and Donuts, into the implicit scope using Akka Http's handy jsonFormatN methods. And that's all the implicit magic, so to speak, that we needed to benefit from the automatic un/marshalling from/to a given Case Class.
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
import spray.json._
implicit val printer = PrettyPrinter
implicit val serverFormat = jsonFormat2(AkkaHttpRestServer)
implicit val donutFormat = jsonFormat2(Donut)
implicit val donutsJsonFormat = jsonFormat1(Donuts)
}
Next, you obviously need to mixin the above-mentioned trait JsonSupport with the AkkaHttpClient application.
object AkkaHttpClient extends App with JsonSupport {
...
}
We then focus our attention to the case Success which, as of the last example, constrained the JSON output from the HTTP call to the http://localhost:8080/donuts URL within a top level type, named, HttpResponse. In case you've forgotten, see below the raw HttpResponse value.
Raw HttpResponse = HttpResponse(200 OK,List(Server: akka-http/10.1.6, Date: Sun, 21 Apr 2019 17:04:50 GMT),HttpEntity.Strict(application/json,{
"donuts": [{
"name": "Plain Donut",
"price": 1.5
}, {
"name": "Chocolate Donut",
"price": 2.0
}, {
"name": "Glazed Donut",
"price": 2.5
}]
}),HttpProtocol(HTTP/1.1))
But, we would rather work with a more appropriate Case Class as opposed to an HttpResponse. To this end, let's start by passing through the HttpResponse, that is, the donutsResponse value, to the Unmarshal class from the akka.http.scaladsl.unmarshalling package. Thereafter, you can trigger the unmarshalling process to a corresponding Case Class Donuts using its to method. However, the unmarshalling process will require an in-scope unmarshaller for the specific type which, for this illustration, is implied within the trait JsonSupport that was mixin earlier. Notice also that, as with most things in Akka, the unmarshalling process is asynchronous in nature and returns a Future, that is, Future[Donuts]. In order to show the resulting Case Class generation, we'll use the Await.result method. This, of course, blocks! You should avoid blocking in real-world application in favour of, say, a flatMap or equivalent sequencing of your Future operations.
val donutsResponse = Http().singleRequest(donutsHttpRequest)
donutsResponse
.onComplete {
case Success(donutsResponse) =>
println(s"Raw HttpResponse = $donutsResponse")
val donutsF: Future[Donuts] = Unmarshal(donutsResponse).to[Donuts]
val donuts: Donuts = Await.result(donutsF, 5.second)
println(s"Unmarshalled HttpResponse to Case Class = $donuts")
The modified code for the AkkaHttpClient is now as follows:
object AkkaHttpClient extends App with JsonSupport {
implicit val system = ActorSystem("akka-http-donuts-client")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val donutsUri = "http://localhost:8080/donuts"
val donutsHttpRequest = HttpRequest(
uri = donutsUri,
method = HttpMethods.GET
)
val donutsResponse = Http().singleRequest(donutsHttpRequest)
donutsResponse
.onComplete {
case Success(donutsResponse) =>
println(s"Raw HttpResponse = $donutsResponse")
val donutsF: Future[Donuts] = Unmarshal(donutsResponse).to[Donuts]
val donuts: Donuts = Await.result(donutsF, 5.second)
println(s"Unmarshalled HttpResponse to Case Class = $donuts")
case Failure(e) => println(s"Failed to HTTP GET $donutsUri, error = ${e.getMessage}")
}
Thread.sleep(3000)
system.terminate()
}
Similar to the previous example, you first have to kick off the AkkaHttpServer so that the http://localhost:8080/donuts URL is visible. After running the AkkaHttpClient application, you will notice the automatic conversion of the nested JSON value within the HttpResponse type into the corresponding Donuts Case Class.
Unmarshalled HttpResponse to Case Class = Donuts(List(Donut(Plain Donut,1.5), Donut(Chocolate Donut,2.0), Donut(Glazed Donut,2.5)))
HTTP client POST JSON
Earlier on, we looked at the handling of JSON payload within an Akka HTTP server. As a reminder, other than adding the necessary Routing paths at the server side, we also demonstrated how to manually test or trigger sending a JSON payload over an HTTP POST request using the handy curl command.
curl --header "Content-Type: application/json" \
--request POST \
--data '{"name":"plain donut", "price":1.50}' \
http://localhost:8080/create-donut
If the above-mentioned went OK, we then received the following output within the terminal where the curl command was executed.
Created donut = Donut(plain donut,1.5
Likewise, we will mirror the curl HTTP POST request by instead making use of Akka. Within our AkkaHttpClient application, we begin by defining an HttpRequest, which provides a nice closure over the uri, HTTP method, and Media Type. The uri will be http://localhost:8080/create-donut, the HTTP method will obviously be an HTTP POST request, and the Media Type representing the payload will be an application/json. As a matter of fact, it is very important that you explicitly specify your Media Type for this sort of client HTTP request.
val jsonDonutInput = ByteString("""{"name":"plain donut", "price":1.50}""")
val httpPostCreateDonut = HttpRequest(
uri = "http://localhost:8080/create-donut",
method = HttpMethods.POST,
entity = HttpEntity(MediaTypes.`application/json`, jsonDonutInput))
Next, we make use of the convenient for comprehension to craft the following steps: (1) issue the HTTP POST request, (2) output the resulting HttpResponse which can be potentially useful for debugging purposes, (3) verify that the HttpResponse was in fact successful, (4) Unmarshal the HttpResponse, and naturally output the unmarshalled response. Kindly note that above-mentioned steps execute asynchronously, and for the purpose of illustration, we use the Await.result method to essentially wait for the completion of these Future operations.
val createDonutF = for {
response <- Http().singleRequest(httpPostCreateDonut)
_ = println(s"Akka HTTP request status = ${response.status}")
if response.status.isSuccess()
output <- Unmarshal(response).to[String]
} yield println(s"HTTP POST request output = $output")
Await.result(createDonutF, 5.second)
To verify that we can indeed send a client HTTP POST request using Akka, we first need to start our AkkaHttpServer so that the http://localhost:8080/create-donut is accessible. We can then kick off the AkkaHttpClient application, and you should see the following output.
Akka HTTP request status = 201 Created
HTTP POST request output = Created donut = Donut(plain donut,1.5)
Akka HTTP CRUD project
In this section, we will go ahead and put into practice what we've learned so far when it comes to using Akka HTTP. In particular, we will build a standalone Akka HTTP Create, Read, Update, and Delete application. Before you get started, you will require the build.sbt file from the Real-world build.sbt tutorial. In addition, you can find the source code for this project at our usual allaboutscala GitHub project, and under the Single Project, donut-store application.
Project package structure
Therefore, go ahead and create a new donut-store application in the IntelliJ IDEA code editor, and the general structure of our donut-store Scala Single Project will be identical to the one when creating a new Scala project in the IntelliJ IDEA code editor and with its default build.sbt file. Our redesigned build.sbt file, however, adds various refinements that will be practical, if not essential, within an enterprise setting. In addition, these will also help us easily translate our Single Project into equivalent enterprise-level Multi-Project.
In the same line of thought - that is, with a Multi-Project end goal - we will organize our Scala constructs (class, trait, object, etc) into the corresponding package structure below. As we will illustrate in the last chapter, these packages will naturally translate into corresponding individual projects that make up a Multi-Project.
com.allaboutscala.donutstore:
- client
- common
- config
- data
- httpserver
- marshalling
- routes
The root package com.allaboutscala.donutstore will obviously reside under src/main/scala. The subsequent packages are intended with the following in mind:
- client package will hold a general purpose client application that will interact with all the respective REST endpoints.
- common package will be the placeholder to house any constructs that could potentially be shared across other projects.
- config package will obviously be all things that relate to the loading of configurations for our application.
- data package will naturally provide the necessary APIs to interface with a given storage layer.
- httpserver package will be the main entry point for bootstrapping our Akka HTTP server.
- marshalling package will have marshallers from/to JSON and from/to our matching domain
objects.
- routes package will have all the necessary Akka HTTP route definitions for our corresponding
REST endpoints.
Bootstrapping Akka HTTP server
We in fact put into practice the aforementioned Single Facade approach, when it comes to bootstrap- ping, or starting, our Akka HTTP server. The core lineage types reside in the httpserver package, and include the following: DonutStoreApp, DonutStoreHttpServer, DonutStoreHttpController, and DonutStoreServices.
DonutStoreApp
The DonutStoreApp extends the familiar App trait, and it is essentially the main entry point for our Scala application. You will also note the DonutStoreHttpServer object which represents the bootstrapping closure for launching the Donut Store application through its startAndBind() method. You will, of course, keenly observe that DonutStoreHttpController requires the mixin of DonutStoreServices.
object DonutStoreApp extends App {
DonutStoreHttpServer.startAndBind()
}
object DonutStoreHttpServer extends DonutStoreHttpController with DonutStoreServices
DonutStoreHttpController
The trait DonutStoreHttpController exposes a single method named startAndBind() which is essentially responsible for the setting up of the required bindings for an Akka HTTP server. To be exact, we invoke the Http().bindAndHandle(...) and pass-through a donutApiRoutes - an Akka HTTP Route, as we will see shortly, is the mapping of a given REST endpoint to a corresponding Scala code. And, this is followed by the respective HTTP ip address, and port for the server - cfg.httpServer.ip and cfg.httpServer.port - these are custom configurations properties.
While it is true that we cover the Future abstraction later in this book, you can think of the Http().bindAndHandle(...) method as being one that basically runs asynchronously. The subse- quent .onComplete handler outputs some logging messages in order for us to know that our Akka HTTP server is up and running, or not. An instance of DonutStoreHttpController as shown earlier, requires the mixin of DonutStoreServices - that is, this: DonutStoreServices. The latter places in scope all the required implicit val for a typical Akka HTTP server.
trait DonutStoreHttpController extends LazyLogging {
this: DonutStoreServices =>
def startAndBind(): Unit = {
logger.info("Initializing and binding Akka HTTP server")
val httpServerFuture = Http().bindAndHandle(donutApiRoutes, cfg.httpServer.ip, cfg.httpServer.port)
httpServerFuture.onComplete {
case Success(binding) =>
logger.info(s"Akka Http Server is bound to ${binding.localAddress}")
logger.info(s"To stop the server, press the [Enter] key in IntelliJ's console.")
case Failure(e) =>
logger.error(s"Akka Http server failed to bind to ${cfg.httpServer.ip}:${cfg.httpServer.port}",e)
system.terminate()
}
// pressing enter key will kill the server
StdIn.readLine()
for {
serverBinding
DonutStoreServices
The trait DonutStoreServices acts as a nice single closure in loading all the required services for the Donut Store application. As we’ve previously seen, it is designed to be mixin with DonutStoreHttpController, and it does the following: (1) loading of configurations, (2) placing implicit values in scope, (3) creating the Akka HTTP Routes. Since Akka HTTP deals with the Actor Model242, you need to define an ActorSystem, and you can think of it as a container where actors will interact through message-passing. Actors commonly require an ExecutionContext, or thread-pool, that will be used to execute asynchronous methods. Since Akka HTTP is based on Akka Stream, you also need to provide an ActorMaterializer(). As a general good practice, you will note that we are expressing the actual types of each corresponding implicit values, such as, implicit val cfg: DonutStoreConfig.
trait DonutStoreServices extends LazyLogging {
// load config
val (donutStoreConfig, rawConfig) = new DonutConfig{}.load().fold(error => sys.error(error.toString), identity)
logger.info(s"Loaded donut store config = $donutStoreConfig")
// required values to be in the implicit scope
implicit val cfg: DonutStoreConfig = donutStoreConfig
implicit val dataApi: DataApi = new DonutDao()
logger.info("Initialized data access layer")
// Akka Http routes
val donutApiRoutes = loadRoutes(Seq(new ApiVersionRoute(), new DonutRoute()))
logger.info("Initialized all Akka HTTP routes")
// Akka Http required Actor System, Materializer and Execution Context
implicit val system: ActorSystem = ActorSystem("donut-store-http-server")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = system.dispatcher
/** Helper method for producing a single Akka HTTP [[Route]] that is required for bootstrapping the Akka HTTP server.
*
* @param httpRoutes the Sequence which conforms to the [[HttpRoute]] type
* @return an Akka HTTP [[Route]]
*/
private def loadRoutes(httpRoutes: Seq[HttpRoute]): Route = {
val defaultRoute = new DefaultRoute().routes()
httpRoutes.foldLeft(defaultRoute)((acc, httpRoute) => acc ~ httpRoute.routes())
}
}
In the next section, we will go over the Akka HTTP routes for our server, and at this point it includes:
REST Endpoint | HTTP Method | Description |
---|---|---|
/ | GET | Display a welcome message |
/api-version | GET | Output general version details for the REST API |
/add-donut | POST | Adds a new donut object into the storage layer. |
/donuts | GET | Lists all the donut objects from the storage layer. |
/donuts/some donut?ingredients=ingredient1,ingredient2,etc | POST | Updates the ingredients for a particular donut object. |
/donuts/some donut | DELETE | Deletes a particular donut object from the storage layer. |
Akka HTTP CRUD project - part 2
We follow on from the previous section, and will now cover the Akka HTTP routes, configuration, logging, and the basic routes - namely, /, and /api-version.
Akka HTTP Routes
For the time being, we will focus on the two classes that will define our REST endpoints, namely, DefaultRoute and ApiVersionRoute, respectively. These, as well as any other routes that we define, will extend a base HttpRoute trait, which outputs general logging information for each route. It also acts as a contract for all Akka HTTP routes that will form part of the overall Donut Store endpoints.
trait HttpRoute extends LazyLogging {
logRoute()
def routes()(implicit config: DonutStoreConfig, dataApi: DataApi): Route
def logRoute(): Unit = {
logger.info(s"Loading route for ${getClass.getSimpleName}")
}
The class DefaultRoute basically loads a welcome.html file from the src/resources folder whenever a user will access the root endpoint - that is, http://localhost:8080.
class DefaultRoute extends HttpRoute {
override def routes()(implicit config: DonutStoreConfig, dataApi: DataApi): Route = {
path("") {
getFromResource("welcome.html")
}
}
}
The welcome.html file has a basic HTML structure and, of course, a real-world welcome page will display additional information.
<HTML>
<HEAD>
<TITLE>Donut Store Application</TITLE>
</HEAD>
<BODY BGCOLOR="FFFFFF">
<H1>Welcome to the Donut Store Application!</H1>
</BODY>
</HTML>
The class ApiVersionRoute exposes an Akka HTTP route at /api-version with relevant details for our given Donut Store API, such as, the application name and version. Notice the nesting of the path("api-version"), get, and parameter, which uses the fluent API from Akka HTTP to represent an HTTP GET method for the following endpoint: http://localhost:8080/api-version. And, you are further able to pass-through ?prettyPrint=true, or ?prettyPrint=false, as parameters to the Query String.
class ApiVersionRoute extends HttpRoute {
import spray.json._
override def routes()(implicit config: DonutStoreConfig, dataApi: DataApi): Route = {
val apiVersion =
s"""
|{
| "app": "${config.app}",
| "version": "${config.httpServer.apiVersion}"
|}""".stripMargin
path("api-version") {
get {
parameter("prettyPrint" ? "true") { prettyPrint => // the prettyPrint parameter is optional and we also default to true
val shouldPrettyPrint = Try(prettyPrint.toBoolean).getOrElse(true) // we'll default to pretty print
val apiVersionOutput = if (shouldPrettyPrint) apiVersion.parseJson.prettyPrint else apiVersion.parseJson.toString
complete(HttpEntity(ContentTypes.`application/json`, apiVersionOutput))
}
}
}
}
}
Loading configuration
The earlier trait DonutStoreServices is in charge of loading, and placing in scope, the necessary configurations for our application.
trait DonutStoreServices extends LazyLogging {
// load config
val (donutStoreConfig, rawConfig) = new DonutConfig{}.load().fold(error => sys.error(error.toString), identity)
logger.info(s"Loaded donut store config = $donutStoreConfig")
...
}
Having said that, let us examine the inner-workings of trait DonutConfig. The latter uses the com.github.pureconfig artifact to automatically parse an application.conf file, located in the src/main/resources directory, into corresponding custom classes - that is, DonutStoreConfig and HttpServer, respectively. For the automatic wiring of the application.conf file into the above- mentioned classes, you are required to add import pureconfig.generic.auto._.
trait DonutConfig {
// This import is required to facilitate the automatic wiring of application.conf to the classes in scope below.
import pureconfig.generic.auto._
// Showing the flexibility for loading configuration from, say, a system variable named CONFIG_PATH
private val configPath = sys.env.getOrElse("CONFIG_PATH", "src/main/resources/application.conf")
// We enforce a strict parsing restriction
private val parseOptions = ConfigParseOptions.defaults().setAllowMissing(false)
def load(): Either[ConfigReaderFailures, (DonutStoreConfig, Config)] = {
val config = ConfigFactory.parseFile(new File(configPath), parseOptions).resolve()
pureconfig.loadConfig[DonutStoreConfig](config, "donut-store").map(_ -> config)
}
}
/** A top level closure of all the configurations for the Donut Store.
*
* @param app the name for the Donut Store application
* @param httpServer the [[HttpServer]] configurations
*/
final case class DonutStoreConfig(app: String, httpServer: HttpServer)
/** Captures the properties of the http-server configuration, namely, ip, port and api-version.
*
* @param ip the ip address for bootstrapping the Akka HTTP server
* @param port the port at which the Akka HTTP server will be reached
* @param apiVersion the given version for the Donut Store API
*/
final case class HttpServer(ip: String, port: Int, apiVersion: String)
Likewise, and thanks to the automatic wiring from Pure Config, above are our respective custom classes that model the configuration details in the application.conf file. They are further marked as final since you will generally not extend them.
Real-world enterprise infrastructure is typically made up of several environments, such as, de- velopment, testing, integration, pre-production, production, among others. As a result, your ap- plication is bound to run in different environments, which may have distinctive configuration values, as opposed to the ones defined in the application.conf. A potential way of solving this problem would be to have an identical Environment Variable, such as, CONFIG_PATH, across those environments. And, this is represented in our code as: sys.env.getOrElse("CONFIG_PATH", "src/main/resources/application.conf"). Yet, it is worth mentioning that in larger enterprises, you may even have to connect to a full-fledged configuration service, or some other similar component. The application.conf file under the src/main/resouces folder is as follows:
donut-store {
app = "donut-store"
http-server {
ip = 127.0.0.1
port = 8080
api-version = 1.0.0.0
}
}
Logging
It is hard to design, implement, and manage, a given real-world application without adding appro- priate logging support. The logger.info statements in, say, the earlier DonutStoreHttpController, DonutStoreServices and HttpRoute traits, are inherited from the LazyLogging trait. And, the latter is part of the classpath for our project thanks to settings(libraryDependencies ++= artifacts.scalaLogging in the build.sbt file.
You, however, are required to provide a logback.xml file under the src/main/resources folder that will prescribe the particular logging characteristics for your application. Our logback.xml file, for instance, defines the location of the log file, pattern for the file name, rolling policy, retention policy, and the log level, through the corresponding file, fileNamePattern, rollingPolicy, maxHistory, and logger attributes below.
<configuration>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/tmp/log/app/donutstore/donutstore.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/tmp/log/app/donustore/donutstore.%d{yyyyMMdd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %message%n%xException</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %message%n%xException</pattern>
</encoder>
</appender>
<logger name="com.allaboutscala.donutstore" level="INFO" />
<root level="WARN">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
</configuration>
The above logging settings may obviously differ from organization to organization. Besides, and especially within a micro-service, or lambda, architecture, you may even have your log statements pushed to an indexing layer, such as, the one provided by Elastic Search
REST endpoints
We should now be ready to fire-up our Akka HTTP server within the IntelliJ IDEA code editor. Right click on the DonutStoreApp from the httpserver package, and select the Run menu item. The IntelliJ IDEA code editor will open a corresponding Console Panel where you will see the following log messages.
[main] INFO c.a.d.h.DonutStoreHttpServer$ - Loaded donut store config=DonutStoreConfig(donut-store,HttpServer(127.0.0.1,8080,1.0.0.0))
[main] INFO c.a.d.h.DonutStoreHttpServer$ - Initialized data access layer
[main] INFO c.a.d.routes.ApiVersionRoute - Loading route for ApiVersionRoute
[main] INFO c.a.donutstore.routes.DonutRoute - Loading route for DonutRoute
[main] INFO c.a.donutstore.routes.DefaultRoute - Loading route for DefaultRoute
[main] INFO c.a.d.h.DonutStoreHttpServer$ - Initialized all Akka HTTP routes
[main] INFO c.a.d.h.DonutStoreHttpServer$ - Initializing and binding AkkaHTTPserver
[donut-store-http-server-akka.actor.default-dispatcher-4] INFO c.a.d.h.DonutStoreHttpServer$-AkkaHttpServerisboundto/127.0.0.1:8080
[donut-store-http-server-akka.actor.default-dispatcher-4] INFO c.a.d.h.DonutStoreHttpServer$-Tostoptheserver,pressthe[Enter]keyinIntelliJ'sconsole.
Since the / and api-version endpoints are using an HTTP GET method, you can open a web browser, such as, Chrome or FireFox, to visually tryout our Donut Store API. Navigating to http://localhost:8080/ should output the following message:
Welcome to the DonutStore Application!
Likewise, you should see the corresponding details for our API when navigating to http://localhost:8080/api-version
{
"app": "donut-store",
"version": "1.0.0.0"
}
If you have reached this far, well done! You have an Akka HTTP server running that expose the above-mentioned REST endpoints. To gracefully shutdown the server, press the [ENTER] key within the Console Window of the IntelliJ IDEA code editor, and you should see the log message below.
[donut-store-http-server-akka.actor.default-dispatcher-10] INFO c.a.d.h.DonutStoreHttpServer$-AkkaHttpserverwasterminated=Terminated(Actor[akka://donut-store-http-server/])
In the next section, we will look at implementing the CRUD functionalities as per the REST endpoints from the earlier section.
Akka HTTP CRUD project - part 3
In the earlier sections, we setup a basic Akka HTTP server that exposed two REST endpoints, namely, http://localhost:8080/ and http://localhost:8080/api-version. The former displayed a welcome message, and the latter produced some general version details for the donut-store REST API. We in fact used these REST endpoints to get us familiar with defining and mapping of a given REST endpoint to a matching Akka HTTP route. In addition, we structured the build.sbt file into corresponding artifacts or dependencies, settings, and project or projects, sections that are inherent to a real-world Scala project. For additional details on the build.sbt file, you can review the real-world build.sbt tutorial.
We will now complete data layer and the Akka HTTP routes to illustrate the classic Create, Read, Update, and Delete (short for CRUD) operations. The source code for the donut-store project is available on GitHub under the source-code/single-project/donut-store folder.
Our main goal in this section will be the following:
- Define custom types to represent Donut objects, and provide automatic JSON encoding and decoding.
- Add the data layer to mirror add, select, update, and delete, operations against a fictitious storage layer.
- And, in the next section, we will look to wire and expose the CRUD REST endpoints below with the above data layer, and also we'll develop a handy and very basic client application that will interact with the above REST endpoints.
Donut ADT and JSON support
It is typical in real-world applications to use a case class to define and model the particular objects within your domain or business unit. Taking this into consideration, we define two case classes in the common package of the donut-store project: (1) a case class Donut to represent a particular donut object and its corresponding properties, and (2) a case class Donuts to easily mirror a JSON payload with multiple Donut items.
/** A Donut item for the Donut Store application.
*
* @param name the name of the particular donut
* @param ingredients the [[List]] of ingredients that make up this donut
*/
final case class Donut(name: String, ingredients: List[String])
/** A list of Donut items.
*
* @param donuts the root of the JSON payload with a [[Seq]] of [[Donut]] items
*/
final case class Donuts(donuts: Seq[Donut])
Akka HTTP has built-in support for the automatic JSON encoding and decoding of Abstract Data
Types (ADT), such as the above Donut and Donuts case classes. In the marshalling package of the
donut-store project, we define a trait JsonSupport which extends the akka.http.scaladsl.marshallers.sprayj trait. Thereafter, you will benefit from the automatic encoding and decoding of a given case class
from and to JSON representations.
You, however, have to use the jsonFormatN method from import spray.json.DefaultJsonProtocol._- to lift a particular case class with N parameters into the implicit scope. For instance, the case class Donut has two parameters, and we therefore use the jsonFormat2 method. Likewise, we use the jsonFormat1 method for the case class Donuts as it has only one parameter. We later on use this trait JsonSupport with all Akka HTTP routes that will require automatic JSON marshalling capabilities into corresponding a case class.
trait JsonSupport extends SprayJsonSupport {
import spray.json.DefaultJsonProtocol._
implicit val donutJsonFormat: RootJsonFormat[Donut] = jsonFormat2(Donut)
implicit val donutsJsonFormat: RootJsonFormat[Donuts] = jsonFormat1(Donuts)
}
Data API
In the data package of the donut-store project, we define a trait DataApi that will act as a contract for the CRUD operations: (1) creating a particular donut, (2) listing of all donut objects, (3) updating the properties of a particular donut, and (4) deleting a particular donut.
trait DataApi {
def createDonut(query: Donut): Future[String]
def fetchDonuts(): Future[Donuts]
def updateDonutIngredients(donut: Donut): Future[String]
def deleteDonut(donutName: String): Future[String]
}
You will, of course, notice the asynchronous nature of the above methods using Scala’s Future abstraction. What’s more, and for illustrative purposes, the return types are fairly simplistic - that is, Future[String] or Future[Donuts]. Thereafter, we define a class DonutDao which extends DataApi and further adds support for logging through the with LazyLogging notation. This class naturally represents a Data Access Object411, but we use a scala.collection.concurrent.TrieMap to simulate Donut data points being stored to an external data source. Needless to say, we used a TrieMap for the sake of clarity and illustration.
class DonutDao extends DataApi with LazyLogging {
/**
* For convenience, we are using scala.collection.concurrent.TrieMap to simulate the Donut data points being
* stored within an external data source.
*/
private val donutDatabase = TrieMap.empty[String, Donut]
…
}
Next, class DonutDao has to provide the necessary implementations for the methods of trait DataApi. In doing so, and as a good practice, you should make sure to use the override keyword. The methods are verbose in nature, and use Pattern Matching to log corresponding messages. As a result, however, there are certainly redundant and duplicate code that can be abstracted. As a beginner though, the added expansiveness should be easier to follow, and besides being able to relate to the various concepts from our tutorials.
override def createDonut(donut: Donut): Future[String] = Future {
logger.info(s"Create donut = $donut")
val donutExists = donutDatabase.putIfAbsent(donut.name, donut)
donutExists match {
case Some(d) => s"${d.name} already exists in database."
case None => s"${donut.name} has been added to the database."
}
}
override def fetchDonuts(): Future[Donuts] = Future {
logger.info("Fetching all donuts")
Donuts(donutDatabase.values.toSeq)
}
override def updateDonutIngredients(donut: Donut): Future[String] = Future {
logger.info(s"Updating ingredients = ${donut.ingredients} for donutName = ${donut.name}")
val someDonut = donutDatabase.get(donut.name)
someDonut match {
case Some(d) =>
donutDatabase.replace(d.name, donut)
s"Updated donut ingredients for donutName = ${donut.name}"
case None =>
s"Donut ${donut.name} does not exist in database. The update operation was not run."
}
}
override def deleteDonut(donutName: String): Future[String] = Future {
logger.info(s"Deleting donut = $donutName")
val someDonut = donutDatabase.get(donutName)
someDonut match {
case Some(d) =>
donutDatabase.remove(d.name)
s"Deleted $d from database."
case None =>
s"$donutName does not exist in database. The delete operation was not run."
}
}