Category Archives: C++

Graph Coloring with GLPK

In this article we will present a simple code finding an optimal solution to the graph coloring problem using Integer Linear Programming (ILP). We used the GNU Linear Programming Kit (glpk) to solve the ILP problem.


As a project assignment for school we recently had to implement an optimized MPI program given a undirected graph where the edges represent the communications that should take place between MPI processes. For instance this program could be used for a distributed algorithm working on a mesh where each MPI process work on a share of the whole mesh, the edges represent the exchange of boundaries conditions for each iteration.
For a given process, the order of communications with all the neighbours is highly correlated with the performance of the whole program. If all processes start by sending data to the same process, the network bandwidth to this process will be a bottleneck. Therefore it is important to find a good order for communications. Ideally, at each step one process should be involved in only one communication.

By coloring the edges of the graph (all adjacent edges must have different colors), we can find a good scheduling order for MPI communications. Edge coloring can be solved directly, but we have chosen to use the line graph (also called edge-to-vertex dual) instead of the original graph. Therefore we just have to color the vertices of the line graph instead of implementing an edge coloring algorithm.


Initially we implemented a greedy coloring algorithm using the Welsh-Powell heuristic. This algorithm is fast and generally yields fairly good results, but I was interested in getting the optimal solution. Remembering the course I had on linear programming and the research papers I read on register allocation, I decided to use integer linear programming to solve this problem. The proposed implementation is by no means optimized, my goal was to implement a simple but optimal solution using a linear programming library. The final code is approximately 100 lines of code and I think it can be an interesting example for developers that want to start using GLPK.


We need an input format to describe our graph. We used a very simple file format: each line is a vertex, the first number on each line is the identifier of the vertex and the remaining numbers are the neighbours of this vertex. For instance, Wikipedia provides the following example for graph coloring:

Labelling the vertices from 0 to 5, we obtain the following file:

0 1 2 4 5
1 0 2 3 5
2 0 1 3 4
3 1 2 4 5
4 0 2 3 5
5 0 1 3 4

You will find other graph examples here:
- A bipartite graph (2-colorable, a greedy coloring algorithm can find \frac{n}{2} colors).
- The smallest graph that fails the DSATUR heuristic (3-colorable, DSATUR finds 4 colors).
- The Grötzsch graph (4-colorable).
- Graph K_5, i.e. complete graph with 5 vertices (5-colorable).
- The Petersen graph (3-colorable).

Linear Programming formulation

We implemented the LP formulation given by Mehrotra and Trick (the "VC" formulation).

The number of colors used by our solution is stored in an integer variable y.
Given K an upper bound on the number of colors needed, we use |V| \times K binary variables: x_{v, k} = 1 if vertex v is assigned color k.

The objective function is simply:

 \mathrm{Min} \; y

Now we need to define the set of constraints for the graph coloring problem.
With the first constraint we state that each vertex must be assigned exactly one color:

x_{v, 1} + x_{v, 2} + \ldots + x_{v, K} = 1 \quad \forall v \: \in V

The second constraint is a little tricky, we ensure that we use at most y colors by stating that all columns using colors indices greater than y are not used, i.e.:

x_{v, k} = 0 \quad \forall v \: \in V, \; \forall k \gt y

However, we can't use this formula in linear programming so we have to rewrite it:

y \ge k \times x_{v, k} \quad \forall v \: \in V, \; k = 1,\ldots,K

Last but not least, we still need to ensure that adjacent vertices are assigned different colors:

x_{u, k} + x_{v, k} \le 1 \quad \forall (u, v) \: \in E, \; k = 1,\ldots,K

GLPK Implementation

This article is not an introduction to GLPK but the library is simple to use if you have basic knowledge in linear programming, therefore I will not explain the GLPK functions used.

Our function will take as input the graph as a vector of vectors, this the same representation than the input file but with the current vertex index removed from each line.

void color_graph(const std::vector<std::vector<int> >& g)

We start by creating a problem object using GLPK and we set up the objective function:

  glp_prob* prob = glp_create_prob();
  glp_set_obj_dir(prob, GLP_MIN); // minimize

Before creating variables, we need an upper bound on the number of colors needed for our graph. Every graph can be colored with one more color than the maximum vertex degree, this will be our upper bound:

  int num_vertices = g.size();
  int max_colors = 0;
  for (int i = 0; i < num_vertices; ++i)
    max_colors = std::max(int(g[i].size()) + 1, max_colors);

As we have an upper bound for integer variable y, we can create it and add it to the objective function:

  int y = glp_add_cols(prob, 1);
  glp_set_col_bnds(prob, y, GLP_DB, 1, max_colors); // DB = Double Bound
  glp_set_obj_coef(prob, y, 1.);
  glp_set_col_kind(prob, y, GLP_IV); // IV = Integer Variable

