frp v0.1.0 源码学习

frp 是一个专注于内网穿透的高性能的反向代理应用,支持 TCP、UDP、HTTP、HTTPS 等多种协议,且支持 P2P 通信。可以将内网服务以安全、便捷的方式通过具有公网 IP 节点的中转暴露到公网。

frp 在 v0.10.0 进行过代码重构,所以 v0.1.0 看起来和现在的结构不太一样,但是直接读新版本的代码还是有点难,这里就通过 v0.1.0 的源代码学习一下 frp 的实现原理。

因为包管理方式不同,需要设置 GO111MODULEoff 以及对应的 GOPATH

概览

我们使用 frp 需要配置两个端,两个端各负责一些功能

frps

  • 监听 bind_port 和 frpc 连接
  • 监听 remote_port 和 user 连接
  • 转发 user 和 frpc 之间数据

frpc

  • 和 frps 建立连接
  • 转发 frp 和本地服务之间数据

接下来我们看 v0.1.0 是怎么实现这些功能的

v0.1.0

frps

先从入口源码开始,配置文件是用一个第三方库读,然后循环读取每一个 ProxyServer 的配置,每个 ProxyServer 可以给对应的 passwd、bind_addr、listen_port ,日志没什么好说的

src/frp/cmd/frps/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {  
// 加载配置文件
err := server.LoadConf("./frps.ini")
// ...

// 初始化日志
log.InitLog(server.LogWay, server.LogFile, server.LogLevel)

// 创建 Listener
l, err := conn.Listen(server.BindAddr, server.BindPort)
// ...

log.Info("Start frps success")
ProcessControlConn(l)
}

先看 conn.Listen,返回一个 Listener ,包含一个 channel conns 用于接收和存储 frpc 的连接,监听 BindPort,还开启了一个协程来接收 TCP 连接写入 conns

src/frp/utils/conn/conn.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {  
// ...

l = &Listener{
addr: listener.Addr(),
l: listener,
conns: make(chan *Conn),
closeFlag: false,
}

go func() {
for {
conn, err := l.l.AcceptTCP()
// ...

c := &Conn{
TcpConn: conn,
closeFlag: false,
}
c.Reader = bufio.NewReader(c.TcpConn)
l.conns <- c
}
}()
return l, err
}

对应的可以用 Listener.GetConn 取出通道中的连接

1
2
3
4
5
6
7
8
func (l *Listener) GetConn() (conn *Conn, err error) {  
var ok bool
conn, ok = <-l.conns
if !ok {
return conn, fmt.Errorf("channel close")
}
return conn, nil
}

接下来回到主函数的 ProcessControlConn(l) ,传入的就是刚刚创建的 Listener,在拿到 frpc 的连接后,启动 controlWorker 来处理这个连接

src/frp/cmd/frps/control.go

1
2
3
4
5
6
7
8
9
10
func ProcessControlConn(l *conn.Listener) {  
for {
c, err := l.GetConn()
if err != nil {
return
}
log.Debug("Get one new conn, %v", c.GetRemoteAddr())
go controlWorker(c)
}
}

contorlWorker 用于控制 frp 客户端和服务端之间的连接,首先第一条信息肯定是由 frpc 发起的连接,通过 checkProxy 来进行身份验证并判断请求类型做出相应处理,如果不是 workconn 就继续执行,开启一个协程向 frpc 收发心跳包确保连接存活,循环等待 user 发起连接然后向 frpc 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// connection from every client and server 
// 控制 frpc 和 frps 间的连接,frps 响应 frpc 的请求
func controlWorker(c *conn.Conn) {
// the first message is from client to server
// 第一条信息肯定是由 client 发起的
// ...

clientCtlReq := &msg.ClientCtlReq{}
clientCtlRes := &msg.ClientCtlRes{}
if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
log.Warn("Parse err: %v : %s", err, res)
return
}

// check
// 检查对应的 ProxyServer, 先让 ProxyServer 处理请求
succ, info, needRes := checkProxy(clientCtlReq, c)
if !succ {
clientCtlRes.Code = 1
clientCtlRes.Msg = info
}

// workconn 就直接返回,ctlConn 则继续执行
if needRes {...} else {
// work conn, just return
return
}

// other messages is from server to client
// 其余信息是 server to client
s, ok := server.ProxyServers[clientCtlReq.ProxyName]
if !ok {
log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
return
}

// read control msg from client
// 接收和发送心跳包
go readControlMsgFromClient(s, c)

