libassa 3.5.1
Loading...
Searching...
No Matches
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
ASSA::Reactor Class Reference

#include <Reactor.h>

Public Member Functions

 Reactor ()
 Constructor.
 
 ~Reactor ()
 Destructor.
 
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
 
bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
 
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
 
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
 
bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.
 
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
 
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
 
void stopReactor (void)
 Stop Reactor's activity.
 
void deactivate (void)
 Deactivate Reactor.
 

Private Types

typedef std::map< u_int, EventHandler * > Fd2Eh_Map_Type
 no cloning
 
typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
 

Private Member Functions

 Reactor (const Reactor &)
 
Reactoroperator= (const Reactor &)
 no cloning
 
void adjust_maxfdp1 (handler_t fd_)
 Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
 
bool handleError (void)
 Handle error in select(2) loop appropriately.
 
bool dispatch (int minimum_)
 Notify all EventHandlers registered on respecful events occured.
 
int isAnyReady (void)
 Return number of file descriptors ready accross all sets.
 
bool checkFDs (void)
 Check mask for bad file descriptors.
 
void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
 
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.
 

Private Attributes

int m_fd_setsize
 Max number of open files per process.
 
handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.
 
bool m_active
 Flag that indicates whether Reactor is active or had been stopped.
 
Fd2Eh_Map_Type m_readSet
 Event handlers awaiting on READ_EVENT.
 
Fd2Eh_Map_Type m_writeSet
 Event handlers awaiting on WRITE_EVENT.
 
Fd2Eh_Map_Type m_exceptSet
 Event handlers awaiting on EXCEPT_EVENT.
 
MaskSet m_waitSet
 Handlers to wait for event on.
 
MaskSet m_readySet
 Handlers that are ready for processing.
 
TimerQueue m_tqueue
 The queue of Timers.
 

Detailed Description

Definition at line 57 of file Reactor.h.

Member Typedef Documentation

◆ Fd2Eh_Map_Iter

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter
private

Definition at line 155 of file Reactor.h.

◆ Fd2Eh_Map_Type

no cloning

Definition at line 154 of file Reactor.h.

Constructor & Destructor Documentation

◆ Reactor() [1/2]

Reactor::Reactor ( )

Constructor.

Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h.

Initialize winsock2 library

Definition at line 23 of file Reactor.cpp.

24 :
25 m_fd_setsize (1024),
26 m_maxfd_plus1 (0),
27 m_active (true)
28{
29 trace_with_mask("Reactor::Reactor",REACTTRACE);
30
34#if defined(WIN32)
36
37#else // POSIX
38 struct rlimit rlim;
39 rlim.rlim_max = 0;
40
41 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
42 m_fd_setsize = rlim.rlim_cur;
43 }
44#endif
45
48#if defined (WIN32)
50 WSAStartup (MAKEWORD (2, 2), &data);
51#endif
52}
#define trace_with_mask(s, m)
trace_with_mask() is used to trace function call chain in C++ program.
Definition Logger.h:437
A wrapper class to provide AutoPtr with reference semantics.
Definition AutoPtr.h:32
handler_t m_maxfd_plus1
Max file descriptor number (in all sets) plus 1.
Definition Reactor.h:206
int m_fd_setsize
Max number of open files per process.
Definition Reactor.h:200
bool m_active
Flag that indicates whether Reactor is active or had been stopped.
Definition Reactor.h:209
@ REACTTRACE
Extended Reactor/PrioriyQueue messages
Definition LogMask.h:40

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.

◆ ~Reactor()

Reactor::~Reactor ( )

Destructor.

Definition at line 54 of file Reactor.cpp.