We now need to allocate and set the type of the |V| \times K binary variables x_{v, k}. The indices are stored in a vector of vectors because we will need the indices while creating the constraints:

  std::vector<std::vector<int> > x(num_vertices, std::vector<int>(max_colors));
  for (int v = 0; v < num_vertices; ++v) 
    for (int k = 0; k < max_colors; ++k)
      x[v][k] = glp_add_cols(prob, 1);
      glp_set_col_kind(prob, x[v][k], GLP_BV); // BV = Binary Variable

To set up the constraints we must build the sparse matrix of coefficients by creating triplets (row, column, coeff).
These triplets are scattered in 3 different vectors. We must insert one element at the beginning because GLPK starts at index 1 after loading the matrix:

  std::vector<int> rows(1, 0);
  std::vector<int> cols(1, 0);
  std::vector<double> coeffs(1, 0.);

We now fill the three vectors by adding all the constraints (i.e. the rows) to the matrix, the first constraint:

  // One vertex must have exactly one color:
  // for each vertex v, sum(x(v, k)) == 1
  for (int v = 0; v < num_vertices; ++v)
    int row_idx = glp_add_rows(prob, 1);
    glp_set_row_bnds(prob, row_idx, GLP_FX, 1, 1); // FX: FiXed bound
    for (int k = 0; k < max_colors; ++k)

The second constraint:

  // We ensure we use y colors max:
  // for each vertex v and for each color c,                
  //    y >= (k + 1) * x(v, k)
  for (int v = 0; v < num_vertices; ++v)                    
    for (int k = 0; k < max_colors; ++k)
      int row_idx = glp_add_rows(prob, 1);
      glp_set_row_bnds(prob, row_idx, GLP_LO, 0, -1); // LO = LOwer bound
      coeffs.push_back(- k - 1);

And now the last set of constraints, this is a bit longer because we iterate on all edges. The graph is undirected but edges are duplicated in our file format, so we must ensure we do not add constraints twice:

  // Adjacent vertices cannot have the same color:        
  // for each edge (src, dst) and for each color k,                         
  //    x(src, k) + x(dst, k) <= 1                                          
  for (int src = 0; src < num_vertices; ++src)
    const std::vector<int>& succs = g[src];
    for (int s = 0; s < succs.size(); ++s)
      int dst = succs[s];
      // Ensure we don't add both (u, v) and (v, u)                                    
      if (src > dst)
        for (int k = 0; k < max_colors; ++k)
          int row_idx = glp_add_rows(prob, 1);
          glp_set_row_bnds(prob, row_idx, GLP_UP, -1, 1); // UP = UPper bound

Everything is now set up! We must now load our sparse matrix into GLPK, ask GLPK to use the floating point solution as the initial solution (presolve) of our ILP problem and start the solver:

  glp_load_matrix(prob, rows.size() - 1, &rows[0], &cols[0], &coeffs[0]);
  glp_iocp parm;
  parm.presolve = GLP_ON;
  glp_intopt(prob, &parm);

After the last call returns, we have a minimal coloring solution, we can now print the value of y and x:

  double solution = glp_mip_obj_val(prob);
  std::cout << "Colors: " << solution << std::endl;
  for (int i = 0; i < num_vertices; ++i)
    std::cout << i << ": ";
    for (int j = 0; j < max_colors; ++j)
      std::cout << glp_mip_col_val(prob, x[i][j]) << " ";
    std::cout << std::endl;

For instance on the provided bipartite graph we obtain:

Colors: 2
0: 0 1 0 0
1: 0 1 0 0
2: 0 1 0 0
3: 1 0 0 0
4: 1 0 0 0
5: 1 0 0 0

The last two columns are empty, this is because we started with an upper bound K = 4 but we used only 2 colors.


If you want to run the program, here a small main function to load a graph file and call our coloring algorithm:

int main(int argc, char** argv)
  std::vector<std::vector<int> > g;
  std::ifstream fs(argv[1]);                                                
  while (true)  
    std::string line;
    std::getline(fs, line);
    if (fs.eof())
    std::istringstream iss(line);
    std::istream_iterator<int> begin(iss), eof;
    g.push_back(std::vector<int>(++begin, eof));


To end this article, here are some important points:
- If the initial bounds for K is tight, solving will be faster. For the lower bound you can use 2 instead of 1 if your graph is not edgeless. To find a better upper bound you can use a greedy coloring algorithm before using ILP to find the optimal solution.
- If you want to solve the problem faster, use another formulation using column generation.
- If you use C++ you might want to implement your own wrapper above GLPK in order to manipulate variables and constraints easily.


