The project
Last year we had to implement a spell-checker as a Text Mining project. The challenge was to be as fast as possible, and ideally faster than our teacher!
We implemented a trie and the Damerau-Levenshtein distance for fast lookup of similar words.
But it was still not fast enough, so we decided it was a good opportunity to use Intel Threading Building Blocks (TBB) for parallelization. Our program had two modes: an interactive mode and a batch mode.
In interactive mode, the user writes a query composed of a word w and a maximum distance d. The program then outputs all the words whose Damerau-Levenshtein distance to w is less than d.
In batch mode, we have a long list of queries in a file. This is the mode used by our teacher for benchmarking our programs.
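For readers unfamiliar with the distance used in these queries, here is a minimal sketch. It assumes the restricted variant (often called "optimal string alignment"), where a transposed pair of characters cannot be edited again. The post does not say which variant the project used, and the function name osa_distance is only illustrative.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Restricted Damerau-Levenshtein ("optimal string alignment") distance.
// Insertion, deletion, substitution and transposition of two adjacent
// characters each cost 1. Illustrative only, not the project's code.
unsigned osa_distance(const std::string& a, const std::string& b)
{
  const std::size_t n = a.size();
  const std::size_t m = b.size();
  std::vector<std::vector<unsigned> > d(n + 1, std::vector<unsigned>(m + 1));

  for (std::size_t i = 0; i <= n; ++i)
    d[i][0] = static_cast<unsigned>(i);
  for (std::size_t j = 0; j <= m; ++j)
    d[0][j] = static_cast<unsigned>(j);

  for (std::size_t i = 1; i <= n; ++i)
    for (std::size_t j = 1; j <= m; ++j)
    {
      const unsigned cost = (a[i - 1] == b[j - 1]) ? 0 : 1;
      d[i][j] = std::min({d[i - 1][j] + 1,          // deletion
                          d[i][j - 1] + 1,          // insertion
                          d[i - 1][j - 1] + cost}); // substitution or match
      // Transposition of two adjacent characters.
      if (i > 1 && j > 1 && a[i - 1] == b[j - 2] && a[i - 2] == b[j - 1])
        d[i][j] = std::min(d[i][j], d[i - 2][j - 2] + 1);
    }
  return d[n][m];
}
```

A query (w, d) then keeps every dictionary word whose distance to w is below the threshold. The trie makes it possible to share the computation of matrix rows between words with a common prefix instead of calling such a function once per word.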
Parallelization
We can obviously store all queries in a vector and then apply a simple parallel algorithm such as tbb::parallel_for. But we would then need to store the results of every query and print them only after all queries had been processed: queries can be processed in parallel, but their results must still be printed in the original order.
The program would be faster but would use a lot more RAM than the serial version: this is not acceptable. We would like to use all our cores while still printing and flushing results as soon as possible.
To solve this problem I had to manipulate tbb::task objects directly. My solution may not be the most elegant, but it was a good opportunity for me to dig into the task system of TBB.
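To make the memory problem concrete, here is a rough sketch of the "compute everything, print afterwards" scheme. It uses plain std::thread rather than tbb::parallel_for so that it stands alone. The names process and process_all are hypothetical stand-ins, and workers is assumed to be at least 1.

```cpp
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for running one query.
std::string process(int query)
{
  return "result " + std::to_string(query);
}

// Naive scheme: compute all results in parallel, print afterwards.
// `results` holds one entry per query until every worker has finished.
std::vector<std::string> process_all(const std::vector<int>& queries,
                                     unsigned workers)
{
  std::vector<std::string> results(queries.size());
  std::vector<std::thread> pool;

  for (unsigned w = 0; w < workers; ++w)
    pool.emplace_back([&, w] {
      // Worker w handles indices w, w + workers, w + 2 * workers, ...
      for (std::size_t i = w; i < queries.size(); i += workers)
        results[i] = process(queries[i]);
    });

  for (std::thread& t : pool)
    t.join();
  return results; // Only now can the results be printed, in order.
}
```

The size of results grows with the number of queries, which is exactly the cost we want to avoid in batch mode.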
How it works
We need to decompose our problem into small tasks and express their dependencies.
To each query we associate two tasks: a Print task, and an Execute task.
The dependency between tasks is simple: the Print task can only be executed after the corresponding Execute task AND after the previous Print task.
That is pretty much all there is to it. Now we need to express these dependencies with the task system of TBB.
Implementation
Dependencies in TBB can be handled in two ways: by spawning child tasks or by manipulating the reference counter.
When a task A increments the reference counter of another task B, or when A is a child task of B, A must be executed before B. Once A has finished executing, the reference counter of B must be decremented: the scheduler does this automatically when A is a child of B, whereas a counter incremented by hand must also be decremented by hand. When the reference counter of a task reaches 0, the task is ready to be executed.
With TBB, we declare a task by writing a class that derives from tbb::task. This class must implement an execute method, which is called when a thread runs the task.
The return value of this method is of type tbb::task*. If it is not 0, it tells the scheduler which task to execute next. (This is a kind of recycling, but be aware that recycling has a different meaning within the TBB library, for example recycle_as_continuation.)
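The counting scheme can be modelled without TBB. The sketch below is only an illustration built on std::atomic and std::thread, not the TBB API. Every name in it (PrintNode, run_model) is hypothetical, and it uses one thread per query purely for simplicity. Each print step starts with one pending dependency for its own query, plus one for the previous print step.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Standard-library model of the reference counting; not the TBB API.
struct PrintNode
{
  std::atomic<int> pending{0}; // number of unmet dependencies
};

// Runs `count` fake queries, one thread each, and returns the order in
// which the print steps fired.
std::vector<unsigned> run_model(unsigned count)
{
  std::vector<PrintNode> nodes(count);
  for (unsigned i = 0; i < count; ++i)
    nodes[i].pending = (i == 0) ? 1 : 2; // own query (+ previous print)

  std::vector<unsigned> order;
  order.reserve(count);

  auto execute = [&](unsigned i) {
    // ... the query would be processed here ...
    // Release the dependency on print i. Whoever brings a counter to 0
    // runs that print step, then releases the next one in the chain.
    unsigned k = i;
    while (k < count && nodes[k].pending.fetch_sub(1) == 1)
    {
      order.push_back(k); // "print" the result of query k
      ++k;
    }
  };

  std::vector<std::thread> threads;
  for (unsigned i = 0; i < count; ++i)
    threads.emplace_back(execute, i);
  for (std::thread& t : threads)
    t.join();
  return order;
}
```

Only the thread that brings a counter to 0 touches order, and it does so after the previous print step has finished, so the print steps never overlap and always come out as 0, 1, 2, and so on, whatever the scheduling of the queries.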
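The return-value convention can be pictured with a toy model. ToyTask, Step and run_chain below are hypothetical names, and the loop is a deliberate simplification of what a worker thread does, not TBB's scheduler.

```cpp
#include <string>
#include <vector>

// Toy model of the convention; these names are not part of TBB.
struct ToyTask
{
  virtual ~ToyTask() {}
  virtual ToyTask* execute() = 0; // next task to run, or nullptr
};

struct Step : ToyTask
{
  Step(const std::string& name, std::vector<std::string>& log)
    : next(nullptr), name_(name), log_(log)
  {
  }

  ToyTask* execute() override
  {
    log_.push_back(name_); // the "work" of this task
    return next;           // hand the next task straight to the worker
  }

  ToyTask* next;

private:
  std::string name_;
  std::vector<std::string>& log_;
};

// What a worker does with the return value: it keeps running the
// returned task directly instead of going back to its task queue.
void run_chain(ToyTask* t)
{
  while (t)
    t = t->execute();
}
```

Returning the next task saves a round trip through the scheduler's queues, which is why a Print task can return the next Print task as soon as it knows that task is ready.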
The implementation is now quite straightforward: specify dependencies by spawning child tasks and incrementing reference counters.
To simplify the code, the dependencies on the rest of the project were removed. Therefore, instead of executing a query, we sleep for a random amount of time.
The Print Class
#include <cstdlib>    // rand
#include <iostream>
#include <vector>
#include <unistd.h>   // usleep
#include <tbb/task.h> // classic TBB task API

/*! \class Print
** \brief A tbb::task printing the results of an execution.
**
** A Print task references the Print task corresponding to the next query.
*/
class Print: public tbb::task
{
public:
  Print(unsigned idx)
    : next_(0),
      idx_(idx)
  {
    // One reference for the Execute task that will be our child.
    this->increment_ref_count();
  }

  /*!
  ** \brief Set the reference to the next Print task and increment its
  ** ref count.
  */
  void set_next(Print* next)
  {
    next_ = next;
    next_->increment_ref_count();
  }

  /*!
  ** \brief Print the results to a query.
  **
  ** \return The Print task corresponding to the next query if it can be
  ** executed, 0 otherwise.
  **
  ** After executing, a Print task decrements the reference count of the
  ** next Print task. If this reference count is 0, this query was
  ** processed and therefore the result can be printed too.
  */
  tbb::task* execute()
  {
    std::cout << "print task " << idx_ << std::endl;

    if (next_ && next_->decrement_ref_count() == 0)
      return next_; // Spawn next print.
    return 0;
  }

private:
  Print* next_; /*!< The Print task corresponding to the next query */
  unsigned idx_;
};
The Execute Class
/*! \class Execute
** \brief A tbb::task processing a query.
*/
class Execute: public tbb::task
{
public:
  /*!
  ** \brief Constructor
  **
  */
  Execute(unsigned idx)
    : idx_(idx)
  {
  }

  /*!
  ** \brief Process the current query.
  **
  ** \return 0
  **
  */
  tbb::task* execute()
  {
    std::cout << "execute task " << idx_ << std::endl;
    usleep(rand() % 500000);
    return 0;
  }

private:
  unsigned idx_;
};
The RangeSpawner Class
We need a last class in order to create all the tasks and spawn them. The RangeSpawner task emulates tbb::parallel_do.
/*! \class RangeSpawner
** \brief A tbb::task emulating tbb::parallel_do.
**
** Spawn an Execute and a Print task for each query and properly set the
** dependencies.
*/
class RangeSpawner: public tbb::task
{
public:
  /*!
  ** \brief Constructor
  **
  */
  RangeSpawner(std::vector<int>& queries)
    : queries_(queries)
  {
  }

  /*!
  ** \brief Spawn two tasks for each query.
  **
  ** Emulates tbb::parallel_do.
  **
  */
  tbb::task* execute()
  {
    unsigned size = queries_.size();

    // 1 child for the wait and 1 child for each query (Print)
    this->set_ref_count(size + 1);

    // Allocate print tasks
    std::vector<Print*> print_tasks;
    print_tasks.reserve(size);
    for (unsigned i = 0; i < size; ++i)
    {
      Print* p = new(this->allocate_child()) Print(i);
      print_tasks.push_back(p);
    }

    // Set dependencies between print tasks: query i + 1 must be
    // printed after query i. Written as i + 1 < size so that an empty
    // list of queries does not underflow the unsigned bound.
    for (unsigned i = 0; i + 1 < size; ++i)
      print_tasks[i]->set_next(print_tasks[i + 1]);

    // Spawn a new Execution task for each query.
    for (unsigned i = 0; i < size; ++i)
    {
      Execute* e = new(print_tasks[i]->allocate_child()) Execute(i);
      spawn(*e);
    }

    // Wait the end of all executions.
    this->wait_for_all();
    return 0;
  }

private:
  std::vector<int>& queries_; /*!< The list of all queries */
};
Here the list of queries is just a list of integers (the ID of each request).
Note that tasks are allocated with TBB's placement new overloads (allocate_root, allocate_child). The scheduler destroys each task once it has executed, so we don't need to delete them.
Another interesting point is that we must also set the reference counter of the RangeSpawner task, otherwise the method would exit before the execution of all queries.
Main function
The main function generates 500 queries and executes them in parallel while printing when possible.
int main()
{
  std::vector<int> queries;
  unsigned count = 500;
  for (unsigned i = 0; i < count; ++i)
    queries.push_back(i);

  RangeSpawner* root = new(tbb::task::allocate_root()) RangeSpawner(queries);
  tbb::task::spawn_root_and_wait(*root);
}
If everything is working well, print task i should always be preceded by execute task i.
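This property can be checked mechanically on a captured log. The helper below is a hypothetical sketch (log_is_ordered is not part of the original program). It assumes each line of output arrived intact, which concurrent writes to std::cout do not strictly guarantee.

```cpp
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Checks a captured log: "print task i" lines must appear as 0, 1, 2, ...
// and each one must come after the matching "execute task i" line.
bool log_is_ordered(const std::vector<std::string>& lines)
{
  std::set<unsigned> executed;
  unsigned next_print = 0;

  for (const std::string& line : lines)
  {
    std::istringstream in(line);
    std::string kind;
    std::string word;
    unsigned idx = 0;
    if (!(in >> kind >> word >> idx) || word != "task")
      return false; // malformed line

    if (kind == "execute")
      executed.insert(idx);
    else if (kind == "print")
    {
      if (idx != next_print || executed.count(idx) == 0)
        return false; // out of order, or printed before being executed
      ++next_print;
    }
    else
      return false;
  }
  return true;
}
```

Feeding it the lines written by the program (for example after redirecting the output to a file) gives a quick regression check when experimenting with the task graph.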
Criticism
The main problem with this implementation is that it is highly optimistic: we hope that at least one thread will start at the beginning of our vector of queries. If this is not the case, we end up storing a lot of results while waiting for earlier queries to be executed and printed.
Another possibility would be to set a dependency between Execute tasks too. Task i can only be executed if, for example, task i - 32 has been executed. But it could affect scalability.
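A rough sketch of this throttling idea follows, again with the standard library rather than TBB tasks. It is a variation on the idea above: a query is only started once the query window positions earlier has been printed (not merely executed), which bounds the number of buffered results. The names WindowedRun and run_windowed are hypothetical, and both workers and window are assumed to be at least 1.

```cpp
#include <algorithm>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

struct WindowedRun
{
  WindowedRun() : max_buffered(0) {}
  std::vector<unsigned> printed; // indices in the order they were "printed"
  std::size_t max_buffered;      // most results ever held back at once
};

WindowedRun run_windowed(unsigned count, unsigned workers, unsigned window)
{
  WindowedRun run;
  std::mutex m;
  std::condition_variable cv;
  unsigned next_claim = 0;            // next query nobody has started
  unsigned next_print = 0;            // next query to print
  std::map<unsigned, unsigned> done;  // finished but not yet printable

  auto worker = [&] {
    std::unique_lock<std::mutex> lock(m);
    for (;;)
    {
      // Start query i only once query i - window has been printed.
      cv.wait(lock, [&] {
        return next_claim >= count || next_claim < next_print + window;
      });
      if (next_claim >= count)
        return;
      const unsigned i = next_claim++;

      lock.unlock();
      const unsigned result = i; // ... the query would be processed here ...
      lock.lock();

      done[i] = result;
      // Flush every result that is now next in line.
      while (!done.empty() && done.begin()->first == next_print)
      {
        run.printed.push_back(done.begin()->second);
        done.erase(done.begin());
        ++next_print;
      }
      run.max_buffered = std::max(run.max_buffered, done.size());
      cv.notify_all();
    }
  };

  std::vector<std::thread> pool;
  for (unsigned w = 0; w < workers; ++w)
    pool.emplace_back(worker);
  for (std::thread& t : pool)
    t.join();
  return run;
}
```

With a window of 32, at most 31 finished results wait for an earlier query at any time. The price is the one mentioned above: when one query is much slower than the others, the remaining threads stall at the edge of the window.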