edu.mines.jtk.util
Class Parallel

java.lang.Object
  extended by edu.mines.jtk.util.Parallel

public class Parallel
extends java.lang.Object

Utilities for parallel computing in loops over independent tasks. This class provides convenient methods for parallel processing of tasks that involve loops over indices, in which computations for different indices are independent.

As a simple example, consider the following function that squares floats in one array and stores the results in a second array.


 static void sqr(float[] a, float[] b) {
   int n = a.length;
   for (int i=0; i<n; ++i)
     b[i] = a[i]*a[i];
 }
 
A serial version of a similar function for 2D arrays is:

 static void sqrSerial(float[][] a, float[][] b) {
   int n = a.length;
   for (int i=0; i<n; ++i)
     sqr(a[i],b[i]);
 }
 
Using this class, the parallel version for 2D arrays is:

 static void sqrParallel(final float[][] a, final float[][] b) {
   int n = a.length;
   Parallel.loop(n,new Parallel.LoopInt() {
     public void compute(int i) {
       sqr(a[i],b[i]);
     }
   });
 }
 
In the parallel version, the method compute defined by the interface LoopInt will be called n times, once for each index i in the range [0,n-1]. The order of indices is both indeterminate and irrelevant, because the computation for each index i is independent. The arrays a and b are declared final, as required for their use in the implementation of LoopInt.
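For comparison only, the same row-by-row pattern can also be written with the standard library's parallel streams (Java 8 and later). This is a sketch using java.util.stream.IntStream, not part of this class's API, and the class name SqrStreams is hypothetical. A lambda may capture effectively final variables, so the parameters need not be declared final here.

```java
import java.util.stream.IntStream;

// Comparison sketch using java.util.stream (Java 8+), not edu.mines.jtk.util.Parallel.
// The class name SqrStreams is hypothetical.
class SqrStreams {
  // Squares the elements of a, storing the results in b (the 1D sqr above).
  static void sqr(float[] a, float[] b) {
    int n = a.length;
    for (int i = 0; i < n; ++i)
      b[i] = a[i]*a[i];
  }

  // Each index i reads only a[i] and writes only b[i], so the rows are
  // independent and may be processed in any order, on any thread.
  static void sqrParallel(float[][] a, float[][] b) {
    IntStream.range(0, a.length).parallel().forEach(i -> sqr(a[i], b[i]));
  }
}
```

As with Parallel.loop, each index writes only its own row b[i], so no synchronization is needed.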

Note: because the method loop and interface LoopInt are static members of this class, we can omit the class name prefix Parallel if we first import these names with


 import static edu.mines.jtk.util.Parallel.*;
 
A similar method facilitates tasks that reduce a sequence of indexed values to one or more values. For example, given the following method:

 static float sum(float[] a) {
   int n = a.length;
   float s = 0.0f;
   for (int i=0; i<n; ++i)
     s += a[i];
   return s;
 }
 
serial and parallel versions for 2D arrays may be written as:

 static float sumSerial(float[][] a) {
   int n = a.length;
   float s = 0.0f;
   for (int i=0; i<n; ++i)
     s += sum(a[i]);
   return s;
 }
 
and

 static float sumParallel(final float[][] a) {
   int n = a.length;
   return Parallel.reduce(n,new Parallel.ReduceInt<Float>() {
     public Float compute(int i) {
       return sum(a[i]);
     }
     public Float combine(Float s1, Float s2) {
       return s1+s2;
     }
   });
 }
 
In the parallel version, we implement the interface ReduceInt with two methods: one computes the sum of the elements in one row a[i], and the other combines two such sums. The same pattern works for other reduce operations. For example, with similar methods we could compute both the minimum and maximum values of any indexed sequence of values in a single reduce.
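The min/max reduce mentioned above could look like the sketch below. To keep it self-contained, the body implements a hypothetical interface ReduceBody with the same two methods as ReduceInt (compute and combine), and a hypothetical serial driver reduceSerial stands in for Parallel.reduce. With the library, the same two methods would go in an implementation of Parallel.ReduceInt<float[]>. Each partial result is a two-element array {min, max}.

```java
// Hypothetical stand-in for Parallel.ReduceInt<V>: the same two methods.
interface ReduceBody<V> {
  V compute(int i);
  V combine(V v1, V v2);
}

class MinMax {
  // Hypothetical serial driver that combines results left to right.
  // Requires begin < end, as the text requires of reduce.
  static <V> V reduceSerial(int begin, int end, ReduceBody<V> body) {
    V v = body.compute(begin);
    for (int i = begin + 1; i < end; ++i)
      v = body.combine(v, body.compute(i));
    return v;
  }

  // Returns {min, max} over all elements of a 2D array in a single reduce.
  static float[] minMax(final float[][] a) {
    return reduceSerial(0, a.length, new ReduceBody<float[]>() {
      public float[] compute(int i) {
        // Min and max of one row; an empty row yields {+inf, -inf}.
        float min = Float.POSITIVE_INFINITY;
        float max = Float.NEGATIVE_INFINITY;
        for (float x : a[i]) {
          if (x < min) min = x;
          if (x > max) max = x;
        }
        return new float[]{min, max};
      }
      public float[] combine(float[] m1, float[] m2) {
        // Returns a new array; neither partial result is modified.
        return new float[]{Math.min(m1[0], m2[0]), Math.max(m1[1], m2[1])};
      }
    });
  }
}
```

Because min and max are associative and commutative, the result does not depend on how or in what order partial results are combined. A parallel reduce over indeterminately ordered indices needs exactly this property.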

More general loops are supported, and are equivalent to the following serial code:


 for (int i=begin; i<end; i+=step)
   // some computation that depends on i
 
The methods loop and reduce require that begin is less than end and that step is positive. The requirement that begin is less than end ensures that reduce is always well-defined: at least one index (begin) is always processed, so there is always at least one value to return. The requirement that step is positive ensures that the loop terminates.
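A serial reference version of the general loop can be useful when testing a parallel computation against its serial equivalent. The sketch below is hypothetical: SerialLoop is not part of this class, and java.util.function.IntConsumer stands in for LoopInt. It assumes that violating the stated preconditions should raise IllegalArgumentException. This documentation does not say how Parallel itself reacts to invalid arguments.

```java
import java.util.function.IntConsumer;

// Hypothetical serial reference for loop(begin, end, step, body); not part of
// Parallel. IntConsumer stands in for LoopInt. Throwing IllegalArgumentException
// on bad arguments is an assumption of this sketch.
class SerialLoop {
  static void loop(int begin, int end, int step, IntConsumer body) {
    if (begin >= end)
      throw new IllegalArgumentException("begin must be less than end");
    if (step <= 0)
      throw new IllegalArgumentException("step must be positive");
    for (int i = begin; i < end; i += step)
      body.accept(i);
  }

  // Number of indices visited, ceil((end-begin)/step); at least 1 when
  // begin < end and step > 0. Computed in long to avoid int overflow.
  static long count(int begin, int end, int step) {
    return ((long) end - begin + step - 1) / step;
  }
}
```

To check that a body really is independent across indices, run a parallel loop and this serial reference on the same inputs and compare the outputs.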

Static methods loop and reduce submit tasks to a fork-join framework that maintains a pool of threads shared by all users of these methods. These methods recursively split tasks so that disjoint sets of indices are processed in parallel by different threads.
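The recursive splitting described above can be sketched with the JDK's own fork-join classes. This illustrates the technique under stated assumptions and is not this class's source. ForkJoinReduce is hypothetical, and IntFunction and BinaryOperator stand in for the two methods of ReduceInt. The common pool requires Java 8 or later, and ranges are split all the way down to single indices, with no chunk threshold.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.BinaryOperator;
import java.util.function.IntFunction;

// Sketch of recursive splitting for a reduce over i = begin, begin+step, ... < end.
// ForkJoinReduce is hypothetical and is not this class's implementation.
class ForkJoinReduce<V> extends RecursiveTask<V> {
  private final int begin, end, step;
  private final IntFunction<V> valueAt;      // plays the role of compute(int)
  private final BinaryOperator<V> combiner;  // plays the role of combine(V,V)

  ForkJoinReduce(int begin, int end, int step,
                 IntFunction<V> valueAt, BinaryOperator<V> combiner) {
    this.begin = begin;
    this.end = end;
    this.step = step;
    this.valueAt = valueAt;
    this.combiner = combiner;
  }

  @Override
  protected V compute() {
    // Number of indices in [begin,end); at least one because begin < end.
    int n = (int) (((long) end - begin + step - 1) / step);
    if (n == 1)
      return valueAt.apply(begin);
    // Split into two disjoint halves; middle stays on the begin + k*step grid.
    int middle = begin + (n / 2) * step;
    ForkJoinReduce<V> left = new ForkJoinReduce<>(begin, middle, step, valueAt, combiner);
    ForkJoinReduce<V> right = new ForkJoinReduce<>(middle, end, step, valueAt, combiner);
    left.fork();                  // left half may be stolen by another worker
    V vRight = right.compute();   // right half runs on this thread
    V vLeft = left.join();
    return combiner.apply(vLeft, vRight);  // combine in index order
  }

  static <T> T reduce(int begin, int end, int step,
                      IntFunction<T> valueAt, BinaryOperator<T> combiner) {
    if (begin >= end || step <= 0)
      throw new IllegalArgumentException("need begin < end and step > 0");
    return ForkJoinPool.commonPool().invoke(
        new ForkJoinReduce<>(begin, end, step, valueAt, combiner));
  }
}
```

Forking one half while computing the other on the current thread keeps workers busy. Combining vLeft before vRight preserves index order, so in this sketch even an associative but non-commutative combine, such as string concatenation, gives a deterministic result.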

In addition to the three loop parameters begin, end, and step, a fourth parameter, chunk, may be specified. This chunk parameter is a threshold for splitting tasks so that they can be performed in parallel. If a range of indices to be processed is smaller than the chunk size, or if too many tasks have already been queued for processing, then the indices are processed serially. Otherwise, the range is split into two parts for processing by new tasks. A specified chunk size is a lower bound: the number of indices processed serially by a task will never be less than the chunk size, but may be greater. The default chunk size is one.
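The two tests for serial processing (a small range, or too many queued tasks) can be sketched as follows. ChunkedLoop and the limit MAX_SURPLUS are hypothetical, and this documentation does not state the exact thresholds the class uses. The sketch splits only when each half would receive at least chunk indices, which is one way to make chunk a lower bound as described above. ForkJoinTask.getSurplusQueuedTaskCount() estimates how many more tasks the current worker has queued than other workers might steal, and returns zero outside a fork-join pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

// Sketch of a chunked parallel loop with the two serial-processing tests
// described above. ChunkedLoop and MAX_SURPLUS are hypothetical, not this
// class's implementation.
class ChunkedLoop extends RecursiveAction {
  // Hypothetical limit on surplus queued tasks before splitting stops.
  private static final int MAX_SURPLUS = 3;

  private final int begin, end, step, chunk;
  private final IntConsumer body;

  ChunkedLoop(int begin, int end, int step, int chunk, IntConsumer body) {
    this.begin = begin;
    this.end = end;
    this.step = step;
    this.chunk = chunk;
    this.body = body;
  }

  @Override
  protected void compute() {
    int n = (int) (((long) end - begin + step - 1) / step); // indices in range
    // Splitting only when n >= 2*chunk gives each half at least chunk indices,
    // so every task produced by a split processes at least chunk indices.
    boolean tooSmall = n < 2L * chunk;
    boolean backlog = ForkJoinTask.getSurplusQueuedTaskCount() > MAX_SURPLUS;
    if (tooSmall || backlog) {
      for (int i = begin; i < end; i += step)
        body.accept(i);
    } else {
      int middle = begin + (n / 2) * step;
      invokeAll(new ChunkedLoop(begin, middle, step, chunk, body),
                new ChunkedLoop(middle, end, step, chunk, body));
    }
  }

  static void loop(int begin, int end, int step, int chunk, IntConsumer body) {
    ForkJoinPool.commonPool().invoke(new ChunkedLoop(begin, end, step, chunk, body));
  }
}
```

Because indices are processed concurrently, the body must be safe to call from several threads at once. A body that writes only the array element for its own index needs no locking.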

The default chunk size is often sufficient, because the test for an excess number of queued tasks prevents tasks from being split needlessly. This test is especially useful when parallel loops are nested, as when looping over elements of multi-dimensional arrays.

For example, an implementation of the method sqrParallel for 3D arrays could simply loop in parallel over 2D slices, calling the 2D version listed above for each slice. Tasks will naturally tend to be split for the outer loop, but not for the inner loops, thereby reducing overhead (the time spent splitting and queueing tasks).
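The 3D version described above nests one parallel loop inside another. In the sketch below, parallel streams stand in for Parallel.loop so that the code is self-contained, and Sqr3D is a hypothetical name. Java streams use their own splitting heuristics rather than the queued-task test described above. The sketch therefore shows only the nesting pattern, not this class's splitting behavior.

```java
import java.util.stream.IntStream;

// Sketch only: java.util.stream parallel loops stand in for Parallel.loop so the
// example is self-contained. The class name Sqr3D is hypothetical.
class Sqr3D {
  static void sqr(float[] a, float[] b) {
    for (int i = 0; i < a.length; ++i)
      b[i] = a[i]*a[i];
  }

  // 2D: parallel loop over rows.
  static void sqrParallel(float[][] a, float[][] b) {
    IntStream.range(0, a.length).parallel().forEach(i2 -> sqr(a[i2], b[i2]));
  }

  // 3D: parallel loop over 2D slices, each handled by the 2D version, so the
  // parallel loops are nested.
  static void sqrParallel(float[][][] a, float[][][] b) {
    IntStream.range(0, a.length).parallel().forEach(i3 -> sqrParallel(a[i3], b[i3]));
  }
}
```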

Reference: "A Java Fork/Join Framework", by Doug Lea, describes the framework used to implement this class. This framework became part of JDK 7, as ForkJoinPool, ForkJoinTask, and related classes in java.util.concurrent.

Version:
2010.11.23
Author:
Dave Hale, Colorado School of Mines

Nested Class Summary
static interface Parallel.LoopInt
          A loop body that computes something for an int index.
static interface Parallel.ReduceInt<V>
          A loop body that computes and returns a value for an int index.
static class Parallel.Unsafe<T>
          A wrapper for objects that are not thread-safe.
 
Constructor Summary
Parallel()
           
 
Method Summary
static void loop(int begin, int end, int step, int chunk, Parallel.LoopInt body)
          Performs a loop for (int i=begin; i<end; i+=step).
static void loop(int begin, int end, int step, Parallel.LoopInt body)
          Performs a loop for (int i=begin; i<end; i+=step).
static void loop(int begin, int end, Parallel.LoopInt body)
          Performs a loop for (int i=begin; i<end; ++i).
static void loop(int end, Parallel.LoopInt body)
          Performs a loop for (int i=0; i<end; ++i).
static <V> V reduce(int begin, int end, int step, int chunk, Parallel.ReduceInt<V> body)
          Performs a reduce for (int i=begin; i<end; i+=step).
static <V> V reduce(int begin, int end, int step, Parallel.ReduceInt<V> body)
          Performs a reduce for (int i=begin; i<end; i+=step).
static <V> V reduce(int begin, int end, Parallel.ReduceInt<V> body)
          Performs a reduce for (int i=begin; i<end; ++i).
static <V> V reduce(int end, Parallel.ReduceInt<V> body)
          Performs a reduce for (int i=0; i<end; ++i).
static void setParallel(boolean parallel)
          Enables or disables parallel processing by all methods of this class.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

Parallel

public Parallel()
Method Detail

loop

public static void loop(int end,
                        Parallel.LoopInt body)
Performs a loop for (int i=0; i<end; ++i).

Parameters:
end - the end index (not included) for the loop.
body - the loop body.

loop

public static void loop(int begin,
                        int end,
                        Parallel.LoopInt body)
Performs a loop for (int i=begin; i<end; ++i).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
body - the loop body.

loop

public static void loop(int begin,
                        int end,
                        int step,
                        Parallel.LoopInt body)
Performs a loop for (int i=begin; i<end; i+=step).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
step - the index increment; must be positive.
body - the loop body.

loop

public static void loop(int begin,
                        int end,
                        int step,
                        int chunk,
                        Parallel.LoopInt body)
Performs a loop for (int i=begin; i<end; i+=step).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
step - the index increment; must be positive.
chunk - the chunk size; must be positive.
body - the loop body.

reduce

public static <V> V reduce(int end,
                           Parallel.ReduceInt<V> body)
Performs a reduce for (int i=0; i<end; ++i).

Parameters:
end - the end index (not included) for the loop.
body - the loop body.
Returns:
the computed value.

reduce

public static <V> V reduce(int begin,
                           int end,
                           Parallel.ReduceInt<V> body)
Performs a reduce for (int i=begin; i<end; ++i).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
body - the loop body.
Returns:
the computed value.

reduce

public static <V> V reduce(int begin,
                           int end,
                           int step,
                           Parallel.ReduceInt<V> body)
Performs a reduce for (int i=begin; i<end; i+=step).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
step - the index increment; must be positive.
body - the loop body.
Returns:
the computed value.

reduce

public static <V> V reduce(int begin,
                           int end,
                           int step,
                           int chunk,
                           Parallel.ReduceInt<V> body)
Performs a reduce for (int i=begin; i<end; i+=step).

Parameters:
begin - the begin index for the loop; must be less than end.
end - the end index (not included) for the loop.
step - the index increment; must be positive.
chunk - the chunk size; must be positive.
body - the loop body.
Returns:
the computed value.

setParallel

public static void setParallel(boolean parallel)
Enables or disables parallel processing by all methods of this class. By default, parallel processing is enabled. If disabled, all tasks will be executed on the current thread.

Setting this flag to false disables parallel processing for all users of this class. This method should therefore be used for testing and benchmarking only.

Parameters:
parallel - true, for parallel processing; false, otherwise.
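A global switch like setParallel might be implemented along the lines of the sketch below. The sketch is hypothetical: SwitchableLoop is not part of this class, and it uses a parallel IntStream in place of the fork-join loop. With the flag false, a sequential stream runs every index on the calling thread, which matches the documented behavior that all tasks execute on the current thread.

```java
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

// Sketch of a global serial/parallel switch in the spirit of setParallel.
// SwitchableLoop and its methods are hypothetical; this is not the class's source.
class SwitchableLoop {
  // volatile so that a change made on one thread is visible to later calls
  // on other threads.
  private static volatile boolean parallel = true;

  static void setParallel(boolean p) {
    parallel = p;
  }

  // Performs body.accept(i) for i in [begin,end). A sequential stream runs
  // entirely on the calling thread; a parallel one may use the common pool.
  static void loop(int begin, int end, IntConsumer body) {
    IntStream range = IntStream.range(begin, end);
    if (parallel)
      range = range.parallel();
    range.forEach(body);
  }
}
```

The flag is global, so setting it to false also serializes every other caller's loops. This is why such a switch suits testing and benchmarking, for example timing the same computation once serially and once in parallel.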