After reading my article, my teacher Anne-Laurence Putz kindly gave me another formulation which is simpler and generally more efficient.
We delete variable y and use the following objective function instead:

 \mathrm{Min} \; 1 \times x_{1, 1} + 2 \times x_{1, 2} + \ldots + K \times x_{1, K} + 1 \times x_{2, 1} + \ldots + K \times x_{|V|, K}

We assign a weight for each color, thus the solver will minimize the number of colors used when minimizing the objective function. We don't need the second constraint anymore therefore less rows are needed.

GDB - Debugging stripped binaries

A few days ago I had a discussion with a colleague on how to debug a stripped binary on linux with GDB.
Yesterday I also read an article from an ex-colleague at EPITA on debugging with the dmesg command.
I therefore decided to write my own article, here I will demonstrate how to use GDB with a stripped binary.

Test program

First of all, here is the small C program we will be working on:

#include <stdio.h>
 __attribute__ ((noinline)) void fun(int test)
  printf("value: %d\n", test);
int main()
  int v = 21;

You can notice we have used a GCC attribute to prevent the compiler from inlining the function.


GCC and symbols

When compiling a program, GCC (for example) adds symbols to the binary to help the developer during debugging. There are several types of symbols but the goal of this article is not to explain them.

Contrarily to popular beliefs, GCC does write symbols to an object file even in release mode (with the -O3 switch). That's why even with a release binary, you can do this:

$ gcc -O3 -m32 test.c
$ gdb a.out
Reading symbols from /home/felix/test/a.out...(no debugging symbols found)...done.
(gdb) b main
Breakpoint 1 at 0x8048453

Listing symbols

We can use the nm command to list all symbols in our binary:

$ nm a.out
08049f28 d _DYNAMIC
0804853c R _IO_stdin_used
w _Jv_RegisterClasses
08048470 T __libc_csu_init
U __libc_start_main@@GLIBC_2.0
U __printf_chk@@GLIBC_2.3.4
080483e0 t frame_dummy
08048410 T fun
08048440 T main

Symbols and debugging

With the previous information, GDB knows the bounds of all functions so we can ask the assembly code for any of them. For example thanks to nm we know there is a fun function in the code, we can ask its assembly code:

(gdb) disas fun
Dump of assembler code for function fun:
0x08048410 <+0>: push %ebp
0x08048411 <+1>: mov %esp,%ebp
0x08048413 <+3>: sub $0x18,%esp
0x08048416 <+6>: mov 0x8(%ebp),%eax
0x08048419 <+9>: movl $0x8048530,0x4(%esp)
0x08048421 <+17>: movl $0x1,(%esp)
0x08048428 <+24>: mov %eax,0x8(%esp)
0x0804842c <+28>: call 0x8048340 <__printf_chk@plt>
0x08048431 <+33>: leave
0x08048432 <+34>: ret

Discarding Symbols

Symbols can be removed from the binary using the strip command:

$ gcc -O3 -m32 test.c
$ strip -s a.out
$ nm a.out
nm: a.out: no symbols

