Module ThreadProxy

Source Code for Module ThreadProxy

'''
Defines a threaded proxy for L{Devices.Audio} devices.

@author: Brett Clippingdale
@author: Peter Parente
@organization: IBM Corporation
@copyright: Copyright (c) 2005, 2007 IBM Corporation
@license: The BSD License

@author: Ramona Bunk
@organization: IT Science Center Ruegen gGmbH, Germany
@copyright: Copyright (c) 2007, 2008 ITSC Ruegen
@license: The BSD License

All rights reserved. This program and the accompanying materials are made
available under the terms of the BSD license which accompanies
this distribution, and is available at
U{http://www.opensource.org/licenses/bsd-license.php}
'''

import threading, Queue
from AccessEngine import AEOutput
from AccessEngine import AEConstants

class AudioThreadProxy(threading.Thread, AEOutput.AEOutput):
  '''
  Buffers calls to methods on a L{Devices.Audio} device in a secondary thread
  and executes them some time later when that thread runs.

  @ivar device: Device on which to invoke send* methods
  @type device: L{Devices.Audio}
  @ivar want_stop: Flag indicating a stop is requested
  @type want_stop: boolean
  @ivar just_stopped: Flag indicating that the last written command was a stop.
    Used to avoid unnecessary stops
  @type just_stopped: boolean
  @ivar lock: Semaphore used to ensure no commands are buffered while the
    buffer is being reset after sending a stop
  @type lock: threading.Semaphore
  @ivar init_event: Event used to block non-threaded calls to the device until
    the device has been initialized in the context of the running thread.
  @type init_event: threading.Event
  @ivar data_buffer: Buffer of commands to be sent to the output device
  @type data_buffer: Queue.Queue
  @ivar alive: Is the thread running or not?
  @type alive: boolean
  '''
  def __init__(self, device):
    '''
    Initializes the parent class and stores the device reference. Creates a
    queue that will buffer commands to the speech device. Creates flags used
    for indicating whether a stop is requested or has been requested recently.
    Creates a semaphore used to ensure that no commands can be added to the
    buffer while it is being reset by a stop command.

    @param device: The device reference to use for writing commands
    @type device: L{Devices.Audio}
    '''
    threading.Thread.__init__(self)
    AEOutput.AEOutput.__init__(self)
    self.device = device
    self.want_stop = False
    self.just_stopped = False
    self.lock = threading.Semaphore()
    self.init_event = threading.Event()
    self.data_buffer = Queue.Queue()
    self.alive = False

  def init(self):
    '''
    Called after the instance is created to start the device running. The
    device's init method is called in the context of the running thread's
    L{run} method before the thread enters its loop.
    '''
    self.alive = True
    self.start()

  def getCapabilities(self):
    '''
    Gets the capabilities of the proxied device. This method is called in the
    context of the caller, not the running thread.

    @return: List of capability names
    @rtype: list of string
    '''
    return self.device.getCapabilities()

  def loadStyles(self):
    '''
    Called after the L{init} method by the L{AccessEngine.AEDeviceManager} to
    ensure that the device is functioning before time is spent unserializing its
    style data.

    Calls the init method on the default style object and provides it with a
    reference to this initialized device. Then tries to load the persisted
    setting values from disk. If that fails, the L{AccessEngine.AEDeviceManager}
    will try to call L{createDistinctStyles} instead.

    This method is called in the context of the caller, not the running thread.
    It blocks until the L{init_event} is set indicating the device has been
    initialized in the second thread.

    @raise KeyError: When styles have not previously been persisted for this
      device
    @raise OSError: When the profile file cannot be opened or read
    '''
    self.init_event.wait()
    self.device.loadStyles()

  def saveStyles(self):
    '''
    Persists styles to disk. Called after the L{close} method by the
    L{AccessEngine.AEDeviceManager} to ensure the device is properly shut down
    before serializing its data. This method is called in the context of the
    caller, not the running thread.

    @raise KeyError: When styles have not previously been persisted for this
      device
    @raise OSError: When the profile file cannot be opened or read
    '''
    self.device.saveStyles()

  def setStyle(self, key, style):
    '''
    Stores the style object under the given key. The style object should be one
    previously generated by this device (e.g. using L{createDistinctStyles})
    but it is not an enforced requirement. Always makes the style clean before
    storing it. This method is called in the context of the caller, not the
    running thread.

    @param key: Any immutable object
    @type key: immutable
    @param style: L{AEOutput} subclass of L{AEState}
    @type style: L{AEState}
    '''
    self.device.setStyle(key, style)

  def getStyle(self, key):
    '''
    Gets the style object stored under the given key. If the key is unknown,
    returns an empty flyweight backed by the default style and stores the new
    style. This method is called in the context of the caller, not
    the running thread.

    @param key: Any immutable object
    @type key: immutable
    @return: L{AEOutput} subclass of L{AEState}
    @rtype: L{AEState}
    '''
    return self.device.getStyle(key)

  def createDistinctStyles(self, num_groups, num_layers):
    '''
    Creates up to the given number of styles for this device. This method is
    called in the context of the caller, not the running thread. It blocks
    until the L{init_event} is set indicating the device has been initialized
    in the second thread.

    @param num_groups: Number of semantic groups the requestor would like to
      represent using distinct styles
    @type num_groups: integer
    @param num_layers: Number of content origins (e.g. output originating from
      a background task versus the focus) the requestor would like to represent
      using distinct styles
    @type num_layers: integer
    @return: New styles
    @rtype: list of L{AEOutput.Style}
    '''
    self.init_event.wait()
    return self.device.createDistinctStyles(num_groups, num_layers)

  def getDefaultStyle(self):
    '''
    Gets the default style of the proxied device. This method is
    called in the context of the caller, not the running thread. It blocks
    until the L{init_event} is set indicating the device has been initialized
    in the second thread.

    @return: Default style
    @rtype: L{AEOutput.Style}
    '''
    self.init_event.wait()
    return self.device.getDefaultStyle()

  def close(self):
    '''
    Stops the running thread. Puts a null callable in the queue to wake the
    thread if it is sleeping.
    '''
    self.alive = False
    self._put(lambda : None)

  def getProxy(self):
    '''
    Returns this object as the proxy for itself because a thread proxying for
    another thread proxying for a device is not supported.

    @return: self
    @rtype: L{AudioThreadProxy}
    '''
    return self

  def getName(self):
    '''
    Gives the user displayable (localized) name for this output device.
    Relevant version and device status should be included. This method is
    called in the context of the caller, not the running thread.

    @return: The localized name for the device
    @rtype: string
    '''
    return self.device.getName()

  def send(self, name, value, style=None):
    '''
    Buffers methods to call for known commands in the context of the thread.

    @param name: Descriptor of the data value sent
    @type name: object
    @param value: Content value
    @type value: object
    @param style: Style with which this value should be output
    @type style: L{AEOutput.Style}
    @return: Return value specific to the given command
    @rtype: object
    @raise NotImplementedError: When not overridden in a subclass
    '''
    if name == AEConstants.CMD_STOP:
      self._put(self.device.sendStop, style)
    elif name == AEConstants.CMD_TALK:
      self._put(self.device.sendTalk, style)
    elif name == AEConstants.CMD_STRING:
      self._put(self.device.sendString, value, style)
    elif name == AEConstants.CMD_STRING_SYNC:
      self.device.sendStringSync(value, style)
    elif name == AEConstants.CMD_FILENAME:
      try:
        # may not be implemented
        self._put(self.device.sendFilename, value, style)
      except NotImplementedError:
        pass
    elif name == AEConstants.CMD_INDEX:
      try:
        # may not be implemented
        return self._put(self.device.sendIndex, style)
      except NotImplementedError:
        pass

  def isActive(self):
    '''
    Indicates whether the device is active (giving output) or not.

    @return: True when content is buffered or the device is outputting
    @rtype: boolean
    @raise NotImplementedError: When not overridden in a subclass
    '''
    return (self.data_buffer.qsize() != 0 or self.device.isActive())

  def parseString(self, text, style, por, sem):
    '''
    Parses the string using the implementation provided by the proxied device.
    This method is called in the context of the caller, not the running thread.

    @param text: Text to be parsed
    @type text: string
    @param style: Style object defining how the text should be parsed
    @type style: L{AEOutput.Style}
    @param por: Point of regard for the first character in the text, or None if
      the text is not associated with a POR
    @type por: L{AEPor}
    @param sem: Semantic tag for the text
    @type sem: integer
    @return: Parsed words
    @rtype: 3-tuple of lists of string, L{AEPor}, L{AEOutput.Style}
    '''
    return self.device.parseString(text, style, por, sem)

  def _put(self, mtd, *args):
    '''
    Buffers any non-stop command in the queue and returns immediately. Sets the
    L{want_stop} flag for a stop command if neither L{want_stop} nor
    L{just_stopped} is set. Blocks on entry if the thread is busy clearing out
    the buffer in response to a previous stop command. Leaves the lock set if
    a new stop command is buffered. It will be unset when the command is
    processed.

    @param mtd: Device method to call at a later time
    @type mtd: callable
    @param args: Additional positional arguments to be passed to the device
      when the method is invoked
    @type args: list
    '''
    self.lock.acquire()
    # compare with == rather than "is": for an ordinary method, every attribute
    # access builds a new bound-method object, so an identity test never matches
    if mtd == self.device.sendStop:
      if not self.want_stop and not self.just_stopped:
        # only queue the stop and set the flag if it's not already set
        self.want_stop = True
        # add the stop command to the buffer in case the thread is sleeping
        self.data_buffer.put_nowait((mtd, args))
        # IMPORTANT: do not release the semaphore, the thread will do it when
        # it processes the stop command; it must be held so that further
        # invocations of this method block until the buffer has been emptied
        return
    else:
      self.data_buffer.put_nowait((mtd, args))
      # as soon as something is buffered, stop is no longer last.
      self.just_stopped = False
    self.lock.release()

  def run(self):
    '''
    Runs the thread until alive is no longer True. Sleeps until methods and
    arguments to be applied are put in the L{data_buffer}. Wakes up and
    invokes all buffered methods and arguments. Initializes the device before
    entering the loop and closes it after leaving the loop.
    '''
    self.device.init()
    self.init_event.set()
    while self.alive:
      # Queue.get() blocks for data
      mtd, args = self.data_buffer.get()

      # if want to stop, it doesn't matter what the buffer element was
      if self.want_stop:
        # reset the buffer immediately
        self.data_buffer = Queue.Queue()
        # send the device specific stop command
        self.device.sendStop()
        # reset the stop flag
        self.want_stop = False
        # indicate we've just stopped
        self.just_stopped = True
        # IMPORTANT: release the semaphore so the put method can continue
        self.lock.release()
      else:
        # handle the command
        try:
          mtd(*args)
        except NotImplementedError:
          pass
    self.device.close()

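The lifecycle implemented by init, run and close in the listing (initialize the device on the worker thread, drain a queue of buffered calls, then close the device) can be tried in isolation. The following is a minimal Python 3 sketch, not the module's own code: it uses the `queue` module where the listing uses Python 2's `Queue`, it leaves out the stop and semaphore handling, and `FakeDevice`, `MiniProxy`, `put` and `spoke` are hypothetical names introduced only for illustration.

```python
import queue
import threading


class FakeDevice:
    """Hypothetical stand-in for an audio device; it only records calls."""

    def __init__(self):
        self.log = []
        self.spoke = threading.Event()  # set once a string has been "spoken"

    def init(self):
        self.log.append('init')

    def sendString(self, text, style):
        self.log.append(('string', text))
        self.spoke.set()

    def close(self):
        self.log.append('close')


class MiniProxy(threading.Thread):
    """Simplified worker: buffers (method, args) pairs and runs them later."""

    def __init__(self, device):
        super().__init__()
        self.device = device
        self.data_buffer = queue.Queue()
        self.init_event = threading.Event()
        self.alive = False

    def init(self):
        self.alive = True
        self.start()

    def put(self, mtd, *args):
        # returns immediately; the call happens on the worker thread
        self.data_buffer.put_nowait((mtd, args))

    def close(self):
        self.alive = False
        # wake the worker with a no-op in case it is blocked in get()
        self.put(lambda: None)

    def run(self):
        self.device.init()      # device is initialized on the worker thread
        self.init_event.set()   # unblock callers waiting for initialization
        while self.alive:
            mtd, args = self.data_buffer.get()  # blocks until data arrives
            mtd(*args)
        self.device.close()


device = FakeDevice()
proxy = MiniProxy(device)
proxy.init()
proxy.init_event.wait()                 # as loadStyles does in the listing
proxy.put(device.sendString, 'hello', None)
device.spoke.wait(timeout=5)            # let the buffered call run first
proxy.close()
proxy.join(timeout=5)
print(device.log)
```

The sketch waits for the buffered call to run before calling close because, as in the listing, commands still queued when alive becomes False are not guaranteed to be executed.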