Chapter 17. Master/Slave API

This chapter is organized as follows:

  1. Overview

  2. Principles and Usage

  3. The API Details

  4. A Simple Example

  5. Advanced Usage

17.1. Overview

Master/Slave computations are the most common kind of distributed computation. They are often described as embarrassingly parallel problems, which means that no particular effort is needed to split the problem into a very large number of parallel tasks, and that there is no essential dependency (or communication) between those tasks.

The main goal of the Master/Slave API is to provide an easy-to-use framework for parallelizing such embarrassingly parallel applications.

The main features are:

  • Automatic scheduling of tasks to the slaves.

  • Automatic load-balancing between the slaves.

  • Automatic fault tolerance (i.e. when a slave goes missing, its task is rescheduled).

  • A very simple mechanism for gathering results.

  • All internal concepts of ProActive are hidden from the user.

  • Open API for extensions.

17.2. The Master/Slave API Principles and Usage

Using the Master/Slave API is simple; it basically consists of four steps:

  1. Deployment of the master/slave framework.

  2. Task definition and submission.

  3. Results gathering.

  4. Optionally, release of acquired resources.
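As a rough, ProActive-free analogy, the four steps map onto familiar java.util.concurrent constructs. In the sketch below (the class LocalMasterSketch and its runAll method are hypothetical), a fixed thread pool stands in for the deployed slaves, Callable objects stand in for Task implementations, and shutting the pool down stands in for releasing resources. It only shows the shape of the workflow; it deploys nothing and does not use the ProActive API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, ProActive-free analogue of the four Master/Slave steps.
class LocalMasterSketch {

    // Runs the given tasks on 'slaves' threads and returns the results in submission order.
    static <R> List<R> runAll(List<Callable<R>> tasks, int slaves) throws Exception {
        // 1. "Deployment": a fixed pool of threads plays the role of the slaves
        ExecutorService pool = Executors.newFixedThreadPool(slaves);
        try {
            // 2. Task definition and submission
            List<Future<R>> futures = new ArrayList<>();
            for (Callable<R> task : tasks) {
                futures.add(pool.submit(task));
            }
            // 3. Results gathering (Future.get blocks until each result is ready)
            List<R> results = new ArrayList<>();
            for (Future<R> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            // 4. Release of the acquired resources
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            tasks.add(() -> n * n); // each "task" squares its input
        }
        System.out.println(runAll(tasks, 3)); // prints [1, 4, 9, 16, 25]
    }
}
```

One difference worth keeping in mind: here a failing task surfaces as an ExecutionException from Future.get, whereas the Master/Slave API reports it as a TaskException from its wait methods.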

17.2.1. Deployment of the master/slave framework

The deployment of the Master/Slave framework relies on the ProActive deployment mechanism. To deploy a set of slaves, the master needs either:

  • A ProActive deployment descriptor: in this case, the master uses the descriptor to deploy its resources.

  • A set of already deployed ProActive resources (i.e. a VirtualNode object or a Collection of Node objects).

For a more detailed explanation of the ProActive deployment mechanism and of ProActive deployment descriptors, see Chapter 22, XML Deployment Descriptors.

Figure 17.1. Deployment of the master/slave framework

17.2.2. Tasks definition and submission

To submit tasks to the master, users implement the Task interface of the Master/Slave API. Its single method, run, contains the code that will be executed remotely. Once tasks have been submitted, the master dispatches them automatically to the slaves.

Warning! When a Java object implementing the Task interface (i.e. a user task) is submitted to the master, the object is deep-copied to the master. Consequently, every object it references is copied as well. When tasks are dispatched to the remote slaves, the user task objects are serialized and sent through the network. As a result, information that is meaningful only locally (database connections, references to threads, etc.) is lost.
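The copy semantics described in this warning can be observed with plain Java serialization. The sketch below is an illustration under that assumption, not ProActive code: it round-trips a task through ObjectOutputStream and ObjectInputStream, much as a task is serialized on its way to a slave, and shows that later changes to the original are not seen by the copy and that a transient, local-only field (here a Thread reference) arrives as null. The class names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CopySemanticsSketch {

    // Hypothetical task: 'data' is copied, 'localThread' only makes sense in this JVM.
    static class MyTask implements Serializable {
        private static final long serialVersionUID = 1L;
        int[] data;
        transient Thread localThread = Thread.currentThread(); // Thread is not serializable

        MyTask(int[] data) {
            this.data = data;
        }
    }

    // Serializes then deserializes an object, which yields a deep copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyTask original = new MyTask(new int[] {1, 2, 3});
        MyTask copy = roundTrip(original);    // what the remote side would receive
        original.data[0] = 42;                // modified after "submission"
        System.out.println(copy.data[0]);     // prints 1: the copy is independent
        System.out.println(copy.localThread); // prints null: local-only state is lost
    }
}
```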

Figure 17.2. Tasks definition and submission

17.2.3. Results gathering

Results are collected by the master when the computations complete. Users can wait either for one result or for all results to be available. Alternatively, users can ask the master whether results are available and continue their own work until the results finally arrive.

Figure 17.3. Results gathering

Results can be received in one of two modes, specified by the ResultReceptionMode:

  • CompletionOrder mode (default): results are received in the order in which the tasks complete, which is not known in advance.

  • SubmissionOrder mode: results are received in the same order as the tasks were submitted to the master.

The figures below illustrate the two modes:

Figure 17.4. Completion order

Figure 17.5. Submission order
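The difference between the two modes can be reproduced in a single JVM with java.util.concurrent, used here purely as an analogy: waiting on futures one by one in submission order mimics SubmissionOrder, while an ExecutorCompletionService, whose take() returns whichever task finishes first, mimics CompletionOrder. The class and method names are hypothetical, and the sleep durations only serve to make the tasks finish in reverse order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// ProActive-free illustration of the two result reception orders.
class ReceptionOrderSketch {

    // A task that sleeps for 'millis' ms, then returns its own id.
    static Callable<Integer> task(int id, long millis) {
        return () -> {
            Thread.sleep(millis);
            return id;
        };
    }

    // SubmissionOrder analogue: wait on the futures in the order the tasks were submitted.
    static List<Integer> submissionOrder(List<Callable<Integer>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Callable<Integer> t : tasks) {
                futures.add(pool.submit(t));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks on this task even if later ones are done
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // CompletionOrder analogue: take whichever result is ready first.
    static List<Integer> completionOrder(List<Callable<Integer>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            for (Callable<Integer> t : tasks) {
                cs.submit(t);
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                results.add(cs.take().get()); // take() blocks until some task has finished
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Task 0 is the slowest and task 2 the fastest
        List<Callable<Integer>> tasks = Arrays.asList(task(0, 300), task(1, 150), task(2, 10));
        System.out.println("submission order: " + submissionOrder(tasks)); // [0, 1, 2]
        System.out.println("completion order: " + completionOrder(tasks)); // typically [2, 1, 0]
    }
}
```

In these terms, a waitOneResult call would correspond to cs.take() in completion order and to get() on the next pending future in submission order.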

17.3. The API Details

17.3.1. Master creation and deployment

The Master/Slave API entry point and main class is the following:

org.objectweb.proactive.examples.masterslave.ProActiveMaster

The methods of this class are described in the interface:

org.objectweb.proactive.examples.masterslave.interfaces.Master

Users can create either a local master or a remote master.

17.3.1.1. Local Master creation

To create a local master, the following constructor can be used:

    /**
     * Creates a local master (you can add resources afterwards)
     */
    public ProActiveMaster();

With this constructor, the master is created in the current JVM and shares CPU and memory with the user's application.

17.3.1.2. Remote Master creation

In order to create a remote master the following constructors can be used:

    /**
     * Creates an empty remote master that will be created on top of the given Node <br>
     * Resources can be added to the master afterwards
     * @param remoteNodeToUse this Node will be used to create the remote master
     */
    public ProActiveMaster(Node remoteNodeToUse);

    /**
     * Creates a remote master with the URL of a descriptor and the name of a virtual node
     * The master will be created on top of a single resource deployed by this virtual node
     * @param descriptorURL url of the ProActive descriptor
     * @param masterVNName name of the virtual node to deploy inside the ProActive descriptor
     */
    public ProActiveMaster(URL descriptorURL, String masterVNName);

With either of these constructors, the master is created on the specified remote resource (JVM) and shares CPU and memory with the applications already running on the remote host. The master is deployed remotely through the ProActive deployment mechanism (see Chapter 22, XML Deployment Descriptors, for further details).

17.3.1.3. Adding Resources

Now that the master has been created, resources (slaves) must be added to it. The following methods can be used for creating slaves:

    /**
     * Adds the given Collection of nodes to the master <br/>
     * @param nodes a collection of nodes
     */
    void addResources(Collection<Node> nodes);
    
    /**
     * Adds every resource inside the given virtual node to the master <br/>
     * @param virtualnode a virtual node object
     */
    void addResources(VirtualNode virtualnode);

    /**
     * Adds the given descriptor to the master<br>
     * Every virtual node inside the given descriptor will be activated<br/>
     * @param descriptorURL URL of a deployment descriptor
     */
    void addResources(URL descriptorURL);

    /**
     * Adds the given descriptor to the master<br>
     * Only the specified virtual node inside the given descriptor will be activated <br/>
     * @param descriptorURL URL of a deployment descriptor
     * @param virtualNodeName name of the virtual node to activate
     */
    void addResources(URL descriptorURL, String virtualNodeName);

The first two methods tell the master to create slaves on already deployed ProActive resources. The last two methods ask the master to deploy resources using a ProActive descriptor and then to create slaves on top of these resources. For a complete explanation of ProActive's deployment mechanism, please refer to Chapter 22, XML Deployment Descriptors.

17.3.2. Task Definition

The Task interface is defined in:

org.objectweb.proactive.examples.masterslave.interfaces.Task

    import java.io.Serializable;

    public interface Task<R extends Serializable> extends Serializable {
        /**
         * A task to be executed
         * @param memory access to the slave memory
         * @return the result
         * @throws Exception if the computation fails
         */
        public R run(SlaveMemory memory) throws Exception;
    }

Users implement the Task interface to define their tasks. The SlaveMemory parameter is explained in Section 17.5.2, Using the Slave Memory.

17.3.3. Task submission

    /**
     * Adds a list of tasks to be solved by the master <br/>
     * @param tasks list of tasks
     * @throws TaskAlreadySubmittedException if a task is submitted twice
     */
    void solve(List<T> tasks) throws TaskAlreadySubmittedException;

Warning! The master keeps track of the task objects that have been submitted to it and are still being computed. Submitting the same task object twice without waiting for the result of the first computation is not allowed; in that case, solve throws a TaskAlreadySubmittedException.
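This rule can be pictured as a master that remembers, by object identity, which tasks are still in flight. The sketch below is a hypothetical illustration of that bookkeeping, not the ProActive implementation: TaskRegistry and its methods are invented for the example, IllegalStateException stands in for TaskAlreadySubmittedException, and it assumes "the same task" means the same object reference rather than an equal object.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical bookkeeping of in-flight tasks, keyed by object identity.
class TaskRegistry<T> {
    private final Set<T> inFlight = Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());

    // Called on submission: rejects a task object that is still being computed.
    synchronized void submit(T task) {
        if (!inFlight.add(task)) {
            throw new IllegalStateException("task already submitted: " + task);
        }
    }

    // Called once the result of 'task' has been handed back to the user.
    synchronized void completed(T task) {
        inFlight.remove(task);
    }

    public static void main(String[] args) {
        TaskRegistry<Object> registry = new TaskRegistry<>();
        Object task = new Object();
        registry.submit(task);
        try {
            registry.submit(task); // same object, first computation not finished
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        registry.completed(task);
        registry.submit(task);     // allowed again once the result has been received
        System.out.println("resubmitted after completion");
    }
}
```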

17.3.4. Specifying result reception order

Result reception order can be switched from completion order to submission order using the following method:

    /**
     * Sets the current ordering mode <br/>
     * If reception mode is switched while computations are in progress,<br/>
     * then subsequent calls to waitResults methods will be done according to the new mode.<br/>
     * @param mode the new mode for result gathering
     */
    void setResultReceptionOrder(OrderingMode mode);

The default mode of the M/S API is completion order. The mode can be switched dynamically: subsequent calls to the waitXXX methods (see below) will follow the new mode.

17.3.5. Collecting results

    /**
     * Wait for all results, will block until all results are computed <br>
     * The ordering of the results depends on the result reception mode in use <br>
     * @return a collection of objects containing the result
     * @throws TaskException if a task threw an Exception
     */
    List<R> waitAllResults() throws TaskException;

    /**
     * Wait for the first result available <br>
     * Will block until at least one Result is available. <br>
     * Note that in SubmissionOrder mode, the method will block until the next result in submission order is available<br>
     * @return an object containing the result
     * @throws TaskException if the task threw an Exception
     */
    R waitOneResult() throws TaskException;

    /**
     * Wait for a number of results<br>
     * Will block until at least k results are available. <br>
     * The ordering of the results depends on the result reception mode in use <br>
     * @param k the number of results to wait for
     * @return a collection of objects containing the results
     * @throws TaskException if the task threw an Exception
     */
    List<R> waitKResults(int k) throws TaskException;

    /**
     * Tells if the master is completely empty (i.e. has no result to provide and no tasks submitted)
     * @return the answer
     */
    boolean isEmpty();

    /**
     * Returns the number of available results <br/>
     * @return the answer
     */
    int countAvailableResults();

Five methods can be used to collect results:

  • The first three methods (waitAllResults, waitOneResult and waitKResults) block the calling thread until the requested result(s) are available. If an exception occurs during the execution of a task, it is reported to the user by the wait method as a TaskException.

  • isEmpty tells whether the user has received the results of every task previously submitted (i.e. no task is pending and no result is left to collect).

  • countAvailableResults returns the number of results currently available, without blocking the user thread.
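The difference between blocking and non-blocking collection can be illustrated without ProActive. In the sketch below, which uses java.util.concurrent as a stand-in and whose names are hypothetical, CompletionService.take() would play the role of a blocking wait such as waitOneResult, while CompletionService.poll() lets the caller check whether a result is ready and keep working in the meantime, in the spirit of countAvailableResults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PollingSketch {

    // Submits 'n' tasks, then alternates between local work and non-blocking checks.
    // Returns the number of results gathered.
    static int gatherWhileWorking(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < n; i++) {
                final int id = i;
                cs.submit(() -> {
                    Thread.sleep(20L * id); // later tasks take longer
                    return id;
                });
            }
            List<Integer> results = new ArrayList<>();
            long localWork = 0;
            while (results.size() < n) {
                Future<Integer> done = cs.poll(); // non-blocking: null if nothing is ready
                if (done != null) {
                    results.add(done.get());      // already completed, returns at once
                } else {
                    localWork++;                  // the user's own work would go here
                    Thread.sleep(1);
                }
            }
            System.out.println("results " + results + " after " + localWork + " local work units");
            return results.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        gatherWhileWorking(5);
    }
}
```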

17.3.6. Terminating the master

    /**
     * Terminates the master (and optionally frees every resource)
     * @param freeResources tells if the master should also free the node resources
     */
    public void terminate(boolean freeResources);

A single method terminates the master. Its boolean parameter tells the master whether it should also free the resources (i.e. terminate the remote JVMs).

17.4. A Simple Example

This very simple example tells whether a given number is prime. It computes the answer by trying to divide the candidate by every integer from 2 up to the candidate. Of course, this algorithm is naive and inefficient, as there are much faster ways of testing primality. The complete example (a slightly optimized version), along with more complex ones, is available in the following package of the Examples source directory:

org.objectweb.proactive.examples.masterslave

First, the task definition: it tests whether any number in a given interval divides the prime candidate.

    /**
     * Task to find if any number in a specified interval divides the given candidate
     */
    public static class FindPrimeTask implements Task<Boolean> {
        private long begin;
        private long end;
        private long candidate;

        public FindPrimeTask(long candidate, long begin, long end) {
            this.begin = begin;
            this.end = end;
            this.candidate = candidate;
        }

        /* (non-Javadoc)
         * @see org.objectweb.proactive.extra.masterslave.interfaces.Task#run(org.objectweb.proactive.extra.masterslave.interfaces.SlaveMemory)
         */
        public Boolean run(SlaveMemory memory) {
            // Any divider in [begin, end) proves that the candidate is not prime
            for (long divider = begin; divider < end; divider++) {
                if ((candidate % divider) == 0) {
                    return Boolean.FALSE;
                }
            }
            return Boolean.TRUE;
        }
    }

The example main method: the master is created, and resources are added using a deployment descriptor.

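        // Creating the master (a local master, running in the current JVM)
        ProActiveMaster<FindPrimeTask, Boolean> master =
            new ProActiveMaster<FindPrimeTask, Boolean>();

        // Adding resources to the master: addResources expects a URL,
        // so the descriptor path is converted with java.io.File
        URL descriptor = new File(
            "ProActive/src/Examples/org/objectweb/proactive/examples/masterslave/RSHListbyHost_Example.xml")
            .toURI().toURL();
        master.addResources(descriptor, "matrixNode");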

Then, the tasks are created and submitted to the master. First comes the method that divides the interval [2, candidate - 1] into n subintervals:

    /**
     * Creates the prime computation tasks to be solved
     * @return tasks whose intervals [begin, end) together cover [2, prime_to_find)
     */
    public static List<FindPrimeTask> createTasks(long prime_to_find, int number_of_intervals) {
        List<FindPrimeTask> tasks = new ArrayList<FindPrimeTask>();
        long interval = prime_to_find / number_of_intervals;

        // Each interval starts where the previous one ends (end is exclusive),
        // so no divider is skipped; Math.max keeps every divider >= 2
        tasks.add(new FindPrimeTask(prime_to_find, 2, interval));
        for (int i = 1; i < (number_of_intervals - 1); i++) {
            tasks.add(new FindPrimeTask(prime_to_find,
                    Math.max(2, interval * i), interval * (i + 1)));
        }
        tasks.add(new FindPrimeTask(prime_to_find,
                Math.max(2, interval * (number_of_intervals - 1)), prime_to_find));
        return tasks;
    }

Then, the task submission in the main method:

        // Creating and submitting the tasks (the L suffix is required: the value does not fit in an int)
        long prime_to_find = 22974513717L;
        master.solve(createTasks(prime_to_find, 20));

Then, the results are gathered and the answer is displayed.

        // Collecting the results
        List<Boolean> results = master.waitAllResults();

        // Displaying result
        boolean prime = true;

        for (Boolean result : results) {
            prime = prime && result;
        }

        System.out.println("" + prime_to_find +
            (prime ? " is prime." : " is not prime."));

Finally, the master is terminated (all resources are freed) and the program exits.

        master.terminate(true);
        System.exit(0);
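Before deploying on remote hosts, the decomposition can be checked in a single JVM. The sketch below is not part of the ProActive example: it copies the divisibility test and the interval split into one self-contained class (LocalPrimeCheck is a hypothetical name), runs the sub-tasks on a java.util.concurrent thread pool in place of the master, and combines the Boolean answers with a logical AND, as the main method above does. Small candidates keep the naive loop fast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LocalPrimeCheck {

    // Same test as FindPrimeTask.run: true if no number in [begin, end) divides candidate.
    static boolean noDivisorIn(long candidate, long begin, long end) {
        for (long divider = begin; divider < end; divider++) {
            if (candidate % divider == 0) {
                return false;
            }
        }
        return true;
    }

    // Splits [2, candidate) into contiguous sub-intervals and tests them in parallel.
    static boolean isPrime(long candidate, int intervals) throws Exception {
        if (intervals < 1) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        if (candidate < 2) {
            return false;
        }
        long step = candidate / intervals;
        ExecutorService pool = Executors.newFixedThreadPool(intervals);
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < intervals; i++) {
                // Contiguous [begin, end) intervals; the last one ends at the candidate
                long begin = Math.max(2, step * i);
                long end = (i == intervals - 1) ? candidate : step * (i + 1);
                Callable<Boolean> subTask = () -> noDivisorIn(candidate, begin, end);
                futures.add(pool.submit(subTask));
            }
            boolean prime = true;
            for (Future<Boolean> f : futures) {
                prime = prime && f.get(); // AND of the partial answers
            }
            return prime;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isPrime(97, 4)); // true
        System.out.println(isPrime(91, 4)); // false (91 = 7 * 13)
    }
}
```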

17.5. Advanced Usage

17.5.1. Configuration of the Master/Slave

17.5.1.1. Ping Period

At regular intervals, the master sends a "ping" message to every slave to check whether it is alive and reachable. The ping period configuration parameter is the period, in milliseconds, between two "ping" messages. Its default value is 10000 (i.e. 10 seconds).

To change this default value, call the following method:

    /**
     * Sets the period at which ping messages are sent to the slaves <br/>
     * @param periodMillis the new ping period
     */
    void setPingPeriod(long periodMillis);
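A periodic liveness check of this kind can be sketched with a ScheduledExecutorService. This is an illustration under stated assumptions, not how ProActive implements it: slaves are modeled as hypothetical named BooleanSupplier callbacks, PingSketch and startPinging are invented names, and a real master would go on to reschedule the tasks of the slaves reported as missing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical periodic liveness check; not the ProActive implementation.
class PingSketch {

    // Pings every slave each 'periodMillis' ms; 'onMissing' receives the names of
    // slaves that answer false or whose ping throws (i.e. unreachable slaves).
    static ScheduledExecutorService startPinging(Map<String, BooleanSupplier> slaves,
                                                 long periodMillis,
                                                 Consumer<String> onMissing) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, BooleanSupplier> slave : slaves.entrySet()) {
                boolean alive;
                try {
                    alive = slave.getValue().getAsBoolean();
                } catch (RuntimeException e) {
                    alive = false; // a failed ping counts as a missing slave
                }
                if (!alive) {
                    onMissing.accept(slave.getKey()); // a master would reschedule its tasks here
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, BooleanSupplier> slaves = new LinkedHashMap<>();
        slaves.put("slave-1", () -> true);
        slaves.put("slave-2", () -> false); // simulates an unreachable slave
        CountDownLatch threeReports = new CountDownLatch(3);
        // 100 ms instead of the documented 10000 ms default, to keep the demo short
        ScheduledExecutorService timer = startPinging(slaves, 100, name -> {
            System.out.println("missing: " + name);
            threeReports.countDown();
        });
        threeReports.await(5, TimeUnit.SECONDS);
        timer.shutdownNow();
    }
}
```

Catching the exception inside the periodic job is deliberate: a ScheduledExecutorService suppresses all later executions of a periodic task that throws.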

17.5.2. Using the Slave Memory

17.5.2.1. Principle

The principle of the slave memory is to allow users to store data in, and retrieve data from, a slave's address space. The typical use case is an iterative process computed with the Master/Slave API. An iterative process generally consists of an initialization step 0 followed by n computation steps, where step k needs the results of step k-1. The initialization step often requires a large amount of information to be "loaded" into the slave. Without slave memory, this information would be lost after each step of the iteration, which means that the initialization of step 0 would have to be repeated at steps 1, 2, ..., n.

17.5.2.2. Usage

17.5.2.2.1. Structure and API

The slave memory structure is very simple: it consists of <key, value> associations. A Java object (the value) is saved in the memory under a given name (the key), and this name is needed to retrieve the value later on.

The Slave Memory API consists of three methods: save, load and erase. The slave memory is handed to a task as the parameter of its run method, so the user can use this interface to save, load or erase objects in the local slave's memory. Below is the detailed SlaveMemory interface:

    /**
     * Save data under a specific name
     * @param name name of the data
     * @param data data to be saved
     */
    void save(String name, Object data);

    /**
     * Load some data previously saved
     * @param name the name under which the data was saved
     * @return the data
     */
    Object load(String name);

    /**
     * Erase some data previously saved
     * @param name the name of the data which need to be erased
     */
    void erase(String name);

17.5.2.2.2. Storing data

Users can store data in the slaves' memory either when:

  1. the slaves are created remotely, or

  2. a task is run on a slave.

The first mechanism is used by providing a list of <key, value> pairs (a Map) to the constructors of the ProActiveMaster class. Each constructor detailed above has a version that takes this extra parameter. The given list becomes the initial memory of every slave created by the master.

The second mechanism is used through the SlaveMemory parameter of the Task interface's run method. Unlike the first mechanism, only the slave currently running the task stores the given data.
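To make the two storage mechanisms concrete, here is a minimal map-backed sketch. The SlaveMemory interface below simply restates the three documented methods so that the example compiles on its own; the MapSlaveMemory class, the HashMap storage and the choice of returning null for an unknown key are assumptions for illustration, not ProActive's implementation.

```java
import java.util.HashMap;
import java.util.Map;

class SlaveMemorySketch {

    // Local restatement of the documented SlaveMemory methods.
    interface SlaveMemory {
        void save(String name, Object data);
        Object load(String name);
        void erase(String name);
    }

    // Hypothetical implementation: one <key, value> map per slave.
    static class MapSlaveMemory implements SlaveMemory {
        private final Map<String, Object> store;

        // 'initial' plays the role of the initial memory given at slave creation;
        // it is copied so that slaves do not share a single map.
        MapSlaveMemory(Map<String, Object> initial) {
            this.store = new HashMap<>(initial);
        }

        public void save(String name, Object data) { store.put(name, data); }
        public Object load(String name) { return store.get(name); } // null if absent (in this sketch)
        public void erase(String name) { store.remove(name); }
    }

    public static void main(String[] args) {
        Map<String, Object> initial = new HashMap<>();
        initial.put("threshold", 0.5);                   // initial memory of every slave
        SlaveMemory slave1 = new MapSlaveMemory(initial);
        SlaveMemory slave2 = new MapSlaveMemory(initial);

        slave1.save("partial", new double[] {1.0, 2.0}); // saved by a task running on slave1 only
        System.out.println(slave2.load("threshold"));    // prints 0.5
        System.out.println(slave2.load("partial"));      // prints null: not visible on slave2
        slave1.erase("partial");
        System.out.println(slave1.load("partial"));      // prints null
    }
}
```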

17.5.2.2.3. Retrieving and using the data

Loading and using any object stored in a Slave's memory is simply done through the SlaveMemory parameter in the run method of the Task interface.
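The iterative use case from the Principle section can be sketched as follows. Everything here is hypothetical and ProActive-free: SlaveMemory and Task restate the documented interfaces so the sketch compiles alone, a HashMap stands in for one slave's memory, and StepTask is an invented task that performs its expensive initialization only when its data is not already in memory, so that later steps on the same slave reuse what the first step loaded.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class IterativeMemorySketch {

    // Local restatements of the documented interfaces.
    interface SlaveMemory {
        void save(String name, Object data);
        Object load(String name);
        void erase(String name);
    }

    interface Task<R extends Serializable> extends Serializable {
        R run(SlaveMemory memory) throws Exception;
    }

    // Counts how often the expensive initialization really runs.
    static int initializations = 0;

    // Hypothetical task: step k adds a table value to the result of step k-1.
    static class StepTask implements Task<Long> {
        private static final long serialVersionUID = 1L;
        private final long previous;

        StepTask(long previous) {
            this.previous = previous;
        }

        public Long run(SlaveMemory memory) {
            long[] squares = (long[]) memory.load("squares");
            if (squares == null) {               // not in this slave's memory yet
                initializations++;
                squares = new long[100];
                for (int i = 0; i < squares.length; i++) {
                    squares[i] = (long) i * i;
                }
                memory.save("squares", squares); // kept for the following steps
            }
            return previous + squares[(int) (previous % squares.length)];
        }
    }

    // Runs 'steps' iterations on one simulated slave and returns the last result.
    static long iterate(int steps) {
        Map<String, Object> store = new HashMap<>();
        SlaveMemory memory = new SlaveMemory() { // one slave's memory, map-backed
            public void save(String name, Object data) { store.put(name, data); }
            public Object load(String name) { return store.get(name); }
            public void erase(String name) { store.remove(name); }
        };
        long result = 1;
        for (int k = 1; k <= steps; k++) {
            result = new StepTask(result).run(memory); // step k needs the result of step k-1
        }
        return result;
    }

    public static void main(String[] args) {
        // prints "1842 computed with 1 initialization(s)"
        System.out.println(iterate(5) + " computed with " + initializations + " initialization(s)");
    }
}
```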