Why stripping you may ask ? Well, the resulting binary is smaller which mean it uses less memory and therefore it probably executes faster. When applying this strategy system-wide, the responsiveness of the system will probably be better. You can check this by yourself: use nm on /bin/*: you won't find any symbols.

The problem

Okay, there are no more symbols now, what does it change when using GDB ?

$ gdb a.out
(gdb) b main
Function "main" not defined.
(gdb) b fun
Function "fun" not defined.

We cannot add a breakpoint now, even on the main function.

The solution

Locating the entry point

Debugging is still possible, but it is more complicated. First we need the memory address of the entry point:

(gdb) info file
Symbols from "a.out".
Local exec file:
`a.out', file type elf32-i386.
Entry point: 0x8048350

With GDB we can add a breakpoint on a memory address:

(gdb) b *0x8048350
Breakpoint 1 at 0x8048350
(gdb) run
Starting program: a.out

Breakpoint 1, 0x08048350 in ?? ()

Disassembling code

We managed to add a breakpoint on the entry point of our binary (and we reached that breakpoint), but we are still having some troubles with our favorite commands:

(gdb) disas
No function contains program counter for selected frame.
(gdb) step
Cannot find bounds of current function

As GDB does not know the bounds of the functions, it does not know which address range should be disassembled.

Once again, we will need to use a command working at a lower level.
We must use the examine (x) command on the address pointed by the Program Counter register, we ask a dump of the 14 next assembly instructions:

(gdb) x/14i $pc
=> 0x8048350: xor %ebp,%ebp
0x8048352: pop %esi
0x8048353: mov %esp,%ecx
0x8048355: and $0xfffffff0,%esp
0x8048358: push %eax
0x8048359: push %esp
0x804835a: push %edx
0x804835b: push $0x80484e0
0x8048360: push $0x8048470
0x8048365: push %ecx
0x8048366: push %esi
0x8048367: push $0x8048440
0x804836c: call 0x8048330 <__libc_start_main@plt>
0x8048371: hlt

Libc initialization

By looking at the code, you might be asking yourself: "Where the hell are we??"
The C runtime has to do some initialization before calling our own main function, this is handled by the initialization routine __libc_start_main (check its prototype here).

Before calling this routine, arguments are pushed on the stack in reverse order (following the cdecl calling convention). The first argument of __libc_start_main is a pointer to our main function, so we now have the memory address corresponding to our code: 0x8048440. This is what we found with nm earlier!
Let's add a breakpoint on this address, continue and disassemble the code:

(gdb) b *0x8048440
Breakpoint 2 at 0x8048440
(gdb) c

Breakpoint 2, 0x08048440 in ?? ()
(gdb) x/10i $pc
=> 0x8048440: push %ebp
0x8048441: mov %esp,%ebp
0x8048443: and $0xfffffff0,%esp
0x8048446: sub $0x10,%esp
0x8048449: movl $0x15,(%esp)
0x8048450: call 0x8048410
0x8048455: xor %eax,%eax
0x8048457: leave
0x8048458: ret
0x8048459: nop

This looks like our main function, the value 21 (0x15) is placed on the stack and a function (the address corresponds to fun) is called.
Afterwards, the eax register is cleared because our main function returns 0.

Additional commands

To step to the next assembly instruction you can use the stepi command.
You can use print and set directly on registers:

(gdb) print $eax
$1 = 1
(gdb) set $eax = 0x2a
(gdb) print $eax
$2 = 42

You can also dump the value of all registers:

(gdb) info registers
eax 0x2a 42
ecx 0xffffd6e4 -10524
edx 0xffffd674 -10636
ebx 0xf7fb8ff4 -134508556
esp 0xffffd64c 0xffffd64c
ebp 0x0 0x0
esi 0x0 0
edi 0x0 0
eip 0x8048440 0x8048440

That's all for now!

SSE - Image Processing

Recently I have written articles on image processing and on SSE, so here is an article on image processing AND SSE!

SSE for Image Processing

SSE instructions are particularly adapted for image processing algorithms. For instance, using saturated arithmetic instructions, we can conveniently and efficiently add two grayscale images without worrying about overflow. We can easily implement saturated addition, subtraction and blending (weighted addition) using SSE operators. We can also use floating point SSE operations to speed-up algorithms like Hough Transform accumulation or image rotation.

In this article we will demonstrate how to implement a fast dilation algorithm using only a few SSE instructions. First we will present the dilation operator, afterwards we will present its implementation using SSE, and finally we will discuss about the performance of our implementation.


Dilation is one of the two elementary operators of mathematical morphology. This operation expands the shape of the objects by assigning to each pixel the maximum value of all pixels from a structuring element. A structuring element is simply defined as a configuration of pixels (i.e. a shape) on which an origin is defined, called the center. The structuring element can be of any shape but generally we use a simple shape such as a square, a circle, a diamond, etc.

On a discrete grayscale image, the dilation of an image is computed by visiting all pixels; we assign to each pixel the maximum grayscale value from pixels in the structuring element. Dilation, along with its dual operation, erosion, forms the basis of mathematical morphology algorithms such as thinning, skeletonization, hit-or-miss, etc. A fast implementation of dilation and erosion would therefore benefit to all these operators.

Scalar Implementation

The scalar (without SSE) implementation on a grayscale image is done by using an array of offsets in the image as the neighborhood. For our implementation we have used 4-connexity as illustrated below:

The red pixel is the center, and the blue pixels forms the structuring element.

In order to avoid losing time when dealing with borders, each image has extra pixels to the left and to the right of each row. The number of additional rows depends on the width of the structuring element. There is also a full additional row before and after the image data. Thus, a dilation can be computed without having to handle borders in the main loop.

// Image is just a structure containing the size of the image and a pointer to the data.
void dilation(const Image& src_, Image& dst_)
  int width = src_.width;
  int height = src_.height;
  // Step is the size (in bytes) of one row (border included).
  int step = src_.step;
  dst_.resize(width, height);
  uint8* dst = dst_.value_ptr; // uint8 is a typedef for unsigned char
  const uint8* src = src_.value_ptr;
  const size_t wsize = 5;
  // The structuring element.
  const int off[wsize] = { -1, +1, 0, -step, +step };
  for (int i = 0; i < height; ++i)
    for (int j = 0; j < width; ++j)
      uint8 sup = 0;
      for (int k = 0; k < wsize; ++k)
	sup = std::max(sup, src[j + off[k]]);
      dst[j] = sup;
    dst += step;
    src += step;

SSE Implementation

Using the SSE function _mm_max_epu8 it is possible to compute the maximum on 16 pairs of values of type unsigned char in one instruction. The principle remains the same, but instead of fetching a single value for each offset value, we fetch 16 adjacent pixels. We can then compute the dilation equivalent to 16 structuring elements using SSE instructions.

We need to perform 5 SSE loads for our structuring element, as illustrated in the following image (but with 4 pixels instead of 16). Each rectangle corresponds to a SSE load. By computing the minimum of each groups of pixel and storing the result in the destination image we have perform the dilation with 4 consecutive structuring elements.

Regarding the implementation, it is really verbose (as always with SSE!), but still short, we only need to change the inner loop.

    for (int j = 0; j < width - 16; j += 16)
      // 2 unaligned loads.                                                                                                                                                                                      
      __m128i m = _mm_loadu_si128((const __m128i*)(src + j + off[0]));
      m = _mm_max_epu8(m, _mm_loadu_si128((const __m128i*)(src + j + off[1])));
      // 3 aligned loads: more efficient.                                                                                                                                                                        
      m = _mm_max_epu8(m, _mm_load_si128((const __m128i*)(src + j + off[2])));
      m = _mm_max_epu8(m, _mm_load_si128((const __m128i*)(src + j + off[3])));
      m = _mm_max_epu8(m, _mm_load_si128((const __m128i*)(src + j + off[4])));
      // Store the result in the destination image.
      _mm_store_si128((__m128i*)(dst + j), m);

We have used an aligned SSE load for the center and the pixels above and below the center. As mentioned in previous articles, using aligned load improves performances.


We have measured the performances of the SSE implementation against the scalar implementation on a Intel Core2 Duo P7350 (2 GHz). Two grayscale images of Lena were used.

2048 x 2048 4096 x 4096
Scalar 50 ms 196 ms
SSE 7.5 ms 31 ms

Using SSE, we managed to obtain a speedup factor of 6 compared to the scalar implementation!


Our algorithm runs pretty fast but it could be even faster! If we perform the dilation in-place, the code runs twice as fast as the previous implementation. If it's still not fast enough for your needs, you can also implement the algorithm directly in assembly using advanced techniques such as instruction pairing, prefetching... A good step-by-step tutorial on the subject can be found here for 4x4 matrix-vector multiplication.

TBB - Tasks: spawning, dependencies and recycling

The project

Last year we had to implement a spell-checker as a Text Mining project. The challenge was to be as fast as possible, and if possible faster than our teacher !
We implemented a trie and the Damerau-Levenshtein distance for fast lookup of similar words.

But it was still not fast enough, we decided it was a good opportunity to use Intel Threading Building Blocks (TBB) for parallelization. Our program had two modes: an interactive mode and a batch mode.

In interactive mode, the user writes a query composed of a word w and a maximum distance d. The programs then outputs all the words with a Damerau-Levenshtein less than d compared to w.

In batch mode, we have a long list of queries in a file. This is the mode used by our teacher for benchmarking our programs.


We can obviously store all queries in a vector and then apply a simple parallelization algorithm such as tbb::parallel_for. But we would need to store the results of each query and print them after all queries were processed (indeed we can process queries in parallel but we still need to print them in the correct order).

The program would be faster but would use a lot more RAM than the serial version: this is not acceptable. We would like to be able to use all our cores while still printing and flushing the results if possible.

In order to solve this problem I had to manipulate directly tbb::tasks, my solution is maybe not the most elegant, but it was a good opportunity for me to dig into the task system of TBB.

How it works

We need to decompose our problem into small tasks and express their dependencies.
To each query we associate two tasks: a Print task, and an Execute task.
The dependency between tasks is simple: the Print task can only be executed after the corresponding Execute task AND after the previous Print task.
This is pretty much all, now we need to express these dependencies with the task system of TBB.


Dependencies in TBB can handled in two ways: by spawning child tasks or by manipulating the reference counter.
When a task A increments the reference counter of another task B, or if A is a child task of B, it means that A must be executed before B. Once A has finished executing, it must decrease the reference counter of B, if the reference counter of a task reaches 0, the task is ready to be executed.

With TBB we declare a task by declaring a class deriving from tbb::task, this class must implement an execute method. This method is called when the task is fired by a thread.
The return value of this method is of type tbb::task*, if the return value is different from 0, it means we specify to the scheduler what task should be executed next (this is some kind of recycling, but be aware that recycling has a different meaning within the TBB library, for example recycle_as_continuation).

The implementation is now quite straightforward: specify dependencies by spawning child tasks and incrementing reference counters.

In order to simplify the code, the dependencies to the rest of the project were removed. Therefore, instead of executing a query, we sleep a random amount of time.

The Print Class

/*! \class Print                                                                                                                                                                                                 
** \brief A tbb::task printing the results of an execution.                                                                                                                                                      
** A Print task references the Print task corresponding to the next query.                                                                                                                                       
class Print: public tbb::task                                                                                                                                                                                    
  Print(unsigned idx)                                                                                                                                                                                            
    : next_(0),                                                                                                                                                                                                  
** \brief Set the reference to the next Print task and increment its                                                                                                                                             
** ref count.                                                                                                                                                                                                    
  void set_next(Print* next)                                                                                                                                                                                     
      next_ = next;                                                                                                                                                                                              
** \brief Print the results to a query.                                                                                                                                                                          
** \return The Print task corresponding to the next query if it can be                                                                                                                                           
** executed, 0 otherwise.                                                                                                                                                                                        
** After executing, a Print task decrements the reference count of the                                                                                                                                           
** next Print task. If this reference count is 0, this query was                                                                                                                                                 
** processed and therefore the result can be printed too.                                                                                                                                                        
  tbb::task* execute()                                                                                                                                                                                           
      std::cout << "print task " << idx_ << std::endl;                                                                                                                                                           
      if (next_ && next_->decrement_ref_count() == 0)                                                                                                                                                            
        return next_; // Spawn next print.                                                                                                                                                                       
      return 0;                                                                                                                                                                                                  
  Print* next_; /*!< The Print task corresponding to the next query */                                                                                                                                           
  unsigned idx_;                                                                                                                                                                                                 