56{
57 trace_with_mask("Reactor::~Reactor",REACTTRACE);
58
59 m_readSet.clear ();
60 m_writeSet.clear ();
61 m_exceptSet.clear ();
62 deactivate ();
63}
Fd2Eh_Map_Type m_writeSet
Event handlers awaiting on WRITE_EVENT.
Definition Reactor.h:215
void deactivate(void)
Deactivate Reactor.
Definition Reactor.h:234
Fd2Eh_Map_Type m_readSet
Event handlers awaiting on READ_EVENT.
Definition Reactor.h:212
Fd2Eh_Map_Type m_exceptSet
Event handlers awaiting on EXCEPT_EVENT.
Definition Reactor.h:218

References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

◆ Reactor() [2/2]

ASSA::Reactor::Reactor ( const Reactor )
private

Member Function Documentation

◆ adjust_maxfdp1()

void Reactor::adjust_maxfdp1 ( handler_t  fd_)
private

Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).

If the socket descriptor that has just been eliminated was the maxfd+1, we readjust to the next highest.

Win32 implementation of select() ignores this value altogether.

Definition at line 700 of file Reactor.cpp.

702{
703#if !defined (WIN32) /* POSIX */
704
705 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
706
707 if (m_maxfd_plus1 == fd_ + 1)
708 {
710 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
711 }
712#endif
713}
#define DL(X)
A macro for writing debug message to the Logger.
Definition Logger.h:273
int max_fd()
Return maximum value of the file descriptor in the Set.
Definition MaskSet.h:71
MaskSet m_waitSet
Handlers to wait for event on.
Definition Reactor.h:221
@ REACT
Class Reactor/PrioriyQueue messages
Definition LogMask.h:39

References DL, m_maxfd_plus1, m_waitSet, ASSA::MaskSet::max_fd(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by removeHandler(), and removeIOHandler().

◆ calculateTimeout()

void Reactor::calculateTimeout ( TimeVal *&  howlong_,
TimeVal maxwait_ 
)
private

Calculate closest timeout.

If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.

Parameters
maxwait_(in) how long we are expected to wait for event(s).
howlong_(out) how long we are going to wait.

Definition at line 420 of file Reactor.cpp.

422{
423 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
424
425 TimeVal now;
426 TimeVal tv;
427
428 if (m_tqueue.isEmpty () ) {
430 goto done;
431 }
433 tv = m_tqueue.top ();
434
435 if (tv < now) {
436 /*---
437 It took too long to get here (fraction of a millisecond),
438 and top timer had already expired. In this case,
439 perform non-blocking select in order to drain the timer queue.
440 ---*/
441 *howlong_ = 0;
442 }
443 else {
444 DL((REACT,"--------- Timer Queue ----------\n"));
445 m_tqueue.dump();
446 DL((REACT,"--------------------------------\n"));
447
448 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
449 *howlong_ = tv - now;
450 }
451 else {
452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
453 }
454 }
455
456 done:
457 if (howlong_ != NULL) {
458 DL((REACT,"delay (%f)\n", double (*howlong_) ));
459 }
460 else {
461 DL((REACT,"delay (forever)\n"));
462 }
463}
TimerQueue m_tqueue
The queue of Timers.
Definition Reactor.h:227
static TimeVal zeroTime()
Static that returns zero timeval: {0,0}.
Definition TimeVal.h:157
static TimeVal gettimeofday()
Shields off underlying OS differences in getting current time.
Definition TimeVal.cpp:44
void dump(void)
Dump Queue information to the log file.
bool isEmpty()
Is queue empty?
Definition TimerQueue.h:110
TimeVal & top(void)
Return expiration time of the top element in the queue.
Definition TimerQueue.h:117

References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().

Referenced by waitForEvents().

◆ checkFDs()

bool Reactor::checkFDs ( void  )
private

Check mask for bad file descriptors.

Returns
true if any fd(s) were found and removed; false otherwise

Definition at line 316 of file Reactor.cpp.

