프로그래밍/C/C++2010. 3. 23. 16:16
1. Reactor 패턴
다중 이벤트를 처리하기 위한 전통적인 방법중 하나로 새로운 프로세스 또는 쓰레드를 생성하여 각각의 이벤트를 처리하는 방식이 있습니다. 이는 서버가 동시에 여러 네트워크 연결을 처리해야 할 경우에 즐겨 사용되는 방법입니다. 이러한 모델이 대부분의 상황에서 잘 동작하기는 하지만, 프로세스 또는 쓰레드의 생성 및 관리와 복잡성 증가라는 단점이 있습니다.

select(), poll(), WaitForMultipleObjects()와 같은 디멀티플렉서(demultiplexer)에 기초한 Reactor 모델은 단지 하나의 프로세스 또느 쓰레드에서 여러 개의 이벤트를  처리할 수 있도록 합니다.

어떻게 이와 같은 처리가 가능할까요?

Reactor패턴은 클라이언트가 접속하면 새로운 핸들을 생성하여 관리합니다. Reactor패턴은 다수의 핸들을 일괄적으로 관리하고 운영하는 과정에서 일차적으로 멀티플렉싱을 수행하고, 주기적으로 디멀티플렉싱을 통해 새롭게 발생한 이벤트가 존재하는지 확인합니다. 만약 새로운 이벤트가 존재한다면 하나씩 가져와 Dispatch를 진행합니다.


위 그림은 Reactor패턴의 구조를 보여주고 있습니다.

Reactor는 Event Handler를 실제로 구현한 구현 클래스(Concrete Event Handler)들을 등록, 해제, 호출을 위한 인터페이스를 가지고 있습니다. Event Handler는 Reactor 패턴이 필요로 하는 추상 인터페이스들을 가지고 있어 특정 어플리케이션에 비의존적인 매커니즘을 제공합니다. 또한 Event Handler는 input, output, timer events 등 여러가지 조건에 대해 반응할 수 있도록 인터페이스를 제공합니다.

이러한 Reactor와 Event Handler의 관계는 개발자의 비지니스 로직 부분을 이벤트 핸들링과 분리를 함으로써 어떠한 Event-driven 상황이 필요한 곳이라도  Reactor패턴을 적용시킬수 있게 합니다.

다음은 이러한 Reactor패턴을 이용한 echo server의 예제입니다.


2. echo server 예제
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Message_Block.h>
#include <ace/INET_Addr.h>
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Synch_Traits.h>
#include <ace/Reactor.h>
#include <ace/Acceptor.h>
#include <ace/Reactor_Notification_Strategy.h>

class Stream_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> {
private :
	typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> super;
	
	ACE_INET_Addr remote_addr_;
	ACE_Reactor_Notification_Strategy noti_;

public :
	Stream_Handler()
		: noti_(0, this, ACE_Event_Handler::WRITE_MASK)
	{ /* empty */ }

	//override
	virtual int open(void * = 0)
	{
		ACE_TRACE("Stream_Handler::open");
		if( super::open() == -1 )
			return -1;
		noti_.reactor(this->reactor());
		this->msg_queue()->notification_strategy(&noti_);
		if( this->peer().get_remote_addr(remote_addr_) == 0 )
		{
			ACE_DEBUG((LM_INFO, "[DEBUG%T](%N:%l) ### New client accepted: %s:%u\n", 
				remote_addr_.get_host_addr(), remote_addr_.get_port_number()));
		}
		return 0;
	}

	//override
	virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE)
	{
		ACE_TRACE("Stream_Handler::override");
		char buf[1024];
		ssize_t recv_cnt;
		if( (recv_cnt = this->peer().recv(buf, 1024)) <= 0 )
			return -1;
		ACE_Message_Block *mb;
		ACE_NEW_RETURN(mb, ACE_Message_Block(buf, recv_cnt), -1);
		mb->wr_ptr(recv_cnt);
		this->putq(mb);
		return 0;
	}

	//override
	virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE)
	{
		ACE_TRACE("Stream_Handler::handle_output");
		ACE_Message_Block *mb;
		ACE_Time_Value nowait(ACE_OS::gettimeofday());
		while( this->getq(mb, &nowait) != -1 )
		{
			ssize_t send_cnt = this->peer().send(mb->rd_ptr(), mb->length());
			if( send_cnt == -1 )
				ACE_ERROR((LM_ERROR, "[ERROR%T](%N:%l) ### %p\n", 
                                "Stream_Handler::handle_output"));
			else
				mb->rd_ptr(send_cnt);
			if( mb->length() > 0 )
			{
				this->ungetq(mb);
				break;
			}
			mb->release();
		}
		if( this->msg_queue()->is_empty() )
			this->reactor()->cancel_wakeup(this, ACE_Event_Handler::WRITE_MASK);
		else
			this->reactor()->schedule_wakeup(this, ACE_Event_Handler::WRITE_MASK);
		return 0;
	}
	
	//override
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
	{
		ACE_TRACE("Stream_Handler::handle_close");
		ACE_DEBUG((LM_INFO, "[DEBUG%T](%N:%l) ### Connection close %s:%u\n", 
				remote_addr_.get_host_addr(), remote_addr_.get_port_number()));
		return super::handle_close(handle, close_mask);
	}
	
};


int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
{
	ACE_INET_Addr listen;
	listen.set(9088);
	ACE_Acceptor<Stream_Handler, ACE_SOCK_ACCEPTOR> acceptor;
	acceptor.open(listen);
	ACE_Reactor::instance()->run_reactor_event_loop();
	ACE_RETURN(0);
}
Posted by devop
TAG , ,

댓글을 달아 주세요