// 接收 user 连接并向 client 请求
serverCtlReq := &msg.ClientCtlReq{}
serverCtlReq.Type = consts.WorkConn
for {
closeFlag := s.WaitUserConn()
// ...

buf, _ := json.Marshal(serverCtlReq)
err = c.Write(string(buf) + "\n")
// ...

log.Debug("ProxyName [%s], write to client to add work conn success", s.Name)
}

log.Info("ProxyName [%s], I'm dead!", s.Name)
return
}

checkProxy 首先需要通过代理服务器名和密码进行身份验证,然后处理两种请求类型,也就是说有两种类型的连接。CtlConn 用于启动代理,之后循环等待 user 连接;WorkConn 用于和 user 交互,会向 ProxyServer.cliConnChan 写入这个连接。(个人感觉判断请求类型并处理的逻辑应该放在 controlWorker 中比较好,也就不需要 needRes 来判断连接类型)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) {  
// check name and password
// ...

// control conn
if req.Type == consts.CtlConn {
// ...

// start proxy and listen for user conn, no block
err := s.Start()
// ...

} else if req.Type == consts.WorkConn {
// work conn
needRes = false
// ...

s.GetNewCliConn(c)
} else {
// ...
return
}

succ = true
return
}

ProxyServer.Start 启动代理服务器监听和 user 连接的端口并启动了两个协程:

  • 处理 user 的连接
    • 把收到的连接放入 userConnList,向 ctlMsgChan 写入 1 代表接收到 user 连接
  • 转发 user 的数据到 client 然后转发 client 的响应回 user
    • 转发的过程使用 io.Copy,先把 user 的字节流复制到 client,再把对应 client 响应的字节流复制到 user

src/frp/models/server/server.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// start listening for user conns  
func (p *ProxyServer) Start() (err error) {
// 监听与 user 连接的端口
p.Init()
p.listener, err = conn.Listen(p.BindAddr, p.ListenPort)
// ...
p.Status = consts.Working

// 启动一个协程接收 user 的连接
// start a goroutine for listener to accept user connection
go func() {
for {
// block
// if listener is closed, err returned
c, err := p.listener.GetConn()
// ...

// insert into list
p.Lock()
// ...
p.userConnList.PushBack(c)
p.Unlock()

// put msg to control conn
p.ctlMsgChan <- 1

// set timeout 判断连接超时
time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
p.Lock()
defer p.Unlock()
element := p.userConnList.Front()
if element == nil {
return
}

userConn := element.Value.(*conn.Conn)
if userConn == c {
log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr())
}
})
}
}()

// start another goroutine for join two conns from client and user
// 启动另一个协程连接 client 和 user 的两个连接
go func() {
for {
cliConn, ok := <-p.cliConnChan
if !ok {
return
}

p.Lock()
element := p.userConnList.Front()

var userConn *conn.Conn
if element != nil {
userConn = element.Value.(*conn.Conn)
p.userConnList.Remove(element)
} else {
cliConn.Close()
p.Unlock()
continue
}
p.Unlock()

// msg will transfer to another without modifying
// l means local, r means remote log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
go conn.Join(cliConn, userConn)
}
}()

return nil
}

总结一下,frps 监听 frp 服务端口使用 ProcessControlConn 监听然后交给 controlWorker 处理,在收到对应控制请求后启动 ProxyServer 负责监听服务端口处理用户连接和转发数据。

具体实现中,controlWorker 接收到 frpc 的连接时,先判断连接类型,CtlConn 用于启动 ProxyServer 并等待 user 连接,收到 user 连接会给 frpc 发送控制请求让 frpc 发一个 WorkConn 过来,WorkConn 用于给出和 user 交互的连接会写入 ProxyServer.cliConnChanProxyServer 接收到用户连接时,ProxyServer 会向 ProxyServer.ctlMsgChan 写入 1ProxyServerProxyServer.cliConnChan 中读出连接时会把与用户的连接相连转发数据。

frpc

从入口源码开始,读取配置文件到 ProxyClient,然后对于每一个 ProxyClient 启动一个 ControlProcess 来控制,等待所有 ProxyClient 退出后结束主进程

src/frp/cmd/frpc/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {  
err := client.LoadConf("./frpc.ini")
// ...

log.InitLog(client.LogWay, client.LogFile, client.LogLevel)

// wait until all control goroutine exit
var wait sync.WaitGroup
wait.Add(len(client.ProxyClients))

for _, client := range client.ProxyClients {
go ControlProcess(client, &wait)
}

log.Info("Start frpc success")

wait.Wait()
log.Warn("All proxy exit!")
}