318{
319 trace_with_mask("Reactor::checkFDs",REACTTRACE);
320
321 bool num_removed = false;
322 FdSet mask;
323 timeval poll = { 0, 0 };
324
325 for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
326 if ( m_readSet[fd] != NULL ) {
327 mask.setFd (fd);
328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
329 removeIOHandler (fd);
330 num_removed = true;
331 DL((REACT,"Detected BAD FD: %d\n", fd ));
332 }
333 mask.clear (fd);
334 }
335 }
336 return (num_removed);
337}
Class FdSet.
Definition FdSet.h:52
bool setFd(handler_t fd_)
Set flag (ON) for the argument fd.
Definition FdSet.cpp:20
bool clear(handler_t fd_)
Clear flag (OFF) for the argument fd.
Definition FdSet.cpp:39
bool removeIOHandler(handler_t fd_)
Remove IO Event handler from reactor.
Definition Reactor.cpp:247

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

◆ deactivate()

void ASSA::Reactor::deactivate ( void  )
inline

Deactivate Reactor.

This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a slow system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.

Definition at line 234 of file Reactor.h.

234{ m_active = false; }

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().

◆ dispatch()

bool Reactor::dispatch ( int  minimum_)
private

Notify all EventHandlers registered on respecful events occured.

Many UNIX systems will count a particular file descriptor in the ready_ only ONCE, even if it was flagged by select(2) in, say, both read and write masks.

Parameters
minimum_number of file descriptors ready.

Definition at line 625 of file Reactor.cpp.

627{
628 trace_with_mask("Reactor::dispatch", REACTTRACE);
629
631
632 if ( ready_ < 0 )
633 {
634#if !defined (WIN32)
635 EL((ASSAERR,"::select(3) error\n"));
636#endif
637 return (false);
638 }
639 if ( ready_ == 0 ) {
640 return (true);
641 }
642
643 DL((REACT,"Dispatching %d FDs.\n",ready_));
644 DL((REACT,"m_readySet:\n"));
645 m_readySet.dump ();
646
647 /*--- Writes first ---*/
651
652 /*--- Exceptions next ---*/
656
657 /*--- Finally, the Reads ---*/
659 m_readSet,
661
662 return (true);
663}
#define EL(X)
A macro for writing error message to the Logger.
Definition Logger.h:285
virtual int handle_write(int fd)
Write handler callback.
virtual int handle_except(int fd)
Exception handler callback.
virtual int handle_read(int fd)
Read event callback.
FdSet m_rset
Read fds set.
Definition MaskSet.h:28
FdSet m_eset
Exception fds set.
Definition MaskSet.h:34
void dump()
Write current state of MaskSet object to log file.
Definition MaskSet.h:80
FdSet m_wset
Write fds set.
Definition MaskSet.h:31
MaskSet m_readySet
Handlers that are ready for processing.
Definition Reactor.h:224
void dispatchHandler(FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
Definition Reactor.cpp:568
int expire(const TimeVal &tv_)
Traverse the queue, triggering all timers that are past argument timeval.
@ ASSAERR
ASSA and system errors
Definition LogMask.h:34

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

◆ dispatchHandler()

void Reactor::dispatchHandler ( FdSet mask_,
Fd2Eh_Map_Type fdSet_,
EH_IO_Callback  callback_ 
)
private

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

This spot needs re-thinking.

When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.

WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode).

Definition at line 567 of file Reactor.cpp.

569{
570 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
571
572 int ret = 0;
573 handler_t fd;
575 std::string eh_id;
576
577 Fd2Eh_Map_Iter iter = fdSet_.begin ();
578
579 while (iter != fdSet_.end ())
580 {
581 fd = (*iter).first;
582 ehp = (*iter).second;
583
584 if (mask_.isSet (fd) && ehp != NULL)
585 {
586 eh_id = ehp->get_id ();
587 DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
588 eh_id.c_str (), fd));
589
590 ret = (ehp->*callback_) (fd); /* Fire up a callback */
591
592 if (ret == -1) {
593 removeIOHandler (fd);
594 }
595 else if (ret > 0) {
596 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
597 ret, fd, eh_id.c_str ()));
598 //return; <-- would starve other connections
599 }
600 else {
601 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
602 eh_id.c_str (), fd));
603 mask_.clear (fd);
604 }
611 iter = fdSet_.begin ();
612 }
613 else {
614 iter++;
615 }
616 }
617}
EventHandler class.
Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
Definition Reactor.h:155

