Database streaming on Play with Slick and PostgreSQL: from Publisher to Chunked result

At SeQura we wanted to experiment a little with Scala, so we decided to build one of our services using this technology. Among the different web frameworks, Play seemed to be one of the most popular options, so we decided to give it a try.

Having most of our technology based on RoR, one of the surprising things about Play is that it gives you more freedom in how to handle the persistence layer. In RoR the active record pattern is a central concept of the framework, while Play is a more detached, classical three-layer architecture that does not impose preconceptions about how you should handle persistence. This freedom comes at the price of having to handle more things “manually” and not having the entities move so naturally through the layers of the application, but that is another discussion.

At the moment of building the service (one year ago), Anorm seemed to be the preferred, canonical, native persistence library for Play. More recently, it seems that Play is not embracing Anorm so strongly [1] [2] or, at least, is making things easier for other libraries like Slick [3]. In any case, the fact that Slick requires less boilerplate than Anorm, the richness of its functional relational mapping, an actual reactive architecture and better support for streaming data from the database were good arguments to migrate to this library.

In this post I’m not going to discuss the whole migration; instead I will focus on how to connect the flow of streaming data coming from the persistence layer with the support for chunked responses in a Play controller.

To have real streaming, the data flow must be consumed at the same pace it is generated in all the layers of our architecture. For example, it makes no sense for the persistence layer to read data from the database as a stream if the front-end layer needs to hold all of it in memory before chunking it (or vice versa). We want to generate data as a stream for different purposes, but a clear case is that we may have huge datasets that cannot be kept in memory while being served (or that we prefer having many small machines rather than a few huge ones).

Play support for chunked responses

Play has good support for responding to a request with a stream, sending the data back to the client in chunks that can be generated on the fly. For this, it has native support for chunked transfer encoding, which has been part of the standard since HTTP 1.1.

The following snippet (adapted from the official documentation) shows an example of how Play supports this scenario:

import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Enumerator

def index = Action {
  // getDataStream provides a java.io.InputStream with the data to serve
  val data = getDataStream
  val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)

  Ok.chunked(dataContent)
}

Basically, we need to build an Enumerator (in the example, from an InputStream that provides the data) and then stream this data using Ok.chunked. Easy, right?
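
The Enumerator does not have to be backed by an InputStream; chunks can also be produced lazily, on demand. As a rough sketch (the numbers action below is purely hypothetical and would live inside a Play controller), data can be generated on the fly with Enumerator.generateM:

import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Enumerator

import scala.concurrent.Future

def numbers = Action {
  var current = 0
  // Produce five chunks, each one computed only when the client is ready for it
  val chunks: Enumerator[String] = Enumerator.generateM {
    current += 1
    Future.successful(if (current <= 5) Some(s"chunk $current\n") else None)
  }
  Ok.chunked(chunks)
}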

How Slick consumes data on streaming from database

Slick has native support for consuming data from the database as a stream in a reactive way (non-blocking, based on Futures), i.e. it is able to pull rows from the database, process them and send them to the caller without having to consume the whole result set. In any case, Slick is built on top of the de facto standard Java database connection technology (JDBC), so you must be aware of its limitations and peculiarities. Here I will discuss some of these little tricks.

Let’s start from the beginning. The following snippet (adapted from the official documentation) shows how easy it is to stream data from the database:

import slick.backend.DatabasePublisher
import slick.driver.PostgresDriver.api._

class Coffees(tag: Tag) extends Table[(String, Double)](tag, "COFFEES") {
  def name = column[String]("COF_NAME")
  def price = column[Double]("PRICE")
  def * = (name, price)
}
val coffees = TableQuery[Coffees]

// db is a Slick Database instance, e.g. the one provided by the Play Slick plugin
val query = for (c <- coffees) yield c.name
val action = query.result
val publisher: DatabasePublisher[String] = db.stream(action)

// .foreach is a convenience method on DatabasePublisher.
// Use Akka Streams for more elaborate stream processing.
publisher.foreach { s => println(s"Element: $s") }

Easy, right? We have a beautiful stream method that pulls the rows from the database in a non-blocking way, providing a DatabasePublisher that can be processed with Akka Streams.
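
Since DatabasePublisher implements the Reactive Streams Publisher interface, it can be wrapped as an Akka Streams Source. The following sketch assumes the akka-stream module (2.x) is on the classpath and reuses the publisher from the previous snippet:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("coffee-streaming")
implicit val materializer = ActorMaterializer()

// Wrap the reactive-streams publisher produced by Slick and print
// each row as it arrives from the database
Source.fromPublisher(publisher)
  .map(name => s"Element: $name")
  .runWith(Sink.foreach[String](s => println(s)))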

Unfortunately, the world is not so marvelous, and this was not working in my default configuration: the PostgreSQL JDBC driver was loading the entire result set into memory instead of gently pulling a few rows at a time for streaming processing…

To understand what is happening, we need to look at two JDBC features, fetchSize and autoCommit: how they interrelate with each other and what the peculiarities of the PostgreSQL JDBC implementation are.

The fetchSize is a ResultSet property that hints to the JDBC driver how many rows it should pull from the database when more rows are requested from the ResultSet. By default it is set to 0, meaning “driver, do your best to guess the proper number of rows to fetch”. In my experience, unfortunately, this usually means “fetch all the rows on the first request and load them into memory”.

The JDBC spec states that when you obtain a SQL Connection, it is in autoCommit mode by default, i.e. every statement executed on this Connection will be treated as its own transaction. But… why are we talking about this right now? Be patient, my friend…

The tricky part is the following: on PostgreSQL (as explained in the PostgreSQL JDBC documentation and also in this Slick GitHub discussion) we have the following constraints:

  1. By default the driver collects all the results for the query at once (fetchSize = 0).
  2. The Connection must not be in autoCommit mode.
  3. Following from the previous point, even if fetchSize is different from 0, its value is ignored if autoCommit is true.

So, to make sure that the previous snippet works well with our PostgreSQL JDBC driver, we must set the proper values for Connection#autoCommit and ResultSet#fetchSize. We will see how to do that in the context of the Play Slick plugin.
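
For reference, and before moving to Slick, this is roughly what the same setup looks like in plain JDBC against PostgreSQL (a sketch with placeholder connection details):

import java.sql.DriverManager

// Placeholder connection details, for illustration only
val connection = DriverManager.getConnection("jdbc:postgresql://localhost/db", "user", "password")

// 1. Turn off autoCommit so the driver can use a cursor for the query
connection.setAutoCommit(false)

val statement = connection.createStatement()
// 2. Ask the driver to fetch rows in batches of 32 instead of all at once
statement.setFetchSize(32)

val resultSet = statement.executeQuery("SELECT COF_NAME FROM COFFEES")
while (resultSet.next()) {
  println(resultSet.getString("COF_NAME"))
}

connection.commit()
connection.close()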

Connecting Slick and Play

We have to solve two issues:

  1. Make sure that the request through JDBC will not fetch all the rows at once.
  2. Convert the DatabasePublisher to an Enumerator that can be served as chunked data.

Making Slick stream rows from the database on Play

Let’s recap, we need:

  1. Set a fetchSize different from 0 so the JDBC driver does not fetch all the rows at once.
  2. Make sure that autoCommit is not enabled.

The following snippet shows an adaptation of the previous example to reach these goals in a Play project; the relevant changes are discussed below.

val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
val query = for (c <- coffees) yield c.name 
val action = query.result.withStatementParameters(fetchSize = 32) 
val publisher = db.stream(disableAutocommit andThen action)

As explained, when obtaining a Connection from the pool it will be in autoCommit mode, so we need to disable it and then execute the original query in the context of the same transaction. You can see this in the first and last lines of the snippet.

Additionally we need to specify the fetchSize parameter as shown on the third line.
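
As a side note, depending on your Slick version a similar effect can usually be achieved with Slick’s transactionally combinator, which also runs the action with autoCommit disabled. This is only a sketch of that alternative, not the exact approach used in this post:

// Alternative sketch: let Slick manage the transaction (and therefore
// autoCommit) instead of toggling it by hand with SimpleDBIO
val streamingAction = query.result
  .withStatementParameters(fetchSize = 32)
  .transactionally

val publisher = db.stream(streamingAction)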

Convert the Slick DatabasePublisher to a Play Enumerator

I found it quite surprising that the Slick plugin for Play didn’t provide such a converter, as for me taking advantage of Slick’s streaming support is one of its most interesting features. I couldn’t easily find whether somebody had already written such a converter, and I had decided to write my own version when, at the last moment, I found the code as part of the play-streams-experimental library.

My shout on Twitter for help was answered a little late but, in any case, thank you James Roper.

So, we only need to add this new dependency to our build.sbt:

libraryDependencies += "com.typesafe.play" %% "play-streams-experimental" % "2.4.4"

And then apply the recipe in the Play controller:

import play.api.libs.streams.Streams

def index = Action {
  // getDataStream returns the DatabasePublisher built in the previous section
  val publisher = getDataStream
  val enumerator = Streams.publisherToEnumerator(publisher)

  Ok.chunked(enumerator)
}
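
Putting the two pieces together, a minimal end-to-end sketch could look like the following. It only illustrates the shape of the code: the db instance and the coffees TableQuery from the earlier snippets are assumed to be in scope (for example, provided through the Play Slick plugin).

import play.api.libs.streams.Streams
import play.api.mvc.{Action, Controller}
import slick.driver.PostgresDriver.api._

object CoffeesController extends Controller {

  def stream = Action {
    // Run the query with autoCommit disabled and a sensible fetchSize,
    // as discussed in the previous sections
    val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
    val action = coffees.map(_.name).result.withStatementParameters(fetchSize = 32)
    val publisher = db.stream(disableAutocommit andThen action)

    // Each coffee name is written to the chunked response as soon as it
    // is pulled from the database
    Ok.chunked(Streams.publisherToEnumerator(publisher))
  }
}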

Conclusion

  1. Streaming must be a full stack implementation.
  2. Slick supports streaming on top of JDBC.
  3. The PostgreSQL JDBC implementation requires autoCommit to be disabled and a proper fetchSize value to be provided.
  4. When integrating Play and Slick you must make sure these parameters get the proper values.
  5. Slick generates publishers to be processed as streams, while Play requires enumerators to produce chunked responses.
  6. The play-streams-experimental library makes this conversion for you.

2 Replies to “Database streaming on Play with Slick and PostgreSQL: from Publisher to Chunked result”

  1. Hi,
    Thanks for your article. I came across it while trying to wrap my head around Akka Streams.

    A question though: in your example you showed streaming by fetching data in batches of 32 rows per call from the database server to the Play app server. What’s the difference between this approach and the typical Spring/Hibernate approach where you can specify the JDBC fetchSize to the database server?

    I am trying to understand the advantage of the Akka Streams approach in this example.

    Thanks.

    • Hi,

      Two main differences that come out of the box with Play + Akka:
      1. full stack streaming,
      2. non-blocking I/O management.

      Focusing on the first point, in a traditional Java/JEE framework, even if you use database streaming (with Hibernate or directly with JDBC), you have to program “by hand” all the writing to the response stream as you fetch rows from the database. If you fail to do so properly, you will be reading from the database as a stream but accumulating the data in memory before writing the HTTP response. This is naturally handled by Play + Akka.