The Execute Class

/*! \class Execute                                                                                                                                                                                               
** \brief A tbb::task processing a query.                                                                                                                                                                        
class Execute: public tbb::task                                                                                                                                                                                  
** \brief Constructor                                                                                                                                                                                            
  Execute(unsigned idx)                                                                                                                                                                                          
    : idx_(idx)                                                                                                                                                                                                  
** \brief Process the current query.                                                                                                                                                                             
** \return 0                                                                                                                                                                                       
  tbb::task* execute()                                                                                                                                                                                           
      std::cout << "execute task " << idx_ << std::endl;                                                                                                                                                         
      usleep(rand() % 500000);                                                                                                                                                                                 
      return 0;                                                                                                                                                                                                  
  unsigned idx_;                                                                                                                                                                                                 

The RangeSpawner Class

We need a last class in order to create all the tasks and spawn them. The RangeSpawner task emulates tbb::parallel_do.

/*! \class RangeSpawner                                                                                                                                                                                          
** \brief A tbb::task emulating tbb::parallel_do.                                                                                                                                                                
** Spawn an Execute and a Print task for each query and properly set the                                                                                                                                         
** dependencies.                                                                                                                                                                                                 
class RangeSpawner: public tbb::task                                                                                                                                                                             
** \brief Constructor                                                                                                                                                                                            
  RangeSpawner(std::vector<int>& queries)                                                                                                                                                                        
    : queries_(queries)                                                                                                                                                                                          
** \brief Spawn two tasks for each query.                                                                                                                                                                        
** Emulates tbb::parallel_do.                                                                                                                                                                                    
  tbb::task* execute()                                                                                                                                                                                           
      unsigned size = queries_.size();                                                                                                                                                                           
      // 1 child for the wait and 1 child for each query (Print)                                                                                                                                                                                                
      this->set_ref_count(size + 1);                                                                                                                                                                             
      // Allocate print tasks                                                                                                                                                                                    
      std::vector<Print*> print_tasks;                                                                                                                                                                           
      for (unsigned i = 0; i < size; ++i)                                                                                                                                                                        
        Print* p = new(this->allocate_child()) Print(i);                                                                                                                                                         
      // Set dependencies between print tasks: query i + 1 must be                                                                                                                                               
      // printed after query i.                                                                                                                                                                                  
      for (unsigned i = 0; i < size - 1; ++i)                                                                                                                                                                    
        print_tasks[i]->set_next(print_tasks[i + 1]);                                                                                                                                                            
      // Spawn a new Execution task for each query.                                                                                                                                                              
      for (unsigned i = 0; i < size; ++i)                                                                                                                                                                        
        Execute* e = new(print_tasks[i]->allocate_child()) Execute(i);                                                                                                                                           
      // Wait the end of all executions.                                                                                                                                                                         
      return 0;                                                                                                                                                                                                  
  std::vector<int>& queries_;/*!< The list of all queries */                                                                                                                                                     