References DL, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

◆ handleError()

bool Reactor::handleError ( void  )
private

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 340 of file Reactor.cpp.

342{
343 trace_with_mask("Reactor::handleError",REACTTRACE);
344
347 if ( !m_active ) {
348 DL((REACT,"Received cmd to stop Reactor\n"));
349 return (false);
350 }
351
352 /*---
353 TODO: If select(2) returns before time expires, with
354 a descriptor ready or with EINTR, timeval is not
355 going to be updated with number of seconds remaining.
356 This is true for all systems except Linux, which will
357 do so. Therefore, to restart correctly in case of
358 EINTR, we ought to take time measurement before and
359 after select, and try to select() for remaining time.
360
361 For now, we restart with the initial timing value.
362 ---*/
363 /*---
364 BSD kernel never restarts select(2). SVR4 will restart if
365 the SA_RESTART flag is specified when the signal handler
366 for the signal delivered is installed. This means taht for
367 portability, we must handle signal interrupts.
368 ---*/
369
370 if ( errno == EINTR ) {
371 EL((REACT,"EINTR: interrupted select(2)\n"));
372 /*
373 If I was sitting in select(2) and received SIGTERM,
374 the signal handler would have set m_active to 'false',
375 and this function would have returned 'false' as above.
376 For any other non-critical signals (USR1,...),
377 we retry select.
378 */
379 return (true);
380 }
381 /*
382 EBADF - bad file number. One of the file descriptors does
383 not reference an open file to open(), close(), ioctl().
384 This can happen if user closed fd and forgot to remove
385 handler from Reactor.
386 */
387 if ( errno == EBADF ) {
388 DL((REACT,"EBADF: bad file descriptor\n"));
389 return (checkFDs ());
390 }
391 /*
392 Any other error from select
393 */
394#if defined (WIN32)
395 DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
396#else
397 EL((ASSAERR,"select(3) error\n"));
398#endif
399 return (false);
400}
bool checkFDs(void)
Check mask for bad file descriptors.
Definition Reactor.cpp:317

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

◆ isAnyReady()

int Reactor::isAnyReady ( void  )
private

Return number of file descriptors ready accross all sets.

Definition at line 403 of file Reactor.cpp.

405{
406 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
407
408 int n = m_readySet.m_rset.numSet () +
411
412 if ( n > 0 ) {
413 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
414 m_readySet.dump ();
415 }
416 return (n);
417}
int numSet()
Determine how many bits are set (ON) in the set.
Definition FdSet.h:126

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

◆ operator=()

Reactor & ASSA::Reactor::operator= ( const Reactor )
private

no cloning

◆ registerIOHandler()

bool Reactor::registerIOHandler ( EventHandler eh_,
handler_t  fd_,
EventType  et_ = RWE_EVENTS 
)

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters
eh_Pointer to the EventHandler
fd_File descriptor
et_Event Type
Returns
true if success, false if error

Definition at line 92 of file Reactor.cpp.