ControlProcess

src/frp/cmd/frpc/control.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {  
defer wait.Done()

// 和 frps 建立连接
c, err := loginToServer(cli)
// ...
connection = c
defer connection.Close()

for {
// ignore response content now
content, err := connection.ReadLine()

// 循环重连
// ...

clientCtlRes := &msg.ClientCtlRes{}
if err := json.Unmarshal([]byte(content), clientCtlRes); err != nil {
log.Warn("Parse err: %v : %s", err, content)
continue
}

// 如果是心跳包就重置计时器
if consts.SCHeartBeatRes == clientCtlRes.GeneralRes.Code {
if heartBeatTimer != nil {
log.Debug("Client rcv heartbeat response")
heartBeatTimer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second)
} else {
log.Error("heartBeatTimer is nil")
}
continue
}

// 如果不是心跳包就是 frps 收到用户连接了,启动和 frps 之间的隧道
cli.StartTunnel(client.ServerAddr, client.ServerPort)
}
}

loginToServer 和 frps 建立连接让 frps 启动对应的 ProxyServer,然后开启一个协程不断发送心跳包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
// 和 frps 建立 tcp 连接
c, err = conn.ConnectServer(client.ServerAddr, client.ServerPort)
// ...

// 发送 CtlConn,让 frps 启动对应的 ProxyServer
req := &msg.ClientCtlReq{
Type: consts.CtlConn,
ProxyName: cli.Name,
Passwd: cli.Passwd,
}
buf, _ := json.Marshal(req)
err = c.Write(string(buf) + "\n")
// ..

res, err := c.ReadLine()
// ...
log.Debug("ProxyName [%s], read [%s]", cli.Name, res)

clientCtlRes := &msg.ClientCtlRes{}
if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
// ...
}

// ...

// 发心跳包
go startHeartBeat(c)
log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort)

return
}

ProxyClient.StartTunnel 转发数据到本地服务,使用 io.Copy 想把 remoteConn 复制到 localConn,再是把 localConn 复制到 remoteConn 来响应

src/frp/models/client/client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err error) {  
// 获取本地服务连接
localConn, err := p.GetLocalConn()
if err != nil {
return
}
// 获取远端 frps 连接
remoteConn, err := p.GetRemoteConn(serverAddr, serverPort)
if err != nil {
return
}

// l means local, r means remote
// 连接两个连接
log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(),
remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr())
go conn.Join(localConn, remoteConn)
return nil
}

ProxyClient.GetRemoteConn 给 frps 提供 WorkConn 在 frps 处用于 user 信息的转发,然后在本地用于转发到本地服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err error) {  
defer func() {
if err != nil {
c.Close()
}
}()

// 连接到 frps,发送请求指明这是 WorkConn
c, err = conn.ConnectServer(addr, port)
// ...

req := &msg.ClientCtlReq{
Type: consts.WorkConn,
ProxyName: p.Name,
Passwd: p.Passwd,
}

buf, _ := json.Marshal(req)
err = c.Write(string(buf) + "\n")
// ...

err = nil
return
}

总结一下,frpc 会为每个代理创建 ProxyClient 发送 CtlConn 到 frps 启动对应的 ProxyServer

frpc 会建立两种连接

  • CtlConn:在没有连接时建立,用于控制 ProxyServer 的启动,代表和 frps 建立了连接
  • WorkConn:在 frps 通过 CtlConn 发送非心跳包信息时建立(因为 frps 除了发送心跳包只会在接收到 user 连接时发送信息),用于转发 user 数据

结语

v0.1.0 只实现了 TCP 连接,代码量不大很容易读,对于内网穿透的实现理解帮助很大,不过作为第一个版本来说感觉有些地方写的有点乱,看版本高一点的代码可能会好一点。

转发实现并不难,主要的代码在于实现 frps 和 frpc 之间的通信,源码中使用两种连接,CtlConn 使用私有协议通信,用于收发心跳包和控制信息(frps 请求 frpc 建立 WorkConn 连接);WorkConn 用于数据转发,让 frps 转发 user 的数据以及让 frpc 转发到本地服务。

最大的感受就是 go routine 并发很方便,用无缓冲 channel 管理 tcp 连接以及协程间通信很好用,让代码写起来看起来都好简单,这下知道为什么要用 go 写了。