Here the list of queries is just a list of integers (the ID of each request).
Note that tasks are allocated using placement new, therefore we don't need to delete them.
Another interesting point is that we must also set the reference counter of the RangeSpawner task, otherwise the method would exit before the execution of all queries.

Main function

The main function generates 500 queries and execute them in parallel while printing when possible.

int main()                                                                                                                                                                                                       
  std::vector<int> queries;                                                                                                                                                                                      
  unsigned count = 500;                                                                                                                                                                                          
  for (unsigned i = 0; i < count; ++i)                                                                                                                                                                           
  RangeSpawner* root = new(tbb::task::allocate_root()) RangeSpawner(queries);                                                                                                                                    

If everything is working well, print task i should always be preceded by execute task i.


The main problem with this implementation is that it is highly optimistic, we hope at least one thread will start at the beginning of our vector of queries, if this is not the case, we end up storing a lot of results, waiting for previous queries to be executed and printed.
Another possibility would be to set a dependency between Execute tasks too. Task i can only be executed if, for example, task i - 32 has been executed. But it could affect scalability.

SSE - Saturation Arithmetic

In a previous article I presented the SSE instruction set and how to use it in C/C++ with simple examples.
In this article I will introduce other instructions that have a really nice property: they allow you to use saturation arithmetic.