94{
95 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
96
97 std::ostringstream msg;
99
100 if (isReadEvent (et_))
101 {
102 if (!m_waitSet.m_rset.setFd (fd_))
103 {
104 DL((ASSAERR,"readset: fd %d out of range\n", fd_));
105 return (false);
106 }
107 m_readSet[fd_] = eh_;
108 msg << "READ_EVENT";
109 }
110
111 if (isWriteEvent (et_))
112 {
113 if (!m_waitSet.m_wset.setFd (fd_))
114 {
115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
116 return (false);
117 }
118 m_writeSet[fd_] = eh_;
119 msg << " WRITE_EVENT";
120 }
121
122 if (isExceptEvent (et_))
123 {
124 if (!m_waitSet.m_eset.setFd (fd_))
125 {
126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
127 return (false);
128 }
130 msg << " EXCEPT_EVENT";
131 }
132 msg << std::ends;
133
134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
136
137#if !defined (WIN32)
138 if (m_maxfd_plus1 < fd_+1) {
139 m_maxfd_plus1 = fd_+1;
140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
141 }
142#endif
143
144 DL((REACT,"Modified waitSet:\n"));
145 m_waitSet.dump ();
146
147 return (true);
148}
#define Assure_return(exp_)
Test condition and return bool from a function if assertion fails.
Definition Assure.h:64
bool isReadEvent(EventType e_)
bool isExceptEvent(EventType e_)
bool isSignalEvent(EventType e_)
bool isTimeoutEvent(EventType e_)
bool isWriteEvent(EventType e_)

References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open().

◆ registerTimerHandler()

TimerId Reactor::registerTimerHandler ( EventHandler eh_,
const TimeVal tv_,
const std::string &  name_ = "<unknown>" 
)

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters
eh_Pointer to the EventHandler
tv_Timeout value
name_Name of the timer
Returns
Timer ID that can be used to cancel timer and find out its name.

Definition at line 66 of file Reactor.cpp.

70{
71 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
73
76
77 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
78 timeout_.sec(),timeout_.msec()));
79 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
80 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
81
83
84 DL((REACT,"---Modified Timer Queue----\n"));
85 m_tqueue.dump();
86 DL((REACT,"---------------------------\n"));
87
88 return (tid);
89}
TimerId insert(EventHandler *eh_, const TimeVal &tv_, const TimeVal &delta_, const std::string &name_)
Add timer (EventHandler object) to the queue to be dispatch at the time specified.
unsigned long TimerId
Timer Id is used in handle_timeout() calls.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

◆ removeHandler()

bool Reactor::removeHandler ( EventHandler eh_,
EventType  et_ = ALL_EVENTS 
)

Remove Event handler from reactor for either all I/O events or timeout event or both.

Remove handler from all events that matches event_.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters
eh_Pointer to the EventHandler
et_Event Type to remove. Default will remove Event Handler for all events.
Returns
true if success, false if wasn't registered for any events.

Definition at line 172 of file Reactor.cpp.

174{
175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
176
177 bool ret = false;
178 handler_t fd;
180
181 if (eh_ == NULL) {
182 return false;
183 }
184
185 if (isTimeoutEvent (event_)) {
187 ret = true;
188 }
189
190 if (isReadEvent (event_)) {
191 iter = m_readSet.begin ();
192 while (iter != m_readSet.end ()) {
193 if ((*iter).second == eh_) {
194 fd = (*iter).first;
195 m_readSet.erase (iter);
197 ret = true;
198 break;
199 }
200 iter++;
201 }
202 }
203
204 if (isWriteEvent (event_)) {
205 iter = m_writeSet.begin ();
206 while (iter != m_writeSet.end ()) {
207 if ((*iter).second == eh_) {
208 fd = (*iter).first;
209 m_writeSet.erase (iter);
211 ret = true;
212 break;
213 }
214 iter++;
215 }
216 }
217
218 if (isExceptEvent (event_)) {
219 iter = m_exceptSet.begin ();
220 while (iter != m_exceptSet.end ()) {
221 if ((*iter).second == eh_) {
222 fd = (*iter).first;
223 m_exceptSet.erase (iter);
225 ret = true;
226 break;
227 }
228 iter++;
229 }
230 }
231
232 if (ret == true) {
233 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
234 eh_->handle_close (fd);
235 }
236
237 adjust_maxfdp1 (fd);
238
239 DL((REACT,"Modifies waitSet:\n"));
240 m_waitSet.dump ();
241
242 return (ret);
243}
void adjust_maxfdp1(handler_t fd_)
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
Definition Reactor.cpp:701
int remove(EventHandler *eh_)
Cancel all timers for the EventHandler eh_.

