Managing asynchronous tasks

Applications often have to deal with a number of asynchronous tasks. Managing all the callbacks and potential synchronisation issues can rapidly become overwhelming and error-prone. The olympe.async package provides a few classes that make this a lot easier and safer. We provide our own library because of the constraints and particularities of the Olympe Framework, in particular with regard to memory management. The design has been kept simple on purpose to make the package as easy to use as possible.

There are 2 kinds of executor:

  • The SequencedExecutor which, as its name implies, executes the tasks in sequence. That is, it waits until the first task has completed before starting the second one, then waits for that one to complete before starting the third one, and so on. If any of the tasks fails, the sequence is interrupted.
  • The ParallelExecutor, on the other hand, executes all the tasks in parallel and triggers a callback once every task has either completed or failed.

Task

A Task is simply a function that takes 3 arguments:

  • done: A function used to notify when the task has completed.
  • fail: A function used to notify when the task has failed (e.g. an error occurred).
  • env: A map, shared by all tasks, which can be used to pass data between tasks. The map always contains an Executor entry, which gives access to the Executor itself.
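
The three arguments listed above can be illustrated with a small standalone sketch. The task name parseNumber, the env keys 'raw' and 'value', and the runTask harness are hypothetical and exist only for this illustration; in real code an Executor supplies done, fail and env.

```javascript
// A task is a plain function of (done, fail, env).
// This one parses env.get('raw') and stores the result under 'value'.
const parseNumber = (done, fail, env) => {
    const raw = env.get('raw');
    const value = Number(raw);
    if (Number.isNaN(value)) {
        fail(`Cannot parse "${raw}" as a number`);
        return;
    }
    env.set('value', value);
    done();
};

// Hypothetical harness for illustration only: in real code an Executor
// supplies done, fail and env, and env also holds the 'Executor' entry.
const runTask = (task, env) => {
    const outcome = { status: 'pending', msg: undefined };
    task(
        () => { outcome.status = 'done'; },
        (msg) => { outcome.status = 'failed'; outcome.msg = msg; },
        env
    );
    return outcome;
};

const okEnv = new Map([['raw', '42']]);
const ok = runTask(parseNumber, okEnv);
const ko = runTask(parseNumber, new Map([['raw', 'abc']]));
console.log(ok.status, okEnv.get('value')); // done 42
console.log(ko.status, ko.msg);             // failed Cannot parse "abc" as a number
```

Note that the task returns right after calling fail, so that done is never called for the same run.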

Executor

All executors inherit from the abstract Executor class and have a common set of methods.

The most used ones are:

  • add: Adds a task to the pool. The order of the additions is important as it will have an impact on the order of execution. In general, tasks will be executed, or started, in the same order they have been added.
  • onEnd: Registers a callback to be called when the Executor has completed, that is, when all its tasks have ended. The callback's parameters carry the state (success or failure) of the tasks and, in case of failure, the optional error message.
  • start: Starts the tasks in the executor's pool according to the strategy of that particular executor.
  • withTimeout: Lets you specify a global timeout for the execution of the task pool. If the timeout expires before all the tasks have completed, the Executor is terminated as failed.

ParallelExecutor

A ParallelExecutor executes all the tasks in the pool in parallel. That is, all the tasks are started when the Executor is started. It completes when the last task has ended, regardless of whether other tasks failed in the meantime. However, if any task failed, the executor is marked as failed and notifies the onEnd callbacks accordingly.

In the following very basic example, all 3 tasks are started immediately and run in parallel (more precisely, JavaScript's single thread is allowed to progress toward completion of each task independently of the others).

const executor = new olympe.async.ParallelExecutor();
executor.add((done, fail, env) => {
    console.log('starting task 1.');
    setTimeout(() => {
        console.log('task 1 finished.');
        done();
    }, 100);
});
executor.add((done, fail, env) => {
    console.log('starting task 2.');
    setTimeout(() => {
        console.log('task 2 finished.');
        done();
    }, 300);
});
executor.add((done, fail, env) => {
    console.log('starting task 3.');
    setTimeout(() => {
        console.log('task 3 finished.');
        done();
    }, 200);
});
executor.onEnd((success, msg, env) => {
    console.log('All tasks completed.');
});
executor.start();

The console output clearly shows the order of execution:

starting task 1.
starting task 2.
starting task 3.
task 1 finished.
task 3 finished.
task 2 finished.
All tasks completed.

SequencedExecutor

A SequencedExecutor, as its name implies, runs the tasks in sequence. That is, it waits until a task finishes before starting the next one. If any of the tasks fails, the remaining tasks in the pool are cancelled and the Executor completes as failed.

If we take the previous example and substitute a SequencedExecutor:

const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
    console.log('starting task 1.');
    setTimeout(() => {
        console.log('task 1 finished.');
        done();
    }, 100);
});
executor.add((done, fail, env) => {
    console.log('starting task 2.');
    setTimeout(() => {
        console.log('task 2 finished.');
        done();
    }, 300);
});
executor.add((done, fail, env) => {
    console.log('starting task 3.');
    setTimeout(() => {
        console.log('task 3 finished.');
        done();
    }, 200);
});
executor.onEnd((success, msg, env) => {
    console.log('All tasks completed.');
});
executor.start();

This time the console output shows that the tasks are executed in sequence:

starting task 1.
task 1 finished.
starting task 2.
task 2 finished.
starting task 3.
task 3 finished.
All tasks completed.
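
The cancellation rule described above (a failing task stops the sequence and the remaining tasks never start) can be sketched with a small stand-in. MiniSequencedExecutor is hypothetical and is not the Olympe implementation. The tasks end synchronously here to keep the sketch short.

```javascript
// Hypothetical stand-in, NOT the Olympe SequencedExecutor.
class MiniSequencedExecutor {
    constructor() {
        this.tasks = [];
        this.callbacks = [];
        this.env = new Map([['Executor', this]]);
    }
    add(task) { this.tasks.push(task); }
    onEnd(callback) { this.callbacks.push(callback); }
    start() { this.runNext(); }
    runNext() {
        // Take the next task in insertion order.
        const task = this.tasks.shift();
        if (task === undefined) {
            this.finish(true);
            return;
        }
        // Only a call to done() lets the following task start.
        task(() => this.runNext(), msg => this.finish(false, msg), this.env);
    }
    finish(success, msg) {
        // Drop whatever is left in the pool, then notify the callbacks.
        this.tasks.length = 0;
        this.callbacks.forEach(cb => cb(success, msg, this.env));
    }
}

const log = [];
const executor = new MiniSequencedExecutor();
executor.add((done, fail, env) => { log.push('starting task 1'); done(); });
executor.add((done, fail, env) => { log.push('starting task 2'); fail('task 2 broke'); });
executor.add((done, fail, env) => { log.push('starting task 3'); done(); });
executor.onEnd((success, msg, env) => {
    log.push(`ended: success=${success}, msg=${msg}`);
});
executor.start();
console.log(log.join('\n'));
```

Task 3 never starts: the log holds the two started tasks followed by the failure notification.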

Failing a task

A Task is provided with 2 callbacks as arguments. The first one, usually named done, notifies the Executor that the task has completed successfully. The second one, usually named fail, notifies it that the task encountered an error and couldn't proceed. In both cases the task is considered ended, so call one of these callbacks only when the task has nothing left to do. The error message string passed to the fail callback is carried over by the Executor and passed to the onEnd callbacks as the 2nd argument.

Let's take a look at an example:

executor.add((done, fail, env) => {
    // url, content and this.netManager are assumed to come from the enclosing scope.
    const request = olympe.net.HttpRequest.fromUrl(url, olympe.net.Http.Method.GET, content, olympe.net.Http.ResponseType.TEXT);
    this.netManager.httpRequest(request,
        response => {
            const ret = response.getStatus();
            if (ret === olympe.net.Http.Status.OK) {
                // Store the data in the cache if not disabled.
                // Do something with the response...
                done();
            } else {
                const errorMsg = `Failed to download ${url}. Response code ${ret}`;
                fail(errorMsg);
            }
        }
    );
});

In this example we use the status of an HTTP request to decide whether to call done() or fail().
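
On the receiving side, an onEnd callback can branch on its first argument and read the error message from the second. The following is a minimal sketch: describeOutcome and the 'url' env key are hypothetical, and the callback is invoked by hand rather than by an Executor. Passing undefined as the message on success is an assumption.

```javascript
// Builds the text to report once the executor has ended.
// 'url' is a hypothetical env key used only for this illustration.
const describeOutcome = (success, msg, env) => {
    if (success) {
        return `Downloaded ${env.get('url')}`;
    }
    return `Download failed: ${msg}`;
};

// Invoked by hand here. With a real executor it would be registered as:
// executor.onEnd((success, msg, env) => console.log(describeOutcome(success, msg, env)));
const env = new Map([['url', 'https://example.com/data.txt']]);
const onSuccess = describeOutcome(true, undefined, env);
const onFailure = describeOutcome(
    false,
    'Failed to download https://example.com/data.txt. Response code 404',
    env
);
console.log(onSuccess);
console.log(onFailure);
```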

Sharing data between tasks

The 3rd parameter passed to a Task is a Map that is shared among all the tasks of that Executor and can therefore be used to pass data between tasks. Since it is also passed as an argument to the onEnd callbacks, it is the preferred way to return the result of the tasks if necessary.

Here is an example:

const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
    setTimeout(() => {
        env.set('val', Math.random());
        done();
    }, 100);
});
executor.add((done, fail, env) => {
    const val = env.get('val');
    setTimeout(() => {
        env.set('val', val + Math.random());
        done();
    }, 300);
});
executor.add((done, fail, env) => {
    const val = env.get('val');
    setTimeout(() => {
        env.set('val', val + Math.random());
        done();
    }, 200);
});
executor.onEnd((success, msg, env) => {
    console.log(`result is ${env.get('val')}`);
});
executor.start();

This displays the sum of 3 random numbers, each contributed by a different task.

Accessing the Executor from tasks

It is always possible to access the Executor itself from a task by getting it from the provided map. Its key is 'Executor' and it is guaranteed to be present.

In the following example, we use it to schedule an extra task if certain conditions are not met:

const genRandom = (done, fail, env) => {
    let val = env.get('val');
    setTimeout(() => {
        val += Math.random();
        env.set('val', val);
        if (val < 0.5) {
            const exec = env.get('Executor');
            exec.add((done2, fail2, env2) => {
                const val = env2.get('val');
                setTimeout(() => {
                    env2.set('val', val + Math.random());
                    done2();
                }, 200);
            });
        }
        done();
    }, 300);
};
const executor = new olympe.async.SequencedExecutor();
executor.add((done, fail, env) => {
    setTimeout(() => {
        env.set('val', Math.random());
        done();
    }, 100);
});
executor.add(genRandom);
executor.onEnd((success, msg, env) => {
    console.log(`result is ${env.get('val')}`);
});
executor.start();

Execution Contexts

Finally, a word about ExecutionContext. By default, an Executor creates and manages its own ExecutionContext, which is attached to the current Context. That context is used to start all the tasks and is automatically destroyed after the last onEnd callback has been executed. This guarantees that nodes created by the various tasks are properly garbage collected.

If you wish to have finer control over the ExecutionContext, you can override the defaults by using the two optional parameters of the constructor:

  • context: The ExecutionContext you wish to use instead of the default one.
  • autoDestroy: A boolean specifying whether you want that context to be automatically destroyed.

You can also access the ExecutionContext of any Executor through its getContext method (olympe.async.Executor#getContext).