Kotlin Coroutines utilities

If you prefer to combine Kotlin Coroutines with popular solutions like Flowable, Streams or RxJava, where every strategy comes ready-made, this post is not for you.

Otherwise, if you are interested in writing your own asynchronous strategies, here are two of them (in addition to my post from April 2019: https://libetl.wordpress.com/2019/04/30/learn-how-to-group-expensive-calls-with-the-coroutines/).

Batching strategy
This strategy helps you stream your “extract, transform, load” program by running a massive process in parallel with the right amount of throttling.
In other words, regardless of how many rows are in your dataset, you are able to process them all.

Not too slow, but not so fast that you send more throughput than your downstream actor can bear.
The batching strategy is therefore a kind of streaming facility.

It basically consists of keeping n workers busy, each with one row at a time, until all the rows have been processed.
The strategy is initialized on first use, so the following datasets can be processed with less “warm-up” time.
Source code of that strategy: https://gist.github.com/libetl/71b826a0db248e6770a2c0b5c0ae6d18#file-batchcoroutinesstrategy-kt
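To make the idea concrete, here is a minimal self-contained sketch of the same pattern, where n workers read rows from a rendezvous channel until it is closed. The function processAll and its parameters are only illustrative names; the real implementation lives in the gist above.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Sketch only: "workers" coroutines all consume from the same channel,
// so each row is processed exactly once and at most "workers" rows are
// in flight at any time.
suspend fun <T> processAll(
    rows: List<T>,
    workers: Int,
    process: suspend (T) -> Unit
) {
    coroutineScope {
        val channel = Channel<T>() // rendezvous channel: send suspends until a worker is free
        repeat(workers) {
            launch {
                for (row in channel) process(row) // each worker stays busy with one row at a time
            }
        }
        rows.forEach { channel.send(it) } // naturally throttled by the workers' consumption rate
        channel.close() // the workers drain the remaining rows, then stop
    }
}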

Caching strategy
Want to keep the results of long-running computations in your program's memory after they have been processed? That sounds interesting when your client requests some data and you cannot respond within a reasonable amount of time (more than 5 seconds).
Give your client a UUID and tell it to come back later with that UUID.

When a client requests a UUID whose task is not yet computed, you can just reply “oh, it is not ready yet”.
If it is done, “here are the results”;
otherwise, “sorry, apparently that UUID does not correspond to a task done on this machine”.

That strategy consists of a cache object (a map of UUID to results), a worker to run async tasks, a “cacheAdder”, and a method to poll the status of a task.
Basically, the job starts by sending a message to the worker, which, after completion, sends the result to the cacheAdder. The cache is configured to automatically expire elements 10 minutes after the last read.
Source code of that strategy: https://gist.github.com/libetl/71b826a0db248e6770a2c0b5c0ae6d18#file-cachingcoroutinesstrategy-kt
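Here is a rough sketch of that shape, based only on the description above: the CachingSketch class, the PollResult type and the submit/poll names are made up for the illustration, and the 10-minute expiry (plus error handling) is left out for brevity.

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

// The three possible answers to a poll, matching the three replies above.
sealed class PollResult<out R> {
    object NotReady : PollResult<Nothing>()
    object Unknown : PollResult<Nothing>()
    data class Done<R>(val value: R) : PollResult<R>()
}

class CachingSketch<R>(private val scope: CoroutineScope) {
    private val pending = ConcurrentHashMap.newKeySet<UUID>() // tasks started but not finished
    private val cache = ConcurrentHashMap<UUID, R>() // the "cache object": UUID -> result

    // Start the task and hand a UUID back to the client immediately.
    fun submit(block: suspend () -> R): UUID {
        val uuid = UUID.randomUUID()
        pending.add(uuid)
        scope.launch { // the "worker" runs the task asynchronously...
            val result = block()
            cache[uuid] = result // ...and the "cacheAdder" stores the result on completion
            pending.remove(uuid)
        }
        return uuid
    }

    // Poll the status of a task.
    fun poll(uuid: UUID): PollResult<R> = when {
        cache.containsKey(uuid) -> PollResult.Done(cache.getValue(uuid)) // "here are the results"
        uuid in pending -> PollResult.NotReady // "oh, it is not ready yet"
        else -> PollResult.Unknown // no such task on this machine
    }
}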

Can I combine them?
Absolutely. Here are the declarations for a batching strategy backed by a cache:

private val batch =
    batchingStrategy.batchOf(
        workers = 20,
        coroutineContext = coroutineContext
    ) {
        letsProcess(it)
        // this is where you tell what to do
        // for each element in your dataset
    }

private val batchWithCache =
    cachingStrategy.cache(
        workers = 20,
        coroutineContext = coroutineContext
    ) {
        batch(it).await()
        // "it" represents your data
        // the result of "await" is the
        // global result of the operation.
        // you can add further operations there
    }
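For completeness, a hypothetical call site could look like the line below, assuming cache(...) hands back the same kind of awaitable function as batchOf(...); check the gists for the exact signatures.

// Hypothetical usage: feed the whole dataset to the combined strategy
// and suspend until the cached, batched result is available.
val result = batchWithCache(myDataset).await()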