References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::RemoteLogger::log_close(), and stopReactor().

◆ removeIOHandler()

bool Reactor::removeIOHandler ( handler_t  fd_)

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters
fd_File descriptor
Returns
true on success, false if fd_ is out of range

We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().

Definition at line 246 of file Reactor.cpp.

248{
249 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
250
251 bool ret = false;
254
256
257 DL((REACT,"Removing handler for fd=%d\n",fd_));
258
263 if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
264 {
265 ehp = (*iter).second;
266 m_readSet.erase (iter);
269 if (m_readSet.size () > 0) {
270 iter = m_readSet.end ();
271 iter--;
272 }
273 ret = true;
274 }
275
276 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
277 {
278 ehp = (*iter).second;
279 m_writeSet.erase (iter);
282 if (m_writeSet.size () > 0) {
283 iter = m_writeSet.end ();
284 iter--;
285 }
286 ret = true;
287 }
288
289 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
290 {
291 ehp = (*iter).second;
292 m_exceptSet.erase (iter);
295 if (m_exceptSet.size () > 0) {
296 iter = m_exceptSet.end ();
297 iter--;
298 }
299 ret = true;
300 }
301
302 if (ret == true && ehp != NULL) {
303 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
304 ehp->handle_close (fd_);
305 }
306
308
309 DL((REACT,"Modifies waitSet:\n"));
310 m_waitSet.dump ();
311
312 return (ret);
313}
bool is_valid_handler(handler_t socket_)
Detect socket() error in a portable way.

References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

◆ removeTimerHandler()

bool Reactor::removeTimerHandler ( TimerId  id_)

Remove Timer event from the queue.

This removes particular event.

Parameters
id_Timer Id returned by registerTimer.
Returns
true if timer found and removed; false otherwise

Definition at line 151 of file Reactor.cpp.

153{
154 trace_with_mask("Reactor::removeTimer",REACTTRACE);
155 bool ret;
156
157 if ((ret = m_tqueue.remove (tid_))) {
158 DL((REACT,"---Modified Timer Queue----\n"));
159 m_tqueue.dump();
160 DL((REACT,"---------------------------\n"));
161 }
162 else {
163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
164 }
165 return (ret);
166}

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

◆ stopReactor()

void Reactor::stopReactor ( void  )

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 666 of file Reactor.cpp.

668{
669 trace_with_mask("Reactor::stopReactor", REACTTRACE);
670
671 m_active = false;
672
675
676 while (m_readSet.size () > 0) {
677 iter = m_readSet.begin ();
678 ehp = (*iter).second;
680 }
681
682 while (m_writeSet.size () > 0) {
683 iter = m_writeSet.begin ();
684 ehp = (*iter).second;
686 }
687
688 while (m_exceptSet.size () > 0) {
689 iter = m_exceptSet.begin ();
690 ehp = (*iter).second;
692 }
693}
bool removeHandler(EventHandler *eh_, EventType et_=ALL_EVENTS)
Remove Event handler from reactor for either all I/O events or timeout event or both.
Definition Reactor.cpp:173

References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

◆ waitForEvents() [1/2]

void Reactor::waitForEvents ( TimeVal tv_)

Wait for events for time specified.

Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.

Parameters
tv_[RW] is time to wait for.

| select() | errno | Events | Behavior | |===================================================================| | < 0 | EINTR | Interrup by signal | Retry | +-------—+----—+------------------—+-----------------------—+ | < 0 | EBADF | Bad file descriptor | Remove bad fds and retry | | | | | and retry | +-------—+----—+------------------—+-----------------------—+ | < 0 | others| Some other error | Fall through | +-------—+----—+------------------—+-----------------------—+ | == 0 | 0 | Timed out | Fall through | +-------—+----—+------------------—+-----------------------—+ | > 0 | 0 | Got some work to do | Fall through | +----------------------------------------------------------------—+