Saturation Arithmetic

With saturation arithmetic the result of an operation is bounded in a range between a minimum and a maximum value. For example, with saturation arithmetic in the range [0, 255], we have: 200 + 70 = 255 and 20 - 70 = 0.

This property would be great in C/C++ because when performing arithmetic the regular way, we use modular arithmetic, for example with an unsigned char we have : 200 + 70 = 14 and 20 - 70 = 206, this phenomenon is called overflow and at CPU level, it can be detected using the carry/overflow flags.

Saturation with SSE

As mentioned in the previous article, SSE was initially designed for signal processing and graphics processing. For example imagine we want to add/subtract two grayscale images together, we would be losing a lot of time if we had to clip the result by hand after the operation.
Fortunately, SSE provides special instructions for saturation arithmetic, with a single assembly instruction you can add several values at the same time and clip all the results.


In the last article we used the _mm_add_epi8 function in order to add 16 char at the same time. In order to perform the same operation but with saturation, we simply use _mm_adds_epi8 (notice the additional 's').

In the following example we add two unsigned char values and print the result (but remember you can add 16 values at the same time).

  unsigned char a[16] __attribute__ ((aligned (16))) = { 200 };                                                                                                                                                  
  unsigned char b[16] __attribute__ ((aligned (16))) = { 70 };                                                                                                                                                   
  __m128i l = _mm_load_si128((__m128i*)a);                                                                                                                                                                                      
  __m128i r = _mm_load_si128((__m128i*)b);  
  _mm_store_si128((__m128i*)a, _mm_adds_epu8(l, r));
  std::cout << (int)a[0] << std::endl;

This small program should print 255, instead of 14 with modular arithmetic.

We can also subtract unsigned bytes using _mm_subs_epi8:

  unsigned char a[16] __attribute__ ((aligned (16))) = { 20 };                                                                                                                                                  
  unsigned char b[16] __attribute__ ((aligned (16))) = { 70 };                                                                                                                                                   
  __m128i l = _mm_load_si128((__m128i*)a);                                                                                                                                                                                      
  __m128i r = _mm_load_si128((__m128i*)b);  
  _mm_store_si128((__m128i*)a, _mm_subs_epu8(l, r));
  std::cout << (int)a[0] << std::endl;

This program should output 0 (206 with modular arithmetic).


With SSE, saturation arithmetic is limited to signed/unsigned 8 and 16 bytes operands: epi8, epu8, epi16, epu16. The available arithmetic operations are adds and subs.

C++ - Getting started with SSE

In this article I will present how to use SSE instructions in C++ (or C).
My goal is not to show how to write the fastest possible program using SSE but rather to introduce to its usage.

What is SSE ?

SSE stands for Streaming SIMD Extensions. It is a set of CPU instructions dedicated to applications like signal processing, scientific computation or 3D graphics.

SIMD is an acronym itself: Single Instruction, Multiple Data. A CPU instruction is said to be SIMD when the same operation is applied on multiple data at the same time.

SSE was first introduced in the Pentium III in 1999. Over the time, this instruction set was improved by adding more sophisticated operations. Eight 128-bits registers were added to the CPU: xmm0 through xmm7.

Initially those registers could only be used for single-precision computations (i.e. for the type float). But since SSE2, those registers can be used for any primitive data type.
Given a standard 32-bit machine we can therefore store and process in parallel:
- 2 double
- 2 long
- 4 float
- 4 int
- 8 short
- 16 char
Note that integer types can be signed or unsigned, but you will have to use (sometimes) different instructions.

For instance if you have to compute the sum of two int arrays, using SSE you can compute 4 additions in a single assembly instruction.

Simple program

It is not easy getting started with SSE, fortunately the MSDN documentation is complete and well-written.
If you take a look at the list of arithmetic operations you can notice there is always a corresponding assembly instruction. Some operations on the other side (such as set operations) are composite operations.
Using SSE in C++ is indeed a very low-level construct: we will manipulate directly the 128-bits registers through variables of type __m128 for float, __m128d for double and __m128i for int, short, char.

