It is somewhat common, for an application, to have to deal with a number of asynchronous tasks. Managing
all the callbacks and potential synchronisation issues can become rapidly overwhelming and error-prone.
The olympe.async
package provides a few classes that makes this a lot easier and safer.
The main motivation for providing our own library is because of the constraints and particularities of the
Olympe Framework, in particular in regard to the memory management.
The design has been kept somewhat simple on purpose to make the package as easy to use as possible.
There are 2 kinds of executor:
- The
SequencedExecutor
which, as its name implies, will execute the tasks in sequence. That is it will wait until the first task has completed before starting the second one, then wait for that one to complete as well before starting the third one, and so on. If any of the tasks fails, the sequence is interrupted. - The
ParallelExecutor
on the other hand executes all the tasks in parallel and triggers a callback when they have all completed, or failed.
Task
A Task
is simply a function that takes 3 arguments:
done
: A function used to notify when the task has completed.fail
: A function used to notify when the task has failed (e.g. an error occurred).env
: A map, shared by all tasks, which can be used to pass data between tasks. The map will always contain theExecutor
entry to be able to access the Executor itself.
Executor
All executors inherit from the abstract Executor
class and have a common set of methods.
The most used ones are:
add
: Adds a task to the pool. The order of the additions is important as it will have an impact on the order of execution. In general, tasks will be executed, or started, in the same order they have been added.onEnd
: Registers a callback to be called when theExecutor
has completed. That is when all its tasks have ended. Parameters to the callback carry the information about the state (success or failure) of the tasks, and the optional error message in case of failure.start
: Will actually start the tasks in the executor's pool according to the strategy of that particular executor.withTimeout
: Lets you specify a global timeout for the execution of the task pool. If the timeout expire before all the tasks have completed, then the Executor will be terminated asfailed
.
ParallelExecutor
A ParallelExecutor
executes all the tasks in the pool in parallel. That is all the tasks are started when
the Executor
is started. It completes when the last task has completed regardless of whether other tasks failed
in the meantime. However, if any task failed to complete, the executor will be marked as failed
and will notify the
onEnd
callbacks as such.
In the following, very basic, example all 3 tasks will be started immediately and will run in parallel (more precisely, javascript's single thread will be allowed to progress toward completion of each task in full independence).
const executor = new olympe.async.ParallelExecutor();
executor.add((done, fail, env) => {
console.log('starting task 1.');
setTimeout(() => {
console.log('task 1 finished.');
done();
}, 100);
});
executor.add((done, fail, env) => {
console.log('starting task 2.');
setTimeout(() => {
console.log('task 2 finished.');
done();
}, 300);
});
executor.add((done, fail, env) => {
console.log('starting task 3.');
setTimeout(() => {
console.log('task 3 finished.');
done();
}, 200);
});
executor.onEnd((success, msg, env) => {
console.log('All tasks completed.');
});
executor.start();
The console output will clearly show the order of executions:
starting task 1.
starting task 2.
starting task 3.
task 1 finished.
task 3 finished.
task 2 finished.
All tasks completed.
SequencedExecutor
A SequencedExecutor
will, as its name implies, run the tasks in sequence.
That is it will wait until a task finishes before starting the next one. If any of the tasks fails, the remaining
tasks in the pool are cancelled and the Executor completes as failed
.
If we take the previous example and substitute a SequencedExecutor
:
const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
console.log('starting task 1.');
setTimeout(() => {
console.log('task 1 finished.');
done();
}, 100);
});
executor.add((done, fail, env) => {
console.log('starting task 2.');
setTimeout(() => {
console.log('task 2 finished.');
done();
}, 300);
});
executor.add((done, fail, env) => {
console.log('starting task 3.');
setTimeout(() => {
console.log('task 3 finished.');
done();
}, 200);
});
executor.onEnd((success, msg, env) => {
console.log('All tasks completed.');
});
executor.start();
This time the console output will clearly show that tasks are executed in sequence:
starting task 1.
task 1 finished.
starting task 2.
task 2 finished.
starting task 3.
task 3 finished.
All tasks completed.
Failing a task
A Task
is provided with 2 callbacks as arguments. The first one, usually named done
, is used to notify
the Executor
that the task has successfully been completed. The second one, usually named fail
, is used
to notify that the tasks encountered an error and couldn't proceed. In both cases the tasks will be considered
as ended, so these 2 callbacks should be called only before returning from the Task
.
The error message string passed to the fail
callbacks will be carried over by the Executor and passed to the 'onEnd'
callback as the 2nd argument.
Let's take a look at an example:
executor.add((done, fail, env) => {
const request = olympe.net.HttpRequest.fromUrl(url, olympe.net.Http.Method.GET, content, olympe.net.Http.ResponseType.TEXT);
this.netManager.httpRequest(request,
response => {
const ret = response.getStatus();
if (ret === olympe.net.Http.Status.OK) {
// Store the data in the cache if not disabled.
// Do something with the response...
done();
} else {
const errorMsg = `Failed to download ${url}. Response code ${ret}`;
fail(errorMsg);
}
}
);
});
In this example we use the status of an http request to decide whether to call done()
or fail()
.
Sharing data between tasks
The 3rd parameter passed to a Task
is a Map
that will be shared among all the tasks of that Executor
and
therefore can be used to pass data between tasks. Since it is also passed as an argument to onEnd
callbacks,
it is also the preferred way to pass the result of the tasks if necessary.
Here is an example:
const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
setTimeout(() => {
env.set('val', Math.random());
done();
}, 100);
});
executor.add((done, fail, env) => {
const val = env.get('val');
setTimeout(() => {
env.set('val', val + Math.random());
done();
}, 300);
});
executor.add((done, fail, env) => {
const val = env.get('val');
setTimeout(() => {
env.set('val', val + Math.random());
done();
}, 200);
});
executor.onEnd((success, msg, env) => {
console.log(`result is ${eng.get('val')}`);
});
executor.start();
This will display a random number partially generated by 3 different tasks.
Accessing the Executor from tasks
It is always possible to access the Executor
itself from a task by getting it from the provided map.
Its key is Executor
and it is guaranteed to be present.
In the following example, we use it to schedule an extra task if the certain conditions are not met:
const genRandom = (done, fail, env) => {
let val = env.get('val');
setTimeout(() => {
val += Math.random();
env.set('val', val );
if (val < 0.5) {
const exec = env.get('Executor');
exec.add( (done2, fail2, env2) => {
const val = env2.get('val');
setTimeout(() => {
env2.set('val', val + Math.random());
done2();
}, 200);
});
}
done();
}, 300);
};
const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
setTimeout(() => {
env.set('val', Math.random());
done();
}, 100);
});
executor.add(genRandom);
executor.onEnd((success, msg, env) => {
console.log(`result is ${eng.get('val')}`);
});
executor.start();
Execution Contexts
Finally, it is important to talk about ExecutionContext
. By default, an Executor
will create and manage its own ExecutionContext
. The created context will be attached to the current Context. That context will be used to start all the tasks and will be automatically destroyed after the last onEnd
callback has been executed. This guarantees that nodes created by the various tasks will be properly garbage collected.
If you wish to have a finer control over the ExecutionContext
, you can override the defaults by using the
two optional parameters of the constructor:
context
: TheExecutionContext
you wish to use instead of the default one.autoDestroy
: A boolean specifying whether you want that context to be automatically destroyed.
Finally you can access the ExecutionContext
of any Executor
by using the {olympe.async.Executor#getContext getContext}
method.