티스토리 뷰

반응형

Photo by Christopher Robin Ebbinghaus on Unsplash

 

Websocket 대표적인 사용처는 채팅이 아닐까? 지금까지 살펴본 websocket 동작이 실제 애플리케이션에서 어떻게 구현되고 활용되는지를 gorilla/websocket 패키지에서 제공하는 채팅 예제의 서버쪽 코드로 알아보겠다. websocket connection 이루어지고 나서 message 주고 받는 것은 서버와 클라이언트에 차이가 없기 때문이다. 구글링 해보면 예제에 대한 분석과 설명은 꽤나 많다.

 

GitHub: https://github.com/gorilla/websocket/tree/master/examples/chat 

 

Websocket connection

클라이언트가 서버의 /ws endpoint websocket connection 요청하면 서버와 클라이언트간에 websocket 연결이 이루어진다. 구체적으로 serveWs 핸들러 함수가 연결과 채팅을 위한 초기 준비작업을 해준다. 연결을 통해서 서버와 클라이언트가 메시지를 주고받는 것이 websocket 전부이다.

 

코드를 보면

 

1. upgrader.Upgrade 메서드가 websocket connection 해주고,  관련한 정보를 담고, 이를 처리하는 메서드를 가진 Conn 인스턴스를 생성해준다

2. 그리고 생성한 Conn 인스턴스를 포함한 Client 인스턴스를 생성해준다. 이를 클라이언트와 구분하여 (서버쪽에서 클라이언트와 통신을 전담하는) 클라이언트 에이전트라 부르겠다.

- hub 포인터는 서버가 클라이언트 에이전트들을 관리하고 이를 통해 실제 클라이언트들간에 메시지를 주고 받도록 해주는 인스턴스를 가리킨다. 따로 자세히 다루겠다

- conn 포인터은 방금 연결된 클라이언트와의 websocket connection 정보를 담고 있다

- send 채널은 클라이언트에게 메시지를 전송할때에 클라이언트 에이전트에게 데이터를 보내는 창구이다.

3. hub.register 채널로 방금 생성한 Client 인스턴스(정확히는 포인터) 보낸다

- hub register 채널로 들어오는 Client 인스턴스들을 추가한다

4. 마지막으로 개의 고루틴을 실행시킨다. 독립적인 스레드로 각각 websocket connection 통해 메시지를 읽고 쓰는 것이다.

 

GitHub Code: https://bit.ly/3EvYAWh

// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
	client.hub.register <- client

	// Allow collection of memory referenced by the caller by doing all work in
	// new goroutines.
	go client.writePump()
	go client.readPump()
}

 

Hub 구조체

위에서 언급된 hub 들여다보자

 

서버 프로그램을 실행하면, Hub 인스턴스가 생성되고, 포인터를 serveWs 핸들러로 전달한다. 이미 위에서 대로 클라이언트와 연결이 되고 나서 클라이언트 에이전트를 생성하면서 정보를 추가하여, 에이전트가 hub 통신할 있도록 알려주는 것이다.

 

GitHub Code: https://bit.ly/3EiC2In

func main() {
	flag.Parse()
	hub := newHub()
	go hub.run()
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})
	err := http.ListenAndServe(*addr, nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

Hub 구조체를 들여다보자

 

Hub 4개의 필드를 가지는데

 

1) register 채널로 들어오는 Client (포인터) clients map 추가 등록하고

2) unregister 채널로 들어오는 Client (포인터) clients map에서 제거할 것이다.

3) 그리고 broadcast 채널로 들어오는 []byte clients 등록된 Client들에게 전파할 것이다.

 

newHub() 필드들을 초기화하여 Hub ( 포인터) 리턴해준다

 

GitHub Code: https://bit.ly/3pMN822

// Hub maintains the set of active clients and broadcasts 
// messages to the clients.
type Hub struct {
	// Registered clients.
	clients map[*Client]bool

	// Inbound messages from the clients.
	broadcast chan []byte

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan *Client
}

func newHub() *Hub {
	return &Hub{
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		clients:    make(map[*Client]bool),
	}
}

Hub run 메서드

 

서버 프로그램을 실행하면 Hub 인스턴스를 만들고 run 메서드를 고루틴으로 실행한 것을 기억할 것이다.

run 메서드는 내부의 무한루프로 개의 채널로 들어오는 값들을 처리한다. 위에서 언급하였던 대로

 

1. register, unregister 채널로 들어오는 Client 인스턴스의 포인터 정보를 clients map 추가 또는 삭제하고

2. broadcase 채널로 들어오는 []byte 메시지는 clients 순회하면서, (나름 중요) 각각의 클라이언트 에이전트의 send 채널에 넣어준다. 이때 client.send 밀어넣는게 실패한다는 것은 해당 클라이언트 에이전트의 오동작을 의미하기에 unregister 같은 동작을 하도록 default 처리를 해두었다.

 

GitHub Code: https://bit.ly/3pJukkl

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.clients[client] = true
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
		case message := <-h.broadcast:
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(h.clients, client)
				}
			}
		}
	}
}

 

클라이언트 에이전트의 readPump

실제로 통신하는 부분을 보자. 먼저 클라이언트가 websocket connection으로 보내는 메시지는 클라이언트 에이전트의 readPump 메서드가 받아서 처리하게 된다.

 

1. 메시지의 크기에 대한 설정과

2. Websocket 문서에 언급되었던, 연결을 체크하는 ping, pong 관련한 설정값, 핸들러를 정의해둔다

3. 그리고 무한루프를 돌며 해당 클라이언트와 서버간의 websocket connection으로 들어오는 메시지를 ReadMessage 메서드로 받아서 hub broadcast 채널로 보낸다. 위에서 보았던 대로 hub 이를 다른 클라이언트 에이전트들에게 에이전트의 send 채널을 통해 전달한다.

 

GitHub Code: https://bit.ly/3EiJu66

// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}
		message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
		c.hub.broadcast <- message
	}
}

 

클라이언트 에이전트의 writePump

클라이언트 에이전트는 무한루프를 돌며 채널을 지켜보며 들어오는 값을 처리한다.

 

1. ticker.C pingPeriod 간격으로 발생하며 pingMessage 보낸다. 위에서 언급한 websocket 문서의 ping, pong 대한 처리이다

2. send 들어오는 []byte 메시지는 WriteMessage 메서드를 이용해 클라이언트에게 보낸다.

 

// Add queued chat messages to the current websocket message. 코멘트 아래의 for loop 다음에 챙겨보겠다.

퍼포먼스 향상을 위한 고민이 담긴 부분인데 이게 오히려 문제가 있다.

 

GitHub Code: https://bit.ly/3xO2reM

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			// Add queued chat messages to the current websocket message.
			n := len(c.send)
			for i := 0; i < n; i++ {
				w.Write(newline)
				w.Write(<-c.send)
			}

			if err := w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

 

반응형
댓글
댓글쓰기 폼