But the good news is that you don't need to declare arrays of type __m128 in order to use SSE: for instance if you want to calculate the square root of a large array of float, you can simply cast your pointer to the type __m128* and then use SSE operations.

There is a tweak however, most SSE operations requires the data to be 16-bytes aligned, here we will use another variable attributes of gcc.
We use the align attribute:

aligned (alignment)
This attribute specifies a minimum alignment for the variable or structure field, measured in bytes.

Here is a simple code on how to use SSE in order to compute the square root of 4 float in a single operation using the _mm_sqrt_ps function.

  float a[] __attribute__ ((aligned (16))) = { 41982.,  81.5091, 3.14, 42.666 };                                                                                                                                 
  __m128* ptr = (__m128*)a;                                                                                                                                                                                      
  __m128 t = _mm_sqrt_ps(*ptr);

The corresponding assembly instruction is SQRTPS, if we generate the assembly code of our program (using gcc -S) we can notice that this instruction is indeed used on a SSE register:

sqrtps  %xmm0, %xmm0

Don't forget the include !

  #include <emmintrin.h>

First benchmark

In the previous program we computed the square root of 4 float simultaneously but the result was not stored in the array. For this purpose we use _mm_store_ps.
In the following program we compute the square root of a very large array on float. We use the timer class presented here in order to measure the execution time of the SSE version against the standard version.

void normal(float* a, int N)                                                                                                                                                                                     
  for (int i = 0; i < N; ++i)                                                                                                                                                                                    
    a[i] = sqrt(a[i]);                                                                                                                                                                                           
void sse(float* a, int N)                                                                                                                                                                                        
  // We assume N % 4 == 0.                                                                                                                                                                                        
  int nb_iters = N / 4;                                                                                                                                                                                         
  __m128* ptr = (__m128*)a;                                                                                                                                                                                      
  for (int i = 0; i < nb_iters; ++i, ++ptr, a += 4)                                                                                                                                                              
    _mm_store_ps(a, _mm_sqrt_ps(*ptr));                                                                                                                                                                          
int main(int argc, char** argv)                                                                                                                                                                                  
  if (argc != 2)                                                                                                                                                                                                 
    return 1;                                                                                                                                                                                                    
  int N = atoi(argv[1]);                                                                                                                                                                                         
  float* a;                                                                                                                                                                                                      
  posix_memalign((void**)&a, 16,  N * sizeof(float));                                                                                                                                                            
  for (int i = 0; i < N; ++i)                                                                                                                                                                                    
    a[i] = 3141592.65358;                                                                                                                                                                                        
    normal(a, N);                                                                                                                                                                                                
  for (int i = 0; i < N; ++i)                                                                                                                                                                                    
    a[i] = 3141592.65358;                                                                                                                                                                                        
    sse(a, N);                                                                                                                                                                                                   

In the SSE function we use two pointers containing the same address but of different types, it is of course not necessary but avoid casting. It is also interesting to notice that we must increment the __m128 pointer by only 1 (128 bits) whereas we must increment the float pointer by 4 (32 bits).

Another interesting function here is the posix_memalign function instead of the align attribute. The first function allocates aligned data on the heap whereas the gcc attribute allocates on the stack.

The benchmark was compiled with llvm-g++ 4.2 (flags: -O3 -msse2) and ran on a Intel Core2 Duo P7350 (2GHz):

$ ./sqrt 64000000
normal: 392ms
SSE: 145ms

Pretty fast indeed !

Second Benchmark

And now how to add two arrays of char together:

void sse(char* a, const char* b, int N)                                                                                                                                                                          
  int nb_iters = N / 16;                                                                                                                                                                                         
  __m128i* l = (__m128i*)a;                                                                                                                                                                                      
  __m128i* r = (__m128i*)b;                                                                                                                                                                                      
  for (int i = 0; i < nb_iters; ++i, ++l, ++r)                                                                                                                                                                   
    _mm_store_si128(l, _mm_add_epi8(*l, *r));                                                                                                                                                                    

And the benchmark with full optimizations turned on:

$ ./add 64000000
normal: 98ms
SSE: 42ms

Performance considerations

You may ask why we don't have a 4-fold speedup when we compute the square root of 4 float in a single instruction and why we only have a 2-fold speedup when we add 16 char at the same time.

The answer is that your compiler is smart and performs a lot of optimizations, especially at the O3 level. In fact, if you take a look at the generated assembly code for the "normal" version of the sort and add functions you will probably notice that your compiler used SSE instructions behind your back. The compiler detected a loop pattern suitable for SSE and therefore decided to use dedicated instructions, however using directly the SSE instructions the generated code was even faster.

Depending on your compiler version, for such simple loops it is possible that you will notice no difference in execution time, but as mentioned, my goal was not pure performances.