// connection from every client and server // 控制 frpc 和 frps 间的连接,frps 响应 frpc 的请求 funccontrolWorker(c *conn.Conn) { // the first message is from client to server // 第一条信息肯定是由 client 发起的 // ... clientCtlReq := &msg.ClientCtlReq{} clientCtlRes := &msg.ClientCtlRes{} if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil { log.Warn("Parse err: %v : %s", err, res) return } // check // 检查对应的 ProxyServer, 先让 ProxyServer 处理请求 succ, info, needRes := checkProxy(clientCtlReq, c) if !succ { clientCtlRes.Code = 1 clientCtlRes.Msg = info }
// workconn 就直接返回,ctlConn 则继续执行 if needRes {...} else { // work conn, just return return } // other messages is from server to client // 其余信息是 server to client s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) return } // read control msg from client // 接收和发送心跳包 go readControlMsgFromClient(s, c)
// 接收 user 连接并向 client 请求 serverCtlReq := &msg.ClientCtlReq{} serverCtlReq.Type = consts.WorkConn for { closeFlag := s.WaitUserConn() // ... buf, _ := json.Marshal(serverCtlReq) err = c.Write(string(buf) + "\n") // ... log.Debug("ProxyName [%s], write to client to add work conn success", s.Name) } log.Info("ProxyName [%s], I'm dead!", s.Name) return }
checkProxy 首先需要通过代理服务器名和密码进行身份验证,然后处理两种请求类型,也就是说有两种类型的连接。CtlConn 用于启动代理,之后循环等待 user 连接;WorkConn 用于和 user 交互,会向 ProxyServer.cliConnChan 写入这个连接。(个人感觉判断请求类型并处理的逻辑应该放在 controlWorker 中比较好,也就不需要 needRes 来判断连接类型)
// start listening for user conns func(p *ProxyServer) Start() (err error) { // 监听与 user 连接的端口 p.Init() p.listener, err = conn.Listen(p.BindAddr, p.ListenPort) // ... p.Status = consts.Working // 启动一个协程接收 user 的连接 // start a goroutine for listener to accept user connection gofunc() { for { // block // if listener is closed, err returned c, err := p.listener.GetConn() // ... // insert into list p.Lock() // ... p.userConnList.PushBack(c) p.Unlock() // put msg to control conn p.ctlMsgChan <- 1 // set timeout 判断连接超时 time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { p.Lock() defer p.Unlock() element := p.userConnList.Front() if element == nil { return } userConn := element.Value.(*conn.Conn) if userConn == c { log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr()) } }) } }() // start another goroutine for join two conns from client and user // 启动另一个协程连接 client 和 user 的两个连接 gofunc() { for { cliConn, ok := <-p.cliConnChan if !ok { return } p.Lock() element := p.userConnList.Front() var userConn *conn.Conn if element != nil { userConn = element.Value.(*conn.Conn) p.userConnList.Remove(element) } else { cliConn.Close() p.Unlock() continue } p.Unlock() // msg will transfer to another without modifying // l means local, r means remote log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(), userConn.GetLocalAddr(), userConn.GetRemoteAddr()) go conn.Join(cliConn, userConn) } }() returnnil }