Kotlin's map, filter, all, any, none, and others are extension functions that transform or evaluate collections⇗. Each accepts a lambda, used as a transformation (or predicate), that is applied to each item in the collection sequentially:
val items: List<String> = listOf(1, 2, 3).map { "test-$it" }
Because items are processed one at a time, map's execution time grows with the number of elements.
One way to parallelize this on the JVM is to use Java parallel streams:
// JVM only. Stream.toList() requires JDK 16+; on older JDKs use collect(Collectors.toList()).
val items: List<String> = listOf(1, 2, 3).parallelStream().map { "test-$it" }.toList()
Of course, there are several technicalities to be aware of when using parallelStream → see Parallel streams in Java: Benchmarking and performance considerations⇗.
However, parallelStream is JVM-only, which conflicts with io.turbodsl's main objective of supporting Kotlin Multiplatform⇗, not just JVM-based projects.
io.turbodsl aims to provide a comprehensive collection DSL that simplifies how code is written, maintained, and debugged. To that end, all of its collection internals are built on the scope foundations explained in the Fundamentals⇗ section:
Support for timeout and delay attributes.
Use of additional scopes such as retry, async, and job.
Define default values⇗.
Additionally:
Every collection function starts with the prefix async.
They can be used within any io.turbodsl scope.
There are three ways to invoke them:
TurboScope.execute {
    // 1) As a collection extension function within a scope.
    val list1 = listOf(1, 2, 3).asyncMap {
        "test-$it"
    }
    // 2) Scope function, specifying the collection as input.
    val list2 = asyncMap(input = listOf(1, 2, 3)) {
        "test-$it"
    }
    // 3) Within a scope whose input is a collection.
    val list3: List<String> = job(input = listOf(1, 2, 3)) {
        asyncMap { "test-$it" }
    }
}
Arrays, any Iterable (List, Set, ...), and Map are supported.
The following functions have been defined (as of v1.2.3):
asyncMap → kotlin.collections.map
asyncFilter → kotlin.collections.filter
asyncForEach → kotlin.collections.forEach
asyncAll → kotlin.collections.all
asyncAny → kotlin.collections.any
asyncNone → kotlin.collections.none
asyncContains → kotlin.collections.contains
Future releases will include more functions.
Note that for async functions accepting a transformation or predicate, the lambda receives the item as its single argument (it). The same item is also provided as the scope input, so you can use either one.
As with other scopes, the lambda's receiver is a SyncScope, which lets you use other io.turbodsl expressions:
TurboScope.execute {
    listOf(1, 2, 3, ...).filter {
        // Kotlin filter identifies each item as "it".
        it < 10
    }
    listOf(1, 2, 3, ...).asyncFilter {
        // io.turbodsl asyncFilter identifies each item as "input",
        // just like any other scope.
        // `it` can also be used: `it` and `input` are synonyms.
        it < 10
    }
    asyncFilter(input = listOf(10, 15, 20, 25, 30, ...)) {
        // "this" is a SyncScope, allowing other io.turbodsl expressions to be used.
        // In this example, each item launches `it` asyncJobs,
        // and only items with more than 5 successful jobs are kept.
        async {
            repeat(it) {
                asyncJob { ... }
            }
        }.let { r ->
            r.filter { it.isSuccess() }.size > 5
        }
    }
    val compensation: List<Compensation> = asyncFilter(
        input = job<List<Employee>> { /* returns a list of Employees (thousands) */ }
    ) {
        // Each `input` (or `it`) is an Employee.
        it.salary >= 3_000
    }.asyncMap {
        // Each `input` (or `it`) is an Employee with a salary of $3,000 or higher,
        // which is passed implicitly to each asyncJob through the `input` parameter.
        async(
            job1 = asyncJob<Benefit> { /* retrieve benefits for employee */ },
            job2 = asyncJob<Timesheet> { /* retrieve timesheet for employee */ },
        ) { ok, r1, r2 ->
            if (ok) {
                Compensation.Eligible(it, r1.success(), r2.success())
            } else {
                Compensation.NonEligible(it)
            }
        }
    }
}
io.turbodsl relies on the existing AsyncScope to process collection items, redefining how maxJobs is interpreted:
maxJobs < 0 → Bucket Mode → Creates as many "buckets" as specified in |maxJobs|, allocating items as evenly as possible.
maxJobs > 0 → Pool Mode → Creates as many AsyncJobs as specified in maxJobs, which act as a pool, each taking the next pending item.
maxJobs = 0 → Parallel Mode → Processes all items in parallel, creating one AsyncJob per item in the collection.
All the collection scopes are executed using AsyncMode.CancelAll.
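To make the three interpretations of maxJobs concrete, here is a minimal sketch of the selection logic in plain Kotlin. ProcessingMode and modeFor are hypothetical names used only for illustration; they are not part of io.turbodsl's API, and the library may structure this differently.

```kotlin
// Hypothetical sketch: mapping a maxJobs value to a processing mode.
// ProcessingMode and modeFor are illustrative names, not io.turbodsl API.
sealed class ProcessingMode {
    data class Bucket(val buckets: Int) : ProcessingMode()   // maxJobs < 0
    data class Pool(val workers: Int) : ProcessingMode()     // maxJobs > 0
    data class Parallel(val jobs: Int) : ProcessingMode()    // maxJobs == 0
}

fun modeFor(maxJobs: Int, collectionSize: Int): ProcessingMode = when {
    maxJobs < 0 -> ProcessingMode.Bucket(buckets = -maxJobs)   // |maxJobs| buckets
    maxJobs > 0 -> ProcessingMode.Pool(workers = maxJobs)      // fixed-size pool
    else -> ProcessingMode.Parallel(jobs = collectionSize)     // one job per item
}

// modeFor(-4, 100) == ProcessingMode.Bucket(buckets = 4)
// modeFor(8, 100)  == ProcessingMode.Pool(workers = 8)
// modeFor(0, 100)  == ProcessingMode.Parallel(jobs = 100)
```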
Bucket Mode is the most efficient mode for processing collections, and it is the default mode.
It defines a specific number of "buckets", allocating items as evenly as possible.
Each "bucket" is assigned to an AsyncJob, which processes its items sequentially.
The number of buckets is set to <collection-size>/10.
Future releases may include a better algorithm to calculate the optimal number.
Memory overhead is bounded by the number of "buckets".
You may want to specify a fixed number of buckets based on the number of threads the target runtime environment supports.
If the processing time on each item can vary, consider using Pool Mode instead.
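The bucket allocation described above can be sketched as follows, assuming that "as evenly as possible" means bucket sizes differ by at most one item. defaultBucketCount, allocateBuckets, and bucketMap are hypothetical helpers, not io.turbodsl functions. The coerceAtLeast(1) guard is also an assumption, since <collection-size>/10 would otherwise yield zero buckets for collections with fewer than 10 items.

```kotlin
// Assumption: the default bucket count is collectionSize / 10, but never below 1.
fun defaultBucketCount(collectionSize: Int): Int = (collectionSize / 10).coerceAtLeast(1)

// Splits `items` into `buckets` contiguous slices whose sizes differ by at most one.
fun <T> allocateBuckets(items: List<T>, buckets: Int): List<List<T>> {
    require(buckets > 0) { "buckets must be positive" }
    val base = items.size / buckets   // every bucket gets at least this many items
    val extra = items.size % buckets  // the first `extra` buckets get one more
    val result = ArrayList<List<T>>(buckets)
    var start = 0
    for (b in 0 until buckets) {
        val size = base + (if (b < extra) 1 else 0)
        result.add(items.subList(start, start + size))
        start += size
    }
    return result
}

// In Bucket Mode each bucket would run sequentially inside its own AsyncJob.
// Here the buckets are simply processed one after another, to show that
// flattening the per-bucket results keeps the original item order.
fun <T, R> bucketMap(items: List<T>, buckets: Int, transform: (T) -> R): List<R> =
    allocateBuckets(items, buckets).flatMap { bucket -> bucket.map(transform) }

// allocateBuckets((1..10).toList(), 3).map { it.size } == listOf(4, 3, 3)
```

With fewer items than buckets, some buckets come out empty; a real implementation would likely skip them rather than start idle jobs.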
Pool Mode creates a specific number of AsyncJobs.
Each AsyncJob processes one item at a time, taking the next pending item from the collection as if it were a queue.
Access to the collection is synchronized → this adds some processing overhead.
This mode should be used when processing time varies between items.
You should specify a fixed pool size based on the number of threads the target runtime environment supports.
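The "fixed number of workers consuming a shared queue" pattern behind Pool Mode can be sketched on the JVM with plain threads. This is not how io.turbodsl implements it (the library runs AsyncJobs in coroutines and targets Kotlin Multiplatform). poolMap is a hypothetical name, an AtomicInteger counter stands in for synchronized access to the collection, and error propagation and cancellation are intentionally left out.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// JVM-only sketch of Pool Mode using threads instead of coroutines.
// `workers` threads repeatedly claim the next unprocessed index, so faster
// workers naturally end up processing more items.
@Suppress("UNCHECKED_CAST")
fun <T, R> poolMap(items: List<T>, workers: Int, transform: (T) -> R): List<R> {
    require(workers > 0) { "workers must be positive" }
    val next = AtomicInteger(0)
    val results = arrayOfNulls<Any?>(items.size)
    val pool = List(workers) {
        thread {
            while (true) {
                val i = next.getAndIncrement()    // claim the next item, like dequeuing
                if (i >= items.size) break        // queue drained: this worker stops
                results[i] = transform(items[i])  // result stored at the item's index
            }
        }
    }
    pool.forEach { it.join() }  // join() also makes the workers' writes visible here
    return results.toList() as List<R>
}
```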
Parallel Mode is included for completeness only.
It impacts performance because of the number of AsyncJobs it creates:
Each AsyncJob runs within its own coroutine.
There's a memory overhead to maintain and keep track of each coroutine.
Coroutines compete with each other for resources.
Use only when the collection is small.
Traditional collection functions process items sequentially. If one transformation (or predicate) fails, the processing stops, throwing an exception / error.
Kotlin's map, filter, all, any, none, etc. return a result only if no transformation (or predicate) throws while the items are being processed.
io.turbodsl collection functions handle potential errors differently:
An AsyncScope manages several AsyncJobs, executing all of them using AsyncMode.CancelAll → see Asynchronous Modes⇗.
Each AsyncJob processes one or more items using a coroutine, depending on the collection processing mode outlined previously.
Whenever processing an item raises an exception / error, the AsyncJob rethrows it, canceling all the other AsyncJobs. This preemptively stops all asynchronous processing.
If a default⇗ was defined, that value is returned. Otherwise, the exception / error is thrown.
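This "cancel everything on the first failure, then fall back to the default or rethrow" behavior can be approximated on the JVM with an ExecutorCompletionService. cancelAllMap is a hypothetical name, one task is submitted per item, and pooled threads stand in for io.turbodsl's coroutine-based AsyncJobs; treat it as an illustration of the semantics, not of the library's code.

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// JVM sketch of CancelAll error handling with an optional default value.
// The first task to fail cancels every other task.
@Suppress("UNCHECKED_CAST")
fun <T, R> cancelAllMap(items: List<T>, threads: Int, default: List<R>?, transform: (T) -> R): List<R> {
    val executor = Executors.newFixedThreadPool(threads)
    try {
        val completion = ExecutorCompletionService<Pair<Int, R>>(executor)
        // Tag each result with its index so the output keeps the input order.
        val futures = items.mapIndexed { i, item -> completion.submit { i to transform(item) } }
        val results = arrayOfNulls<Any?>(items.size)
        repeat(items.size) {
            try {
                val (i, value) = completion.take().get()  // next task to finish
                results[i] = value
            } catch (e: ExecutionException) {
                futures.forEach { it.cancel(true) }       // cancel all other tasks
                // Return the default if one was defined; otherwise rethrow the original error.
                return default ?: throw (e.cause ?: e)
            }
        }
        return results.toList() as List<R>
    } finally {
        executor.shutdownNow()
    }
}
```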
The Boolean async functions (asyncAll, asyncAny, asyncNone, asyncContains) are implemented slightly differently from the other collection functions:
Depending on the Boolean function and the predicate's result, an internal PredicateResult.Success or PredicateResult.Failure is thrown.
In this way, all parallel processing can stop without having to evaluate all items:
asyncAll⇗ → if one predicate evaluates to false, then all AsyncJobs are cancelled, returning false.
asyncAny⇗ → if one predicate evaluates to true, then all AsyncJobs are cancelled, returning true.
asyncNone⇗ → if one predicate evaluates to true, then all AsyncJobs are cancelled, returning false.
asyncContains⇗ → if the item is found in the collection, then all AsyncJobs are cancelled, returning true.
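The short-circuit idea (a job throws an internal result that stops every other job) can be sketched with the same JVM building blocks. PredicateSignal, shortCircuit, allOf, anyOf, and noneOf are hypothetical names; PredicateSignal only stands in for io.turbodsl's internal PredicateResult types. An asyncContains-style check would simply be anyOf(items) { it == element }.

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Hypothetical internal signal carrying the final Boolean result.
class PredicateSignal(val result: Boolean) : RuntimeException()

// Runs `evaluate` once per item on a thread pool. A task throws PredicateSignal as soon
// as its item decides the answer, which cancels every remaining task. If no task
// signals, `ifNoSignal` is returned. Any other error is rethrown.
fun <T> shortCircuit(items: List<T>, threads: Int, ifNoSignal: Boolean, evaluate: (T) -> Unit): Boolean {
    val executor = Executors.newFixedThreadPool(threads)
    try {
        val completion = ExecutorCompletionService<Unit>(executor)
        val futures = items.map { item -> completion.submit { evaluate(item) } }
        repeat(items.size) {
            try {
                completion.take().get()
            } catch (e: ExecutionException) {
                futures.forEach { it.cancel(true) }  // stop all other evaluations
                val cause = e.cause
                if (cause is PredicateSignal) return cause.result
                throw (cause ?: e)
            }
        }
        return ifNoSignal
    } finally {
        executor.shutdownNow()
    }
}

// all: the first false predicate decides the result (false).
fun <T> allOf(items: List<T>, threads: Int = 4, predicate: (T) -> Boolean): Boolean =
    shortCircuit(items, threads, ifNoSignal = true) { if (!predicate(it)) throw PredicateSignal(false) }

// any: the first true predicate decides the result (true).
fun <T> anyOf(items: List<T>, threads: Int = 4, predicate: (T) -> Boolean): Boolean =
    shortCircuit(items, threads, ifNoSignal = false) { if (predicate(it)) throw PredicateSignal(true) }

// none: the first true predicate decides the result (false).
fun <T> noneOf(items: List<T>, threads: Int = 4, predicate: (T) -> Boolean): Boolean =
    shortCircuit(items, threads, ifNoSignal = true) { if (predicate(it)) throw PredicateSignal(false) }
```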
There may be scenarios where all the items should be processed before returning a result, strictly verifying all the collection items.
io.turbodsl's overall objective is to enable complex collection processing:
If you need to evaluate a predicate against all items before returning a result, use strict=true.
This flag changes the asyncMode to AsyncMode.CancelNone (see Asynchronous Modes⇗), so all items are processed, even if the result could already be determined from the first items.
For example:
asyncAll{...} requires all items to comply with a predicate. If the first item does not, then it can stop all execution and return false.
But which item is the first? Remember: several items are processed in parallel when using asyncAll{...}, so the "first item" depends on the order in which the predicates actually run at runtime.
TurboScope.execute {
    // Force sequential execution for this example.
    listOf(1, 2, 3).asyncAll(strict = false, parallel = false) {
        if (it == 2) throw Error()
        it == 0
    }
}
// returns false, since the first item evaluates to false.
TurboScope.execute {
    listOf(1, 2, 3).asyncAll(strict = true, parallel = false) {
        if (it == 2) throw Error()
        it == 0
    }
}
// throws ScopeExecutionException, since it keeps evaluating the remaining items and fails on item `2`.
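Stripped of concurrency, the difference between the two calls above comes down to whether evaluation stops at the first deciding item. The following sequential sketch (sequentialAll is a hypothetical name, not an io.turbodsl function) reproduces both outcomes, except that it rethrows the raw Error instead of wrapping it in a ScopeExecutionException.

```kotlin
// Sequential sketch of strict vs. non-strict "all" evaluation.
// Non-strict stops at the first item whose predicate is false; strict evaluates
// every item, so an error raised by a later item still propagates.
fun <T> sequentialAll(items: Iterable<T>, strict: Boolean, predicate: (T) -> Boolean): Boolean {
    var result = true
    for (item in items) {
        if (!predicate(item)) {
            result = false
            if (!strict) return false  // non-strict: the answer is already known
        }
    }
    return result                      // strict: every item has been evaluated
}
```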
You can use asyncAll to check each item against a remote service in parallel. Of course, you should use Bucket Mode or Pool Mode to limit the number of concurrent requests; otherwise, you could overload the remote service.
The transforming (asyncMap) and filtering (asyncFilter) functions do not include a strict parameter, since they must process all collection items to return a result.