Kotlin, Coroutines, and Ktor

What is Ktor and why use it?

ktor.io — Kotlin coroutines based and simple to use
Asynchronous with simplicity

After using Rocket and Rust, I was less than satisfied on two counts: the inability to use the reactive-streams Mongo driver and the difficulty of setting up routes due to less-than-clear documentation. With Ktor everything works a bit more easily, perhaps because JetBrains cares a great deal about clear documentation (but…) and also because Kotlin is my JVM language of choice.

The MongoDB driver for Java comes in two basic flavors (as does the Rust driver): the synchronous, or modern, driver and the reactive-streams driver. The reactive-streams driver uses the same client model as the synchronous driver, so there is no new syntax to learn. However, the results are sent back as a Publisher, so to read them you need to set up a Subscriber. Fortunately, JetBrains has an IDE that will fill in the boilerplate for that anonymous class.

There are a couple of things that you may not notice in the above code, most critically the s.request(1000) in the onSubscribe method. That call is important because it acts as a prod telling the database that you want the data. If you have a large collection and you don’t want to get all the data in one shot, set it to something that matches your data sink and periodically call s.request(xx) to keep the flow coming. I don’t recall seeing this in the documentation, but unless you enjoy staring at an empty screen, remember it.
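The request-driven behaviour described above can be sketched without a database. The example below is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams Publisher/Subscriber/Subscription interfaces that the Mongo driver returns, and a SubmissionPublisher stands in for the query result. BatchingSubscriber and the sample names are hypothetical.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Flow
import java.util.concurrent.SubmissionPublisher

// Hypothetical subscriber that asks for results in batches of `batchSize`.
class BatchingSubscriber<T>(private val batchSize: Long) : Flow.Subscriber<T> {
    val received = mutableListOf<T>()
    val done = CountDownLatch(1)
    private lateinit var subscription: Flow.Subscription
    private var pending = 0L

    override fun onSubscribe(s: Flow.Subscription) {
        subscription = s
        pending = batchSize
        s.request(batchSize) // without this first request, nothing is ever delivered
    }

    override fun onNext(item: T) {
        received.add(item)
        if (--pending == 0L) {
            // The current batch is used up, so ask for the next one.
            pending = batchSize
            subscription.request(batchSize)
        }
    }

    override fun onError(t: Throwable) { done.countDown() }

    override fun onComplete() { done.countDown() }
}

fun main() {
    val subscriber = BatchingSubscriber<String>(batchSize = 2)
    val publisher = SubmissionPublisher<String>() // stand-in for a driver Publisher
    publisher.subscribe(subscriber)
    listOf("Astoria", "Bedford", "Chelsea", "Dumbo", "Elmhurst").forEach { publisher.submit(it) }
    publisher.close()       // signals onComplete after the buffered items are delivered
    subscriber.done.await() // wait until the subscriber has seen everything
    println(subscriber.received)
}
```

With the real driver the same shape should apply to an org.reactivestreams.Subscriber; the batch size is the knob to match against whatever consumes the data.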

Welcome to Kotlin and coroutines.

Kotlin has a better approach than the Publisher and Subscriber (you can still use them if you wish). You can consume the result of a call to the Mongo server as either a Flow or a Channel. I include the use of both in the code below. At the most basic level, coroutine code looks pretty similar to normal usage of the synchronous Mongo driver:

// asFlow() comes from kotlinx-coroutines-reactive; collect is a suspend call, so run this inside a coroutine
val database = mongoClient.getDatabase("sample_restaurants").withCodecRegistry(pojoCodecRegistry)
val result = database.getCollection("neighborhoods").distinct("name", String::class.java)
val flow = result.asFlow()
flow.collect { a -> println(Document("value", BsonString(a))) }
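To make the Flow versus Channel distinction concrete, here is a minimal sketch that runs without a database. It assumes kotlinx-coroutines-core is on the classpath. neighborhoodFlow() is a hypothetical stand-in for a driver Publisher converted with asFlow(), and the names are sample data.

```kotlin
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for `publisher.asFlow()`.
fun neighborhoodFlow(): Flow<String> = listOf("Astoria", "Bedford", "Chelsea").asFlow()

fun main(): Unit = runBlocking {
    // Flow: cold. Nothing runs until a terminal operator such as toList() or collect is called.
    val upper = neighborhoodFlow().map { it.uppercase() }.toList()
    println(upper)

    // Channel: a producer coroutine pushes items and a consumer receives them as they arrive.
    val channel = neighborhoodFlow().produceIn(this)
    channel.consumeEach { println(it) }
}
```

A Flow is usually enough when one consumer transforms the results. A Channel is handy when the producer and the consumer live in different coroutines, as with a streaming HTTP response.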

Does this raise the question of why bother with the reactive-streams driver and coroutines at all? Yes, but the answer is that sometimes you have lots of data and you need to transform what is in the database to do something else (and there isn’t an aggregation to do the work for you). Some of my use cases involve ML, parsing, or more complex statistics. Another case might be a web server, where the client can start processing the results before the query has completed. To make that work, though, you need a web server that is also reactive, like Ktor.

In the example above there are the following points to notice:

  • Serialization/Deserialization using Kotlin data classes
  • Using call.respondTextWriter to stream out the results
  • Authentication with X.509 certificate (omitting the longer story for later)

Serialization/Deserialization with Kotlin Data Classes

/* For use with the codec - nested utility class */
data class Restaurant(  // class name assumed; it is missing from the listing
    var id: ObjectId? = ObjectId(),
    var address: Address? = Address(),
    var borough: String? = "",
    var name: String? = "",
    var cuisine: String? = "",
)

data class Address(
    var building: String? = "",
    var street: String? = "",
    var zipcode: String? = ""
)

Under the hood the Jackson library is being used, so standard annotations such as @JsonProperty can be added to the data classes. It is important that all the fields are marked nullable (?) and given a default value. This makes the combination of Kotlin plus MongoDB feel natural.
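One likely reason for the default values (the article does not spell it out, so treat this as an assumption) is that bean-style codecs and mappers typically create an empty instance first and then fill in the fields. When every constructor parameter has a default, Kotlin generates a no-argument constructor on the JVM, which the sketch below demonstrates with plain reflection and no MongoDB dependency. The street value is made-up sample data.

```kotlin
data class Address(
    var building: String? = "",
    var street: String? = "",
    var zipcode: String? = ""
)

fun main() {
    // All parameters have defaults, so a public no-argument constructor exists on the JVM.
    val empty = Address::class.java.getDeclaredConstructor().newInstance()

    // A codec can then populate the instance through the generated setters.
    empty.street = "Morris Park Ave"
    println(empty)
}
```

Dropping the default from any one parameter removes that no-argument constructor, and getDeclaredConstructor() then throws NoSuchMethodException.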

Creating a streaming response: respondTextWriter

Ktor has two features that are not well documented in the online documentation: respondTextWriter and respondBytesWriter. Judging from the tests in the git repository, respondTextWriter allows a streamed response to an HTTP request. I’ve included an example in the project’s code. In this case the MongoDB find is linked to the call to respondTextWriter via a Channel as follows:

val channel = neighborhoodWriter()
call.respondTextWriter {
    run {
        channel.consumeEach {
            val txt = mapper.createObjectNode().put("name", it)

The [ and ] are not necessary, but they make parsing on the client side easier.
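The idea of draining a Channel into a Writer, wrapped in [ and ], can be sketched as follows. This is not the project's code: streamAsJsonArray is a hypothetical helper, the comma handling is my own addition so that the output is a valid JSON array, and a StringWriter stands in for the Writer that respondTextWriter supplies. It assumes kotlinx-coroutines-core and jackson-databind are available.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import java.io.StringWriter
import java.io.Writer
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.runBlocking

private val mapper = ObjectMapper()

// Hypothetical helper: writes each name as {"name": ...} inside a JSON array.
suspend fun streamAsJsonArray(names: ReceiveChannel<String>, out: Writer) {
    var first = true
    out.write("[")
    names.consumeEach { name ->
        if (!first) out.write(",")
        first = false
        out.write(mapper.createObjectNode().put("name", name).toString())
        out.flush() // hand each element to the client as soon as it is ready
    }
    out.write("]")
    out.flush()
}

fun main(): Unit = runBlocking {
    // Stand-in for the channel fed by the MongoDB query.
    val channel = Channel<String>(Channel.UNLIMITED)
    listOf("Astoria", "Bedford").forEach { channel.trySend(it) }
    channel.close()

    val out = StringWriter()
    streamAsJsonArray(channel, out)
    println(out) // [{"name":"Astoria"},{"name":"Bedford"}]
}
```

Inside a route, the same helper could presumably be called as call.respondTextWriter { streamAsJsonArray(channel, this) }, since the lambda's receiver is a Writer.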

Something new: X.509 authentication

For this project, I found that my normal authentication method was not working with Kotlin. Looking into the issue, it seems that there is an incompatibility between Kotlin 1.6.0 and the MongoDB Java driver 4.3.4. Rather than investing too much effort in the why or the correction, I decided to try X.509 certificate authentication. It worked and is simple to use. The screen below is from the Database authentication page of the Atlas project. Simply assign a name for the certificate and download the result. The setup for the client is also simple. Again, I am using an implementation of dotenv (io.github.cdimascio:dotenv-kotlin:6.2.2). The .env file is accessed from the following within the code:

dotenv {
    systemProperties = true
}
val dotenv = Dotenv.load()
# in .env

Creating the PKCS12 file with openssl:

openssl pkcs12 -export -out mongodb.pkcs12 -in X509-cert.pem
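How the JVM finds that keystore is not shown here. A common route, and only an assumption about this project, is the standard JSSE system properties (javax.net.ssl.keyStore and friends), which is presumably why systemProperties = true is set for dotenv. The sketch below imitates that step with the standard library only: parseEnv is a hypothetical helper, and the path and password are placeholders.

```kotlin
// Hypothetical .env content; the path and password are placeholders.
val envText = """
    # in .env
    javax.net.ssl.keyStore=/path/to/mongodb.pkcs12
    javax.net.ssl.keyStoreType=PKCS12
    javax.net.ssl.keyStorePassword=changeit
""".trimIndent()

// Parse KEY=VALUE lines, skipping blank lines and comments.
fun parseEnv(text: String): Map<String, String> =
    text.lineSequence()
        .map { it.trim() }
        .filter { it.isNotEmpty() && !it.startsWith("#") && it.contains("=") }
        .associate { it.substringBefore("=").trim() to it.substringAfter("=").trim() }

fun main() {
    // Copy every entry into the JVM system properties, where the TLS layer looks for them.
    parseEnv(envText).forEach { (key, value) -> System.setProperty(key, value) }
    println(System.getProperty("javax.net.ssl.keyStoreType")) // PKCS12
}
```

The properties need to be set before the Mongo client opens its first TLS connection, so loading them at the very start of the application is the safe order.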

As in the code the connection string is:

val connection = "mongodb+srv://mflix.beal2.mongodb.net/sample_mflix?authMechanism=MONGODB-X509&replicaSet=atlas-a7tqy4-shard-0&connectTimeoutMS=7000&maxPoolSize=50&wTimeoutMS=2500&readPreference=primary&authSource=%24external"
val connectionString = ConnectionString(connection)

I feel that Ktor is a good alternative for a web server on the JVM with MongoDB, one that takes advantage of what the reactive-streams API can provide.


