GetFEM  5.4.3
getfem_omp.cc
/*===========================================================================

 Copyright (C) 2012-2020 Andriy Andreykiv.

 This file is a part of GetFEM

 GetFEM is free software; you can redistribute it and/or modify it
 under the terms of the GNU Lesser General Public License as published
 by the Free Software Foundation; either version 3 of the License, or
 (at your option) any later version along with the GCC Runtime Library
 Exception either version 3.1 or (at your option) any later version.
 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 License and GCC Runtime Library Exception for more details.
 You should have received a copy of the GNU Lesser General Public License
 along with this program; if not, write to the Free Software Foundation,
 Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

===========================================================================*/

#include "getfem/dal_singleton.h"
#include "getfem/getfem_locale.h"
#include "getfem/getfem_omp.h"

#ifdef GETFEM_HAS_OPENMP
  #include <thread>
  #include <omp.h>
#endif

using bgeot::scalar_type;

namespace getfem{

#ifdef GETFEM_HAS_OPENMP

  std::recursive_mutex omp_guard::mutex;

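  // omp_guard locks the shared recursive mutex only when the program is
  // actually running multithreaded; in a serial region no lock is taken.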
  omp_guard::omp_guard()
    : plock{me_is_multithreaded_now() ?
              std::make_unique<std::lock_guard<std::recursive_mutex>>(mutex)
              : nullptr}
  {}

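  // local_guard conditionally locks a caller-provided recursive mutex;
  // lock_factory hands out such guards tied to its own mutex.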
  local_guard::local_guard(std::recursive_mutex& m) :
    mutex{m},
    plock{me_is_multithreaded_now() ?
            std::make_shared<std::lock_guard<std::recursive_mutex>>(m)
            : nullptr}
  {}

  local_guard lock_factory::get_lock() const{
    return local_guard{mutex};
  }

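  // global_thread_policy numbers "threads" by the partitions managed by
  // partition_master, while true_thread_policy reports the actual OpenMP
  // thread number and thread count.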
  size_type global_thread_policy::this_thread() {
    return partition_master::get().get_current_partition();
  }

  size_type global_thread_policy::num_threads(){
    return partition_master::get().get_nb_partitions();
  }

  size_type true_thread_policy::this_thread() {
    return omp_get_thread_num();
  }

  size_type true_thread_policy::num_threads(){
    return omp_get_max_threads();
  }

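  // set the maximum number of OpenMP threads and let partition_master
  // re-check its partitioning against the new thread count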
  void set_num_threads(int n){
    omp_set_num_threads(n);
    partition_master::get().check_threads();
  }

  bool me_is_multithreaded_now(){
    // serial region
    if(omp_get_num_threads() == 1 && omp_get_level() == 0) return false;
    // parallel region with one thread
    if(omp_get_num_threads() == 1 && omp_get_level() == 1) return true;
    // parallel region with more than one thread
    if(omp_in_parallel() == 1) return true;

    return false;
  }

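  // true when OpenMP is limited to a single thread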
  bool not_multithreaded(){
    return omp_get_max_threads() == 1;
  }

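  // maximum number of threads that can run concurrently on this machine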
  size_type max_concurrency(){
    return std::thread::hardware_concurrency();
  }

#else

  size_type global_thread_policy::this_thread() {return 0;}

  size_type global_thread_policy::num_threads(){return 1;}

  size_type true_thread_policy::this_thread() {return 0;}

  size_type true_thread_policy::num_threads(){return 1;}

  bool me_is_multithreaded_now(){return false;}

  void set_num_threads(int /*n*/){}

  bool not_multithreaded(){return true;}

  size_type max_concurrency(){return 1;}

#endif

  /** Allows re-throwing exceptions generated in an OpenMP parallel section.
      Collects one exception per thread; rethrow() re-throws the first
      captured exception so that it can be caught again in the master
      thread. */
  class thread_exception {
    std::vector<std::exception_ptr> exceptions;

    void captureException(){
      exceptions[true_thread_policy::this_thread()] = std::current_exception();
    }

  public:
    thread_exception()
      : exceptions(true_thread_policy::num_threads(), nullptr)
    {}

    template <typename function, typename... parameters>
    void run(function f, parameters... params){
      try {f(params...);} catch (...) {captureException();}
    }

    std::vector<std::exception_ptr> caughtExceptions() const{
      std::vector<std::exception_ptr> non_empty_exceptions;
      for (auto &&pException : exceptions){
        if (pException != nullptr) non_empty_exceptions.push_back(pException);
      }
      return non_empty_exceptions;
    }

    void rethrow() {
      for (auto &&pException : exceptions){
        if (pException != nullptr) std::rethrow_exception(pException);
      }
    }
  };
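  /* A minimal usage sketch (illustration only, not part of this file):
     each thread funnels its work through run(), and the master thread
     re-throws afterwards; do_work() is a hypothetical user function.

       thread_exception exc;
       #pragma omp parallel
       { exc.run([]{ do_work(); }); }
       exc.rethrow();  // first captured exception, if any, resurfaces here
  */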

  partition_iterator::partition_iterator(
    partition_master &m, std::set<size_type>::const_iterator it_from_set)
    : master{m}, it{it_from_set}
  {}

  partition_iterator partition_iterator::operator++(){
    ++it;
    if (*this != master.end()) master.set_current_partition(*it);
    return *this;
  }

  bool partition_iterator::operator==(const partition_iterator &it1) const {
    return it == it1.it;
  }

  bool partition_iterator::operator!=(const partition_iterator &it1) const {
    return !(*this == it1);
  }

  size_type partition_iterator::operator*() const{
    return *it;
  }

  partition_master partition_master::instance;

  partition_master& partition_master::get(){
    return instance;
  }

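  // re-read the OpenMP thread count and, if it changed, update the
  // partitioning (growing the number of partitions unless the user has
  // fixed it) and notify the singleton manager of the change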
  void partition_master::check_threads(){
    GLOBAL_OMP_GUARD
    auto must_update = false;
    if (nb_user_threads != true_thread_policy::num_threads()){
      nb_user_threads = true_thread_policy::num_threads();
      must_update = true;
    }
    if (nb_partitions < nb_user_threads && !partitions_set_by_user){
      nb_partitions = nb_user_threads;
      must_update = true;
    }
    if (must_update){
      update_partitions();
      dal::singletons_manager::on_partitions_change();
    }
  }

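  // for thread_behaviour::partition_threads, set the total number of
  // partitions; allowed only once, and the count is never reduced since
  // that might invalidate global storage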
  void partition_master::set_nb_partitions(size_type n){
    GMM_ASSERT1 (!partitions_set_by_user,
                 "Number of partitions can be set only once.");
    if (n > nb_partitions){
      nb_partitions = n;
      nb_user_threads = true_thread_policy::num_threads();
      update_partitions();
      dal::singletons_manager::on_partitions_change();
    }
    else if (n < nb_partitions){
      GMM_WARNING1("Not reducing number of partitions from "
                   << nb_partitions << " to " << n <<
                   " as it might invalidate global storage.");
    }
    partitions_set_by_user = true;
  }

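  // beginning of the partitions for the current thread; also makes the
  // first of them the active partition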
  partition_iterator partition_master::begin(){
    GMM_ASSERT1(nb_user_threads == true_thread_policy::num_threads(),
                "The number of omp threads was changed outside "
                "partition_master. Please use getfem::set_num_threads "
                "for this.");
    current_partition = *(std::begin(partitions.thrd_cast()));
    return partition_iterator{*this, std::begin(partitions.thrd_cast())};
  }

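  // end of the partitions for the current thread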
  partition_iterator partition_master::end(){
    return partition_iterator{*this, std::end(partitions.thrd_cast())};
  }

  void partition_master::set_behaviour(thread_behaviour b){
    if (b != behaviour){
      GMM_ASSERT1(!me_is_multithreaded_now(),
                  "Cannot change thread policy in parallel section.");
      behaviour = b;
      check_threads();
    }
  }

  partition_master::partition_master()
    : nb_user_threads{1}, nb_partitions{1} {
    partitions_updated = false;
    set_num_threads(1);
    update_partitions();
  }

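  // active partition on the calling thread (or the true thread number when
  // threads are not partitioned)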
  size_type partition_master::get_current_partition() const {
    GMM_ASSERT2(behaviour == thread_behaviour::partition_threads ?
                true_thread_policy::this_thread() < nb_partitions : true,
                "Requesting current partition for thread " <<
                true_thread_policy::this_thread() <<
                " while number of partitions is " << nb_partitions
                << ".");
    return behaviour == thread_behaviour::partition_threads ?
           current_partition : true_thread_policy::this_thread();
  }

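  // number of partitions or threads, depending on thread policy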
  size_type partition_master::get_nb_partitions() const {
    return behaviour == thread_behaviour::partition_threads ?
           nb_partitions : true_thread_policy::num_threads();
  }

  void partition_master::set_current_partition(size_type p){
    if (behaviour == thread_behaviour::partition_threads){
      GMM_ASSERT2(partitions.thrd_cast().count(p) != 0, "Internal error: "
                  << p << " is not a valid partition for thread "
                  << true_thread_policy::this_thread()
                  << ".");
      current_partition = p;
    }
  }

  void partition_master::rewind_partitions(){
    if (me_is_multithreaded_now()){
      current_partition = *(std::begin(partitions.thrd_cast()));
    }
    else{
      for (size_type t = 0; t != partitions.num_threads(); ++t){
        current_partition(t) = *(std::begin(partitions(t)));
      }
    }
  }

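  // rebuild the per-thread partition sets: spread the partitions evenly
  // over the OpenMP threads, or give each thread a single partition equal
  // to its own thread number when threads are not partitioned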
  void partition_master::update_partitions(){
    partitions_updated = false;

    GLOBAL_OMP_GUARD

    if (partitions_updated) return;

    partitions = decltype(partitions){};
    current_partition = decltype(current_partition){};

    auto n_threads = true_thread_policy::num_threads();
    if(n_threads > nb_partitions){
      GMM_WARNING0("Using " << n_threads <<
                   " threads which is above the maximum number of partitions: " <<
                   nb_partitions << ".");
    }
    if (behaviour == thread_behaviour::partition_threads){
      for (size_type t = 0; t != n_threads; ++t){
        auto partition_size = static_cast<size_type>
          (std::ceil(static_cast<scalar_type>(nb_partitions) /
                     static_cast<scalar_type>(n_threads)));
        auto partition_begin = partition_size * t;
        if (partition_begin >= nb_partitions) break;
        auto partition_end = std::min(partition_size * (t + 1), nb_partitions);
        auto hint_it = std::begin(partitions(t));
        for (size_type i = partition_begin; i != partition_end; ++i){
          hint_it = partitions(t).insert(hint_it, i);
        }
        current_partition(t) = partition_begin;
      }
    }
    else{
      for (size_type t = 0; t != n_threads; ++t){
        partitions(t).insert(t);
        current_partition(t) = t;
      }
    }

    partitions_updated = true;
  }

  #if defined _WIN32 && !defined (__GNUC__)
    #define GETFEM_ON_WIN
  #endif

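  // bundles locale handling and exception capture for a parallel region;
  // exceptions caught in worker threads are re-thrown when the boilerplate
  // object is destroyed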
  parallel_boilerplate::
  parallel_boilerplate()
    : plocale{std::make_unique<standard_locale>()},
      pexception{std::make_unique<thread_exception>()} {
    #ifdef GETFEM_ON_WIN
      _configthreadlocale(_ENABLE_PER_THREAD_LOCALE);
    #endif
  }

  void parallel_boilerplate::run_lambda(std::function<void(void)> lambda){
    pexception->run(lambda);
  }

  parallel_boilerplate::~parallel_boilerplate(){
    #ifdef GETFEM_ON_WIN
      _configthreadlocale(_DISABLE_PER_THREAD_LOCALE);
    #endif
    pexception->rethrow();
  }

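  // run a lambda in parallel, either once per partition of each thread or
  // once per thread; when already inside a parallel section the lambda is
  // simply executed in place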
  void parallel_execution(std::function<void(void)> lambda,
                          bool iterate_over_partitions){
    if (me_is_multithreaded_now()) {
      lambda();
      return;
    }
    parallel_boilerplate boilerplate;
    auto &pm = partition_master::get();
    if (pm.get_nb_partitions() < true_thread_policy::num_threads()){
      pm.set_nb_partitions(true_thread_policy::num_threads());
    }
    #pragma omp parallel default(shared)
    {
      if (iterate_over_partitions) {
        for (auto &&partitions : partition_master::get()) {
          (void)partitions;
          boilerplate.run_lambda(lambda);
        }
      }
      else {
        boilerplate.run_lambda(lambda);
      }
    }
    if (iterate_over_partitions) partition_master::get().rewind_partitions();
  }

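  // When GETFEM_FORCE_SINGLE_THREAD_BLAS is defined, the OpenBLAS threading
  // controls are looked up at runtime via dlsym and OpenBLAS is forced to a
  // single thread; the previous thread count is saved so it can be restored.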
#ifdef GETFEM_FORCE_SINGLE_THREAD_BLAS
# include <dlfcn.h>
# define BLAS_FORCE_SINGLE_THREAD                                        \
  int openblas_get_num_threads_res = 1;                                  \
  {                                                                      \
    typedef int (* ptrfunc1)();                                          \
    ptrfunc1 func1 = ptrfunc1(dlsym(NULL, "openblas_get_num_threads"));  \
    if (func1) openblas_get_num_threads_res = (*func1)();                \
    typedef void (* ptrfunc2)(int);                                      \
    ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads"));  \
    if (func2) (*func2)(1);                                              \
  }
# define BLAS_RESTORE_NUM_THREAD                                         \
  {                                                                      \
    typedef void (* ptrfunc2)(int);                                      \
    ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads"));  \
    if (func2) (*func2)(openblas_get_num_threads_res);                   \
  }
#else
# define BLAS_FORCE_SINGLE_THREAD
# define BLAS_RESTORE_NUM_THREAD
#endif


  struct dummy_class_for_blas_nbthread_init {
    dummy_class_for_blas_nbthread_init(void)
    { BLAS_FORCE_SINGLE_THREAD; }
  };

  static dummy_class_for_blas_nbthread_init dcfbnti;


} /* end of namespace getfem. */