Definition at line 494 of file Reactor.cpp.

496{
497 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
498
500 DL((REACT,"======================================\n"));
501
502 /*--- Expire all stale Timers ---*/
504
505 /* Test to see if Reactor has been deactivated as a result
506 * of processing done by any TimerHandlers.
507 */
508 if (!m_active) {
509 return;
510 }
511
512 int nReady;
514 TimeVal* dlp = &delay;
515
516 /*---
517 In case if not all data have been processed by the EventHandler,
518 and EventHandler stated so in its callback's return value
519 to dispatcher (), it will be called again. This way
520 underlying file/socket stream can efficiently utilize its
521 buffering mechaninsm.
522 ---*/
523 if ((nReady = isAnyReady ())) {
524 DL((REACT,"isAnyReady returned: %d\n",nReady));
526 return;
527 }
528
529 DL((REACT,"=== m_waitSet ===\n"));
530 m_waitSet.dump ();
531
532 do {
533 m_readySet.reset ();
534 DL ((REACT,"m_readySet after reset():\n"));
535 m_readySet.dump ();
536
538 DL ((REACT,"m_readySet after assign:\n"));
539 m_readySet.dump ();
540
542
547 dlp);
548 DL((REACT,"::select() returned: %d\n",nReady));
549
550 m_readySet.sync ();
551 DL ((REACT,"m_readySet after select:\n"));
552 m_readySet.dump ();
553
554 }
555 while (nReady < 0 && handleError ());
556
558}
void sync()
Resync internals after select() call.
Definition MaskSet.h:52
void reset()
Clear all bits in all sets.
Definition MaskSet.h:62
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
Definition Reactor.cpp:421
bool handleError(void)
Handle error in select(2) loop appropriately.
Definition Reactor.cpp:341
bool dispatch(int minimum_)
Notify all EventHandlers registered on respecful events occured.
Definition Reactor.cpp:626
int isAnyReady(void)
Return number of file descriptors ready accross all sets.
Definition Reactor.cpp:404

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

◆ waitForEvents() [2/2]

void Reactor::waitForEvents ( void  )

Main waiting loop that blocks indefinitely processing events.

Block forever version.

Definition at line 469 of file Reactor.cpp.

471{
472 while ( m_active ) {
474 }
475}
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
Definition Reactor.cpp:470

References m_active, and waitForEvents().

Referenced by waitForEvents().

Member Data Documentation

◆ m_active

bool ASSA::Reactor::m_active
private

Flag that indicates whether Reactor is active or had been stopped.

Definition at line 209 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), waitForEvents(), and waitForEvents().

◆ m_exceptSet

Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet
private

Event handlers awaiting on EXCEPT_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

◆ m_fd_setsize

int ASSA::Reactor::m_fd_setsize
private

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 200 of file Reactor.h.

Referenced by checkFDs(), and Reactor().

◆ m_maxfd_plus1

handler_t ASSA::Reactor::m_maxfd_plus1
private

Max file descriptor number (in all sets) plus 1.

This value is ignored by WIN32 implementation of select()

Definition at line 206 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().

◆ m_readSet

Fd2Eh_Map_Type ASSA::Reactor::m_readSet
private

Event handlers awaiting on READ_EVENT.

Definition at line 212 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

◆ m_readySet

MaskSet ASSA::Reactor::m_readySet
private

Handlers that are ready for processing.

Definition at line 224 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

◆ m_tqueue

TimerQueue ASSA::Reactor::m_tqueue
private

The queue of Timers.

Definition at line 227 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

◆ m_waitSet

MaskSet ASSA::Reactor::m_waitSet
private

Handlers to wait for event on.

Definition at line 221 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

◆ m_writeSet

Fd2Eh_Map_Type ASSA::Reactor::m_writeSet
private

Event handlers awaiting on WRITE_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().


The documentation for this class was generated from the following files: