This is how the ACE Reactor works at run time. Its design breaks down into the blocks in the diagram above; here I walk through them alongside the source code.

1. User program:

    ACE_Reactor::instance ()->register_handler (endpoint, ACE_Event_Handler::READ_MASK);

   The user code compares the return value with -1, which signals failure.

2. Reactor (ACE_Reactor):

    this->implementation ()->register_handler (event_handler, mask);

   ACE_Reactor_Impl is an abstract interface, so we follow one of its concrete subclasses, ACE_Select_Reactor_T.

3. ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>:

    this-> (handler-> (), handler, mask);

   The inner call is get_handle, which the user's own derived handler class has to override. (The name of the outer method is missing here.)

4. ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>:

    this->. (handle, event_handler, mask);

   The call is made on the collaborating class ACE_Select_Reactor_Handler_Repository; step 5 shows that the method is bind.

5. ACE_Select_Reactor_Handler_Repository::bind (excerpt):

    // Look up the handler currently registered for this handle;
    // event_handlers_ is indexed by the handle.
    ACE_Event_Handler * const current_handler = this->event_handlers_[handle];
    if (current_handler)
      {
        // The handle is already bound. Binding it to a different
        // handler fails; registering the same handler again is allowed.
        if (current_handler != event_handler)
          return -1;
        existing_handle = true;
      }
    // This is where the handle and the handler are tied together.
    this->event_handlers_[handle] = event_handler;
    if (this->max_handlep1_ < handle + 1)
      this->max_handlep1_ = handle + 1;
    if (this->select_reactor_.is_suspended_i (handle))
      {
        this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.suspend_set_, ACE_Reactor::ADD_MASK);
      }
    else
      {
        // See the analysis of bit_ops that follows.
        this->select_reactor_.bit_ops (handle, mask, this->select_reactor_.wait_set_, ACE_Reactor::ADD_MASK);
      }

ACE_Select_Reactor_Impl::bit_ops: anyone familiar with the Linux select function will see at once that this function bundles many of the descriptor-set operations, such as FD_ISSET, FD_SET and FD_ZERO. The part that matters to us is:

    ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
    if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
        || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
        || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
      {
        // set_bit is essentially a wrapper around FD_SET: it adds
        // handle to the rd_mask_ descriptor set.
        (handle_set.rd_mask_.*ptmf) (handle);
      }

Once the handlers are registered, the program enters the Initiation Dispatcher's event loop. The Initiation Dispatcher combines the handles of all registered handlers and uses the Synchronous Event Demultiplexer to wait for events.

1. User program:

    ACE_Reactor::instance ()->handle_events (tv);

   The argument tv is the maximum wait time chosen by the user.

2. Reactor: ACE_Reactor::handle_events forwards to

    this->implementation ()->handle_events (max_wait_time);
   As before, ACE_Reactor_Impl is an abstract interface, so we follow the concrete subclass ACE_Select_Reactor_T.

3. ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events (ACE_Time_Value &max_wait_time) calls

    this->handle_events (&max_wait_time);

   which is the pointer overload:

    template <class ACE_SELECT_REACTOR_TOKEN> int
    ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
      (ACE_Time_Value *max_wait_time)
    {
      ACE_TRACE ("ACE_Select_Reactor_T::handle_events");

      // Stash the current time -- the destructor of this object will
      // automatically compute how much time elapsed since this method was
      // called.
      ACE_Countdown_Time countdown (max_wait_time);

    #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
      ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1);

      if (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_) == 0
          || this->deactivated_)
        return -1;

      // Update the countdown to reflect time waiting for the mutex.
      countdown.update ();
    #else
      if (this->deactivated_)
        return -1;
    #endif /* ACE_MT_SAFE */

      // Continues in handle_events_i, shown in step 4.
      return this->handle_events_i (max_wait_time);
    }

4. The Initiation Dispatcher combines the registered handles and then uses the Synchronous Event Demultiplexer to wait for events:

    template <class ACE_SELECT_REACTOR_TOKEN> int
    ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
      (ACE_Time_Value *max_wait_time)
    {
      int result = -1;

      ACE_SEH_TRY
        {
          // We use the data member dispatch_set_ as the current dispatch
          // set.

          // We need to start from a clean dispatch_set
          // (the three descriptor sets of the dispatch set are cleared).
          this->dispatch_set_.rd_mask_.reset ();
          this->dispatch_set_.wr_mask_.reset ();
          this->dispatch_set_.ex_mask_.reset ();

          // Block here until events arrive.
          int number_of_active_handles =
            this->wait_for_multiple_events (this->dispatch_set_,
                                            max_wait_time);

          // Events are ready: hand them to the Initiation Dispatcher.
          result =
            this->dispatch (number_of_active_handles,
                            this->dispatch_set_);
        }
      ACE_SEH_EXCEPT (this->release_token ())
        {
          // As it stands now, we catch and then rethrow all Win32
          // structured exceptions so that we can make sure to release the
          // <token_> lock correctly.
        }

      return result;
    }

5. At last we reach the select call. It sits in wait_for_multiple_events():

    dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
    dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
    dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;
    number_of_active_handles = ACE_OS::select (width,
                                               dispatch_set.rd_mask_,
                                               dispatch_set.wr_mask_,
                                               dispatch_set.ex_mask_,
                                               this_timeout);

6. When an event source becomes ready, for example a TCP socket becomes readable, the Synchronous Event Demultiplexer notifies the Initiation Dispatcher. The first function to run after select wakes up is ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch:

    template <class ACE_SELECT_REACTOR_TOKEN> int
    ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
      (int active_handle_count,
       ACE_Select_Reactor_Handle_Set &dispatch_set)
    {
      ACE_TRACE ("ACE_Select_Reactor_T::dispatch");

      int io_handlers_dispatched = 0;
      int other_handlers_dispatched = 0;
      int signal_occurred = 0;

      // The following do/while loop keeps dispatching as long as there
      // are still active handles. Note that the only way we should ever
      // iterate more than once through this loop is if signals occur
      // while we're dispatching other handlers.
      do
        {
          // We expect that the loop will decrease the number of active
          // handles in each iteration. If it does not, then something is
          // inconsistent in the state of the Reactor and we should avoid
          // the loop. Please read the comments on bug 2540 for more
          // details.
          int initial_handle_count = active_handle_count;

          // Note that we keep track of changes to our state. If any of
          // the dispatch_*() methods below return -1 it means that the
          // <wait_set_> state has changed as the result of an
          // <ACE_Event_Handler> being dispatched. This means that we
          // need to bail out and rerun the select() loop since our
          // existing notion of handles in <dispatch_set> may no longer be
          // correct.
          //
          // In the beginning, our state starts out unchanged. After
          // every iteration (i.e., due to signals), our state starts out
          // unchanged again.
          this->state_changed_ = false;

          // Perform the Template Method for dispatching all the handlers.
          // First check for interrupts.
          if (active_handle_count == -1)
            {
              // Bail out -- we got here since <select> was interrupted.
              if (ACE_Sig_Handler::sig_pending () != 0)
                {
                  ACE_Sig_Handler::sig_pending (0);

                  // If any HANDLES in the <ready_set_> are activated as a
                  // result of signals they should be dispatched since
                  // they may be time critical...
                  active_handle_count = this->any_ready (dispatch_set);

                  // Record the fact that the Reactor has dispatched a
                  // handle_signal() method. We need this to return the
                  // appropriate count below.
                  signal_occurred = 1;
                }
              else
                return -1;
            }

          // Handle timers early since they may have higher latency
          // constraints than I/O handlers. Ideally, the order of
          // dispatching should be a strategy...
          else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
            // State has changed or timer queue has failed, exit loop.
            break;

          // Check to see if there are no more I/O handles left to
          // dispatch AFTER we've handled the timers...
          else if (active_handle_count == 0)
            return io_handlers_dispatched
              + other_handlers_dispatched
              + signal_occurred;

          // Next dispatch the notification handlers (if there are any to
          // dispatch). These are required to handle multi-threads that
          // are trying to update the <Reactor>.
          else if (this->dispatch_notification_handlers
                   (dispatch_set,
                    active_handle_count,
                    other_handlers_dispatched) == -1)
            // In my runs this call never seemed to do any real work.
            // Debug trace:
            //   (3078551776) calling ACE_Select_Reactor_Notify::dispatch_notifications in file `Select_Reactor_Base.cpp' on line 734
            //   (3078551776) calling ACE_Pipe::read_handle in file `/ACE_wrappers/ace/Pipe.inl' on line 20
            //   (3078551776) leaving ACE_Pipe::read_handle
            //   (3078551776) calling ACE_Handle_Set::is_set in file `/ACE_wrappers/ace/Handle_Set.inl' on line 82
            //   (3078551776) leaving ACE_Handle_Set::is_set
            //   (no call to handle_input follows)
            //   (3078551776) leaving ACE_Select_Reactor_Notify::dispatch_notifications
            //
            // State has changed or a serious failure has occurred, so exit
            // loop.
            break;

          // Finally, dispatch the I/O handlers.
          else if (this->dispatch_io_handlers
                   (dispatch_set,
                    active_handle_count,
                    io_handlers_dispatched) == -1)
            // This is the important call. Debug trace:
            //   (3078551776) calling ACE_Select_Reactor_T::dispatch_io_set in file `/ACE_wrappers/ace/Select_Reactor_T.cpp' on line 1186
            //   (3078551776) calling ACE_Handle_Set_Iterator::operator in file `Handle_Set.cpp' on line 280
            //   (3078551776) leaving ACE_Handle_Set_Iterator::operator
            //   (3078551776) calling ACE_Select_Reactor_Handler_Repository::find in file `/ACE_wrappers/ace/Select_Reactor_Base.inl' on line 46
            //   (3078551776) calling ACE_Select_Reactor_Handler_Repository::handle_in_range in file `Select_Reactor_Base.cpp' on line 60
            //   (3078551776) leaving ACE_Select_Reactor_Handler_Repository::handle_in_range
            //   (3078551776) calling ACE_Select_Reactor_Handler_Repository::find_eh in file `Select_Reactor_Base.cpp' on line 168
            //   (3078551776) leaving ACE_Select_Reactor_Handler_Repository::find_eh
            //   (3078551776) leaving ACE_Select_Reactor_Handler_Repository::find
            //   (3078551776) calling ACE_Select_Reactor_T::notify_handle in file `/ACE_wrappers/ace/Select_Reactor_T.cpp' on line 797
            //     (the key step: this is where the matching handler is invoked)
            //   (15682|3078551776) activity occurred on handle 3!
            //     (printed by the user's handle_input())
            //
            // State has changed, so exit loop.
            break;

          // if state changed, we need to re-eval active_handle_count,
          // so we will not end with an endless loop
          if (initial_handle_count == active_handle_count
              || this->state_changed_)
            {
              active_handle_count = this->any_ready (dispatch_set);
            }
        }
      while (active_handle_count > 0);

      return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
    }

Now let's see how the handler that belongs to a handle is found. The relevant part of find_eh is:

    ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
    {
      ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");

      map_type::iterator pos (this->event_handlers_.end ());

      // this code assumes the handle is in range.
    #if defined (ACE_WIN32)
      this->event_handlers_.find (handle, pos);
    #else
      // The same event_handlers_[handle] lookup that appeared in bind.
      map_type::iterator const tmp = &this->event_handlers_[handle];

      if (*tmp != 0)
        pos = tmp;
    #endif /* ACE_WIN32 */

With the handler found, it is invoked in notify_handle:

    template <class ACE_SELECT_REACTOR_TOKEN> void
    ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
      (ACE_HANDLE handle,
       ACE_Reactor_Mask mask,
       ACE_Handle_Set &ready_mask,
       ACE_Event_Handler *event_handler,
       ACE_EH_PTMF ptmf)
    {
      ACE_TRACE ("ACE_Select_Reactor_T::notify_handle");

      // Check for removed handlers.
      if (event_handler == 0)
        return;

      bool const reference_counting_required =
        event_handler->reference_counting_policy ().value () ==
        ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

      // Call add_reference() if needed.
      if (reference_counting_required)
        {
          event_handler->add_reference ();
        }

      // ACE_EH_PTMF is defined as
      //   typedef int (ACE_Event_Handler::*ACE_EH_PTMF) (ACE_HANDLE);
      // so ptmf is a pointer to member function. This line calls that
      // member (for example handle_input) on the handler found above.
      int const status = (event_handler->*ptmf) (handle);

      if (status < 0)
        this->remove_handler_i (handle, mask);
      else if (status > 0)
        ready_mask.set_bit (handle);

      // Call remove_reference() if needed.
      if (reference_counting_required)
        event_handler->remove_reference ();
    }

I won't go into remove_handler here; you can read through the source and work it out yourself.

Reposted from: http://otpbi.baihongyu.com/
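To tie the walkthrough together, here is a minimal sketch of the same flow written against plain POSIX select() rather than ACE. The names MiniReactor, EventHandler, PipeReader and run_pipe_demo are hypothetical and exist only for this illustration; they are not ACE classes. The sketch assumes a POSIX system, handles read events only, and leaves out timers, signals, notifications, the token lock, reference counting and the "status > 0" ready-set case.

```cpp
#include <sys/select.h>  // select, fd_set, FD_SET, FD_CLR, FD_ISSET, FD_ZERO
#include <unistd.h>      // pipe, read, write, close
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for ACE_Event_Handler (not the ACE class).
class EventHandler {
public:
  virtual ~EventHandler() = default;
  virtual int get_handle() const = 0;    // overridden by the user's class
  virtual int handle_input(int fd) = 0;  // a negative result removes the handler
};

// Pointer to member function, the same shape as ACE_EH_PTMF.
using HandlerCallback = int (EventHandler::*)(int);

// Hypothetical reactor: a handle-indexed table plus a select() wait set.
class MiniReactor {
public:
  MiniReactor() : handlers_(FD_SETSIZE, nullptr) { FD_ZERO(&wait_set_); }

  // Mirrors bind: tie handle to handler, then add the handle to the wait set.
  int register_handler(EventHandler* eh) {
    const int fd = eh->get_handle();
    if (fd < 0 || fd >= FD_SETSIZE) return -1;
    if (handlers_[fd] != nullptr && handlers_[fd] != eh) return -1;
    handlers_[fd] = eh;
    if (max_handlep1_ < fd + 1) max_handlep1_ = fd + 1;
    FD_SET(fd, &wait_set_);
    return 0;
  }

  void remove_handler(int fd) {
    handlers_[fd] = nullptr;
    FD_CLR(fd, &wait_set_);
  }

  // Mirrors handle_events_i: copy the wait set, block in select, dispatch.
  // Returns the number of handlers dispatched, 0 on timeout, -1 on error.
  int handle_events(timeval* timeout) {
    fd_set dispatch_set = wait_set_;  // select() overwrites its argument
    const int active =
        select(max_handlep1_, &dispatch_set, nullptr, nullptr, timeout);
    if (active <= 0) return active;
    int dispatched = 0;
    for (int fd = 0; fd < max_handlep1_ && dispatched < active; ++fd) {
      if (!FD_ISSET(fd, &dispatch_set)) continue;
      notify_handle(fd, handlers_[fd], &EventHandler::handle_input);
      ++dispatched;
    }
    return dispatched;
  }

private:
  // Mirrors notify_handle: call the hook through the member pointer.
  void notify_handle(int fd, EventHandler* eh, HandlerCallback cb) {
    if (eh == nullptr) return;  // handler was removed meanwhile
    const int status = (eh->*cb)(fd);
    if (status < 0) remove_handler(fd);
  }

  std::vector<EventHandler*> handlers_;  // indexed by handle
  fd_set wait_set_;
  int max_handlep1_ = 0;  // highest registered handle plus one
};

// Hypothetical user handler that counts the bytes it reads from a pipe.
class PipeReader : public EventHandler {
public:
  explicit PipeReader(int fd) : fd_(fd) {}
  int get_handle() const override { return fd_; }
  int handle_input(int fd) override {
    char buf[64];
    const ssize_t n = read(fd, buf, sizeof buf);
    if (n > 0) bytes_read += static_cast<std::size_t>(n);
    return n > 0 ? 0 : -1;
  }
  std::size_t bytes_read = 0;

private:
  int fd_;
};

// Writes 4 bytes into a pipe, runs one reactor iteration and returns the
// number of bytes the handler saw.
std::size_t run_pipe_demo() {
  int fds[2];
  if (pipe(fds) != 0) return 0;
  PipeReader reader(fds[0]);
  MiniReactor reactor;
  reactor.register_handler(&reader);
  const bool written = (write(fds[1], "ping", 4) == 4);
  timeval tv{1, 0};  // wait at most one second
  if (written) reactor.handle_events(&tv);
  close(fds[0]);
  close(fds[1]);
  return reader.bytes_read;
}
```

Calling run_pipe_demo() from a main function exercises the whole path: registration, the blocking select, and the callback through the pointer to member function. Copying wait_set_ into a local dispatch_set before every select call is the same move as the three mask assignments in wait_for_multiple_events, since select overwrites the sets it is given.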