WebSocket Connections with Akka Streams


The WebSocket protocol provides full-duplex communication channels between web clients and servers.  Using Play Framework’s WebSocket class along with Akka Streams provides effective support for WebSockets on the server side.  Add an Alpakka connector for the desired data source, and you have all the pieces to create flexible, reliable, and elegant solutions for connecting WebSockets to a variety of common data sources.

To illustrate this, we will build a complete solution that tails a text file on the server and streams each line to a web client as it is written.

Akka Streams Basics

Akka is a set of open-source libraries for Scala and Java for building concurrent and distributed applications.  Akka Streams is a module for building data pipelines that are asynchronous and provide non-blocking backpressure.  It implements the Reactive Streams specification for interoperability with other streaming systems.

A fundamental part of Akka Streams is the concept of operators, which create and connect the different parts of the data stream.  Operators are methods such as Source.apply, Sink.foreach, and map, defined on the stream classes and their companion objects.  These parts include:  a source, which has exactly one output; a sink, which has exactly one input; and a flow, which has both an input and an output.  A source connected to a sink, through any number of flows, so that no input or output is left open, is a runnable graph.  A list of operators is available in the Akka Operator Index.

Another core concept is materialization, the process of allocating the resources a runnable graph needs in order to actually run.  The materializer that does this normally needs only the Akka actor system to run in, but it can use other resources as well.  When learning Akka Streams, the materializer can be a nebulous presence that pops up in example code.  Think of it as something akin to the Scala execution context as it relates to futures:  one must be in scope, but the system default is usually sufficient.  Since Akka 2.6, an implicit ActorSystem in scope is enough to provide it.  Running a graph also produces a materialized value, such as a Future that completes when the stream finishes.  The type of that value (Mat) is part of the type signature of each of the core entities just discussed.

Source[+Out, +Mat]
Sink[-In, +Mat]
Flow[-In, +Out, +Mat]
RunnableGraph[+Mat]
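To make these terms concrete, the following sketch connects a source, a flow, and a sink into a RunnableGraph and runs it.  It assumes Akka 2.6.  The versions pinned in the //> using lines are assumptions; those lines are scala-cli directives and are plain comments to any other tool.  The numbers and names are purely illustrative.  Note how the last type parameter of each piece is its materialized value, and how Keep.right selects which one the finished graph returns.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

implicit val system: ActorSystem = ActorSystem("RunnableGraph-Example")

// A source (one output); a collection-backed source materializes NotUsed.
val numbers: Source[Int, NotUsed] = Source(1 to 5)

// A flow (one input, one output) that doubles each element.
val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// A sink (one input) that materializes a Future of the running total.
val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// Connecting all three leaves nothing open, so the result is a RunnableGraph.
// Keep.right keeps the sink's materialized value instead of the flow's NotUsed.
val graph: RunnableGraph[Future[Int]] = numbers.via(doubler).toMat(sum)(Keep.right)

// run() materializes the graph, using the implicit ActorSystem as the materializer.
val total: Int = Await.result(graph.run(), 5.seconds)
println(total)
system.terminate()
```

Running it prints 30, the sum of the doubled values 2, 4, 6, 8, and 10.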

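A simple source can be created using the apply operator, which takes an immutable collection such as a Seq and creates a source that emits its elements in order.  Because this is Scala, we can omit the name apply and write Source(names) rather than Source.apply(names).  The Java DSL provides Source.from for the same purpose.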

val names = Seq("Jim", "John", "Ray", "Robby")
val source:Source[String, NotUsed] = Source(names)

The first type parameter of Source is String, to match the element type of the list.  The second is the type of the source's materialized value.  A source built from a static list has nothing useful to materialize, so we use the special type akka.NotUsed to indicate this.  (Running the stream still requires a materializer.)

For a simple example of a sink, we use the foreach operator which takes a function to run on each element of the stream.

val sink:Sink[String, Future[Done]] = Sink.foreach[String](println)

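The second type parameter is again the materialized value:  Sink.foreach materializes a Future[Done] that completes when the stream finishes.  akka.Done plays the role of Unit here, signaling completion without carrying a value.  We can now run the source into the sink with source.runWith(sink), which materializes the stream and returns the sink's Future[Done].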

Below is the complete example, which can be pasted into the Scala console of a project that has akka-stream as a dependency.  (Clone the repository for this article and run sbt.  At the sbt prompt, run console to start a Scala console, type :paste to enter paste mode, paste the code, and press Ctrl+D to execute it.  To exit the Scala console, type :q.)

/** Example of calling a function on each element of a stream using the foreach sink.
 */
import scala.concurrent.Future

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system:ActorSystem = ActorSystem("Foreach-Sink-Example")

// Create a source from a list.
val names = Seq("Jim", "John", "Ray", "Robby")
val source:Source[String, NotUsed] = Source(names)

// Create a foreach sink.
val sink:Sink[String, Future[Done]] = Sink.foreach[String](println)
val result:Future[Done] = source.runWith(sink)
result.onComplete(_ => system.terminate())(system.dispatcher)

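When the future completes, system.terminate is called to shut down the actor system.  Otherwise the actor system would keep running and prevent you from exiting the Scala console.  onComplete needs an ExecutionContext to run its callback, so we pass system.dispatcher, the actor system's default execution context.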

The Alpakka Library

Alpakka is built on top of Akka, providing a library of connectors for many common data sources, including HDFS, Kafka, Parquet, and S3.  Here we will use the File connector to create a source for our WebSocket to consume.

To tail a file, we use the FileTailSource.lines operator to create a data source for the outbound traffic on the WebSocket.  It reads the file from the beginning, emits each complete line, and then keeps polling for lines appended later.  Parameters include the path identifying the file, the maximum line size, and the polling interval for checking the file for new lines.

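import java.nio.file.FileSystems
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.Source

val fs = FileSystems.getDefault
val lines:Source[String, NotUsed] = FileTailSource.lines(
    path            = fs.getPath(pathToFile),  // pathToFile: the name of the file to tail
    maxLineSize     = 8192,                    // longest line allowed, in bytes
    pollingInterval = 500.millis               // how often to check for new lines
)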
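To see the tailing behavior on its own, here is a self-contained sketch that tails a temporary file and collects the first three lines.  While the stream is running, it appends a line, much like the echo command used at the end of this article.  It assumes Alpakka 3.0.4, which builds on Akka 2.6.  The temporary file, the 100 ms polling interval, and the explicit lf = "\n" line separator are choices made for the example, not values from the project.

```scala
//> using scala "2.13"
//> using dep "com.lightbend.akka::akka-stream-alpakka-file:3.0.4"

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, StandardOpenOption}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("FileTail-Example")

// A temporary file standing in for data/ulysses.txt.
val file = Files.createTempFile("tail-example", ".txt")
file.toFile.deleteOnExit()
Files.write(file, "line 1\nline 2\n".getBytes(UTF_8))

// Tail the file from the beginning, polling every 100 ms.
// lf is set explicitly so the example behaves the same on every platform.
val tailed: Future[Seq[String]] = FileTailSource
  .lines(path = file, maxLineSize = 8192, pollingInterval = 100.millis, lf = "\n")
  .take(3) // complete the stream once three lines have been seen
  .runWith(Sink.seq)

// Append a line while the stream is running.
Thread.sleep(500)
Files.write(file, "line 3\n".getBytes(UTF_8), StandardOpenOption.APPEND)

val received: Seq[String] = Await.result(tailed, 10.seconds)
received.foreach(println)
system.terminate()
```

The take(3) is only there so the example finishes.  In the WebSocket handler the source is left unbounded and runs until the connection is closed.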

Next, we need a sink for the inbound traffic.  We will use Sink.foreach to just log the incoming WebSocket messages, but this could easily be extended to receive data or execute commands from the client.

val in = Sink.foreach[String](s => logger.info(s"WebSocket message from ${request.remoteAddress}: $s"))

Finally, we hook up our sink and source to a flow.  Unlike the usual in-line flow, where a source is the input of the flow and a sink receives the output, here we need the sink to receive the input of the flow and the source to provide the output.  This creates a full-duplex endpoint flow that we can hook up to a WebSocket (or other bidirectional data pipeline).  The operator Flow.fromSinkAndSource does exactly this.  Its two sides are independent, so completion of the inbound side does not stop the outbound stream.

Flow.fromSinkAndSource(in, lines).keepAlive(30.seconds, () => "KEEP-ALIVE")

To keep the WebSocket open, we add the keepAlive operator.  It injects the given element into the outbound stream whenever the source has emitted nothing for 30 seconds.  Servers and proxies commonly close connections that stay idle too long, and the periodic message prevents that.  The content of the message is arbitrary, but the web client must recognize it as a keep-alive so that it can ignore it.
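The endpoint flow can be exercised without a browser.  This sketch assumes Akka 2.6.  Source.never stands in for a file that stops receiving new lines, and a one-message Source plays the part of the client.  The keep-alive interval is shortened to 200 ms (an arbitrary choice) so the example finishes quickly.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

implicit val system: ActorSystem = ActorSystem("KeepAlive-Example")

// Inbound side: log whatever the client sends.
val in = Sink.foreach[String](s => println(s"from client: $s"))

// Outbound side: one line, then silence, like a file with no new lines.
val lines = Source.single("line 1").concat(Source.never[String])

// The endpoint flow, injecting "KEEP-ALIVE" after 200 ms without output.
val endpoint = Flow.fromSinkAndSource(in, lines).keepAlive(200.millis, () => "KEEP-ALIVE")

// Simulated client: send one message and collect the first three replies.
val fromServer: Future[Seq[String]] =
  Source.single("hello").via(endpoint).take(3).runWith(Sink.seq)

val received: Seq[String] = Await.result(fromServer, 5.seconds)
received.foreach(println)
system.terminate()
```

The client receives line 1 followed by two KEEP-ALIVE messages, and the inbound sink logs the client's hello.  The client's source completes immediately.  Because the two sides of fromSinkAndSource are independent, the outbound side keeps running until take(3) cancels it.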

Now that we have the flow built to serve as an endpoint for the WebSocket connection, we need the HTTP machinery for the WebSocket itself.

WebSockets with Play Framework

Play Framework is an MVC framework for Scala and Java.  It is built on Akka and brings that performance and scalability to HTTP servicing.  HTTP endpoints are created in Play by defining them in a routes file, then adding handlers in controllers, which can optionally render server-side Twirl templates.

First, we need a web page to deliver the client side of our application.  For this, we add a route to the routes file with a path parameter that we will use to pass the name of a file to the server.

GET /tail/:name controllers.TailController.tail(name:String)

In the controller, we then need a handler for the route, which does nothing but render a template.  We will build that template, which houses the WebSocket client, later on.  The template name is views.html.tail.  We pass it the information that the client will need:  the file name from the path parameter, the WebSocket URL for that file name, and our keep-alive string.

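def tail(name:String) : Action[AnyContent] = Action { implicit request =>
    // Use a secure WebSocket (wss) only when the page itself was served over HTTPS.
    val scheme = if (request.secure) "wss" else "ws"
    Ok(views.html.tail(
        name,
        s"$scheme://${request.host}/ws/tail/${encodePathSegment(name, UTF_8)}",
        "KEEP-ALIVE"
    ))
}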

We now need a separate route for the client to use when opening a WebSocket connection.  Handling a WebSocket connection in Play is very similar to handling a regular HTTP connection.  First, we create a route just as we did for the regular HTTP request.

GET /ws/tail/:name controllers.TailController.wsTail(name:String)

The handler for this route has a bit more to do, though:  it builds the WebSocket connection and connects it to the file stream.

A regular Play Action always returns a Play Result, which contains the usual HTTP response information.  For an error there will be a different HTTP status code, but the type of the response is still a Result.  A WebSocket created with WebSocket.acceptOrResult, on the other hand, returns a Result only when it rejects the connection.  On success, it returns a Flow to service the WebSocket.  acceptOrResult expresses this with the standard Scala Either:  Left (which by convention is used for error conditions) holds the Result, and Right holds the Flow.

acceptOrResult[In, Out](f: RequestHeader => Future[Either[Result, Flow[In, Out, _]]])(implicit transformer: MessageFlowTransformer[In, Out]): WebSocket
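The Left/Right convention can be tried out in plain Scala.  This sketch mirrors the Try ... match used in the handler below.  The names openStream and acceptOrStatus are hypothetical.  HTTP status codes stand in for Play Results, so the sketch needs no Play dependency.

```scala
import java.io.FileNotFoundException
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for opening the file stream: only "ulysses" exists.
def openStream(name: String): String = {
  require(name.nonEmpty, "empty file name") // throws IllegalArgumentException
  if (name == "ulysses") s"lines of $name"
  else throw new FileNotFoundException(name)
}

// Left carries the rejection (a status code standing in for a Play Result);
// Right carries the value used to serve the connection.
def acceptOrStatus(name: String): Either[Int, String] =
  Try(openStream(name)) match {
    case Failure(_: FileNotFoundException) => Left(404)
    case Failure(_)                        => Left(500)
    case Success(stream)                   => Right(stream)
  }

println(acceptOrStatus("ulysses")) // Right(lines of ulysses)
println(acceptOrStatus("missing")) // Left(404)
println(acceptOrStatus(""))        // Left(500)
```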

For the WebSocket flow we created earlier, both In and Out are String.  Play provides an implicit MessageFlowTransformer for String, which converts text frames to and from String messages.  In the body of this handler, we pass the provided file name to streamFilePath, a helper defined elsewhere in the project that restricts which directory files can be read from.  It returns the path of the file to be streamed.  If the file is not found or there is an error opening it, we return the appropriate HTTP status as a Left.  On success, we return the flow described above as a Right.  acceptOrResult expects a function that returns a Future, but our result is available immediately, so it is wrapped in Future.successful.

def wsTail(name:String) : WebSocket = WebSocket.acceptOrResult[String, String] { request =>
    Future.successful(Try {
        // Map the requested name to a file in the permitted directory.
        val path:Path = streamFilePath(name)
        if (!Files.exists(path)) {
            throw new FileNotFoundException(path.toString)
        }
        // maxLineSize and pollingInterval are settings defined elsewhere (not shown).
        FileTailSource.lines(
            path            = path,
            maxLineSize     = maxLineSize,
            pollingInterval = pollingInterval
        )
    } match {
        case Failure(_:FileNotFoundException) =>
            Left(NotFound)
        case Failure(_) =>
            Left(InternalServerError)
        case Success(lines) =>
            // Inbound traffic: log each message from the client.
            val in = Sink.foreach[String](s =>
                logger.info(s"WebSocket msg from ${request.remoteAddress}: $s")
            )
            // Outbound traffic: the tailed lines, plus a keep-alive after 30 idle seconds.
            // WebSocketController.KeepAlive is assumed to hold the "KEEP-ALIVE" string.
            Right(
                Flow.fromSinkAndSource(in, lines)
                    .keepAlive(30.seconds, () => WebSocketController.KeepAlive)
            )
    })
}
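The handler relies on streamFilePath, whose implementation is not shown here.  One possible approach is the hypothetical safeStreamPath below, which is an illustration and not the project's code.  It assumes, as the ulysses example suggests, that a name maps to a .txt file in a data directory.  It rejects any name that, after normalization, would resolve outside that directory.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helper (not the project's streamFilePath): maps a name such as
// "ulysses" to <baseDir>/ulysses.txt and rejects names that would leave baseDir.
def safeStreamPath(baseDir: Path, name: String): Option[Path] = {
  val base      = baseDir.toAbsolutePath.normalize
  val candidate = base.resolve(s"$name.txt").normalize
  // After normalizing away any "..", the file must sit directly inside base.
  if (candidate.getParent == base) Some(candidate) else None
}

val data = Paths.get("data")
println(safeStreamPath(data, "ulysses"))       // a Some holding the absolute path of data/ulysses.txt
println(safeStreamPath(data, "../etc/passwd")) // None
```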

A WebSocket Client For Our Application

To make our example fully functional, we now create a client for our WebSocket.  This will involve creating a Twirl template to house the HTML and JavaScript, writing the client code using the standard WebSocket API to manage the WebSocket connection, and some basic DOM manipulation with jQuery.

The tail route handler we created earlier references a view with three arguments:  the file name from the HTTP request, the WebSocket URL, and the keep-alive string.  To implement that view, we create a Twirl template with those parameters.  We add a fourth parameter, with a default value, for the DOM ID of the div where we will display the tailed file.  (The DOM ID argument is just an easy way to create a variable that is global to the template.)  The template also takes an implicit AssetsFinder, which it uses to resolve the stylesheet path, so the controller must have one in implicit scope.

Like most templating languages, a Twirl template consists of markup (here, HTML and JavaScript) with code interspersed.  Scala code is introduced with the @ character; for example, @domId inserts the value of the domId parameter.  The @(…) at the beginning is the function signature for the template.

The client-side JavaScript is a function that takes the WebSocket URL (injected into the template on the server) and opens a WebSocket connection.  On receipt of each message from our Akka flow, it appends the message to the div created for this purpose, unless the message is the keep-alive string.  Because each new connection tails the file from the beginning, the div is cleared whenever a connection opens.  If the connection is closed, the client waits 60 seconds and then reconnects.

@(name:String, wsUrl:String, keepAlive:String, domId:String = "push-content")(implicit assetsFinder:AssetsFinder)
<!DOCTYPE html>
<html>
    <head>
        <title>Stream Web Push Example</title>
        <link rel = "stylesheet" href = "@assetsFinder.path("stylesheets/main.css")"/>
        <script src="https://code.jquery.com/jquery-3.6.0.slim.min.js"
            integrity="sha256-u7e5khyithlIdTpu22PHhENmPcRdFiHRjhAuHcs05RI="
            crossorigin="anonymous"
        ></script>
        <script>
            (function (w, url) {
                "use strict";
                function openSocket() {
                    const delay  = 60000;  // milliseconds to wait before attempting to reconnect
                    const socket = new WebSocket(url);
                    socket.onopen = event => {
                        $("#@domId").empty();
                    };
                    socket.onclose = event => {
                        // Wait then reconnect.
                        setTimeout(openSocket, delay);
                    };
                    socket.onerror = event => {
                        console.error("WebSocket error:", event);
                    };
                    socket.onmessage = event => {
                        switch (event.data) {
                        case "@keepAlive":
                            break;
                        default:
                            $("#@domId").append(document.createTextNode(event.data + "\n"));
                            break;
                        }
                    };
                }
                openSocket();
            }(window, "@wsUrl"));
        </script>
    </head>
    <body>
        <h1>Tailing Text File @name</h1>
        <div id="@domId" class="tail"></div>
    </body>
</html>

Conclusion

The complete source code for this example is available at https://gitlab.com/lfrost/akka-websocket-example.  Once you have the application running locally (for example, with sbt run), go to http://localhost:9000/tail/ulysses, and the contents of the included data file should appear on the page.  To confirm that the stream remains open and working, append a line to the file by running this command from the project directory.

echo 'Beyond the utmost bound of human thought.' >> data/ulysses.txt

The new line should then appear at the bottom of the web page that you already have open.  You can also create other text files in the data directory and then try the URL with the new file name (without the .txt extension).