1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your // option. This file may not be copied, modified, or distributed // except according to those terms. use sync::{Mutex, Condvar}; /// A barrier enables multiple threads to synchronize the beginning /// of some computation. /// /// ``` /// use std::sync::{Arc, Barrier}; /// use std::thread; /// /// let barrier = Arc::new(Barrier::new(10)); /// for _ in 0..10 { /// let c = barrier.clone(); /// // The same messages will be printed together. /// // You will NOT see any interleaving. /// thread::spawn(move|| { /// println!("before wait"); /// c.wait(); /// println!("after wait"); /// }); /// } /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct Barrier { lock: Mutex<BarrierState>, cvar: Condvar, num_threads: usize, } // The inner state of a double barrier struct BarrierState { count: usize, generation_id: usize, } /// A result returned from wait. /// /// Currently this opaque structure only has one method, `.is_leader()`. Only /// one thread will receive a result that will return `true` from this function. pub struct BarrierWaitResult(bool); impl Barrier { /// Creates a new barrier that can block a given number of threads. /// /// A barrier will block `n`-1 threads which call `wait` and then wake up /// all threads at once when the `n`th thread calls `wait`. #[stable(feature = "rust1", since = "1.0.0")] pub fn new(n: usize) -> Barrier { Barrier { lock: Mutex::new(BarrierState { count: 0, generation_id: 0, }), cvar: Condvar::new(), num_threads: n, } } /// Blocks the current thread until all threads has rendezvoused here. /// /// Barriers are re-usable after all threads have rendezvoused once, and can /// be used continuously. /// /// A single (arbitrary) thread will receive a `BarrierWaitResult` that /// returns `true` from `is_leader` when returning from this function, and /// all other threads will receive a result that will return `false` from /// `is_leader` #[stable(feature = "rust1", since = "1.0.0")] pub fn wait(&self) -> BarrierWaitResult { let mut lock = self.lock.lock().unwrap(); let local_gen = lock.generation_id; lock.count += 1; if lock.count < self.num_threads { // We need a while loop to guard against spurious wakeups. // http://en.wikipedia.org/wiki/Spurious_wakeup while local_gen == lock.generation_id && lock.count < self.num_threads { lock = self.cvar.wait(lock).unwrap(); } BarrierWaitResult(false) } else { lock.count = 0; lock.generation_id += 1; self.cvar.notify_all(); BarrierWaitResult(true) } } } impl BarrierWaitResult { /// Returns whether this thread from `wait` is the "leader thread". /// /// Only one thread will have `true` returned from their result, all other /// threads will have `false` returned. #[stable(feature = "rust1", since = "1.0.0")] pub fn is_leader(&self) -> bool { self.0 } } #[cfg(test)] mod tests { use prelude::v1::*; use sync::{Arc, Barrier}; use sync::mpsc::{channel, TryRecvError}; use thread; #[test] fn test_barrier() { const N: usize = 10; let barrier = Arc::new(Barrier::new(N)); let (tx, rx) = channel(); for _ in 0..N - 1 { let c = barrier.clone(); let tx = tx.clone(); thread::spawn(move|| { tx.send(c.wait().is_leader()).unwrap(); }); } // At this point, all spawned threads should be blocked, // so we shouldn't get anything from the port assert!(match rx.try_recv() { Err(TryRecvError::Empty) => true, _ => false, }); let mut leader_found = barrier.wait().is_leader(); // Now, the barrier is cleared and we should get data. for _ in 0..N - 1 { if rx.recv().unwrap() { assert!(!leader_found); leader_found = true; } } assert!(leader_found); } }