go 高并发 TCP 网络编程

零 Golang教程评论127字数 12352阅读41分10秒阅读模式

go 高并发 TCP 网络编程

什么是非阻塞 I/O

下图是四层网络分层,其中数据链路层和网络层都是不可靠的,到了传输层就是可靠的了,机器和机器才能进行可靠的传输,RESP 协议是属于应用层的

go-1.png文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

TCP 通信过程也就是通过三次握手建立连接,建立连接之后就可以进行数据传输,之后就可以通过四次挥手来通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

go-2.png文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

操作系统给我们提供了 socket,作为开发者不需要关心底层的网络通信,只需要关心应用层的通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

在 linux 中有一个 Internet domain socket,它的底层是 SOCK_STREAM文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

在 linux 中每个 socket 都有一个 id,这个 id 叫做“文件描述符”,用 FD 作为标识文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

我们处理 TCP 实际上就是在处理 socketsocket 内部的过程如下图所示:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

go-3.png文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

server 会新建一个 socket,用来监听心的连接,这是 server socket 处理 listen 状态文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

这时如果 client1 建立了一个 socket,它会连接 listen 状态的 server socket,然后进行三次握手,握手结束后 server 会再次新建一个 socket 用于和 client1 socket 进行通信,这时这两个 socket 状态是 established,也就是说此时 server 会有两个 socket,一个用于监听新的连接,一个用于和 client 通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html

如果又来了一个 client2server 会再次新建一个 socket 用于和 client2 进行通信

go-4.png

这时 server 会有 3 个 socket,当 client 越来越多时,server 会有越来越多的 socket

这时就诞生了 I/O 模型,I/O 指的是同时操作 socket 的方案,那 I/O 用什么方案同时操作多个 socket 呢?

三个方案:

  • 阻塞
  • 非阻塞
  • 多路复用

阻塞 I/O

阻塞方案是建立三个线程,这三个线程的业务都是类似的,调用 socket 是阻塞式,如果没有新的数据会卡住,也就是说线程会卡住,直到外面有新的数据过来,才会返回新的数据,业务会根据新的数据进行处理,然后把药返回的数据在写回 socket 中

go-5.png

阻塞 I/O 的特点是:

  • 同步读取 socket 时,线程陷入内核态
  • 当读写成功后,切换会用户态,继续执行
  • 优点:开发难度小,代码简单
  • 缺点:内核态切换开销大

非阻塞 I/O

业务要处理三个 socket,目的是要不断是去这三个 socket 中有没有新数据过来,有新数据就处理新数据,处理后再把数据写回去

非阻塞的意思是,不会卡住,而是会去询问,你有没有新数据,没有新数据就直接返回,在去询问下一个 socket,一直循环下去,不会卡在任何一个 socket 中

go-6.png

那么理论上来说所有的 socket 可以通过一个线程来处理

非阻塞 I/O 的特点是:

  • 如何暂时无法收发数据,会返回错误
  • 应用会不断的轮询,直到某个 socket 可以读写
  • 优点:不会陷入内核态,自由度高
  • 缺点:需要自旋轮询

多路复用

在 linux 中多路复用叫做 epoll,全称 event poll,是事件池的意思

event poll 中的事件是 各个 scoket 可读/可写事件,就可以让操作系统来监控各个 socket 是否可以操作

go-7.png

还是和之前一样,一个 socket 负责监听,两个 socket 负责和 client 通信,然后把这三个 socket 可读事件注册到 linux epoll 中

然后非阻塞的去调用 linux epoll,去询问这三个事件发生了哪些,如果某个 socket 事件发生了,epoll 会返回一个发生事件的列表,然后业务直接调用对应 socket 背后的业务处理

多路复用 epoll 的特点:

  • 注册多个 socket 事件
  • 调用 epoll,当有时间发生时,返回发生事件的列表
  • 优点:提供了事件列表,不需要查询各个 socket
  • 缺点:开发难度大,逻辑复杂
  • 不同操作系统的多路复用实现不同:
    • linuxepoll
    • mackqueue
    • windowsIOCP

go 是如何抽象 epoll

  • 在底层使用使用操作系统的多路复用 I/O
  • 在协程层次使用阻塞模型
  • 阻塞协程时,休眠协程

go 将各个系统对 epoll 的操作做了一层封装,抹平各平台的差异

多路复用器在各个系统都有以下功能:

  • 新建多路复用器 epoll_create()
  • 往多路复用器里插入需要监听的事件 epoll_ctl()
  • 查询发生了什么事件 epoll_wait()

在 go sdk 中搜索 epoll_create,会出现很多结果,我们选择 /cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go

在 go sdk 中有一个 netpoll 的文件

它一进来就写了很多注释,第一行注释写了这个工具叫做 network poller,下面有很多被注释掉的方法,这些方法是在不同平台的文件中单独去实现的,netpoll 这个文件是一个总的文件

比如 linux 平台是在 netpoll_epoll 中实现,mac 平台是在 netpoll_kqueue 中实现,windows 平台是在 netpoll_windows 中实现

  • newtpollinit:初始化多路复用器
  • netpollopen:往多路复用器里插入需要监听的事件
  • netpoll:看下有什么事件发生

go

代码解读
复制代码
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//     Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//     Arm edge-triggered notifications for fd. The pd argument is to pass
//     back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//     Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
//     poll without blocking. If delta > 0, block for up to delta nanoseconds.
//     Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//     Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//     Reports whether fd is a file descriptor used by the poller.

也就说 linux 中的 epoll 对应 go 中的 netpoll

  • epoll_create -> netpollinit
  • epoll_ctl -> netpollopen
  • epoll_wait -> netpoll

netpollinit 作用是创建多路复用器:

  • 新建 epoll
  • 新建一个 pipe 管道,用于中断 epoll
  • 将“管道有数据到达”注册在 epoll 中

go

代码解读
复制代码
func netpollinit() {
  var errno uintptr
  // 新建 epoll
  epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
  if errno != 0 {
    println("runtime: epollcreate failed with", errno)
    throw("runtime: netpollinit failed")
  }
  // 创建一个管道,linux 的管道,用来关闭 epoll
  r, w, errpipe := nonblockingPipe()
  if errpipe != 0 {
    println("runtime: pipe failed with", -errpipe)
    throw("runtime: pipe failed")
  }
  ev := syscall.EpollEvent{
    Events: syscall.EPOLLIN,
  }
  *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
  // 将管道的读事件加入 epoll
  errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
  if errno != 0 {
    println("runtime: epollctl failed with", errno)
    throw("runtime: epollctl failed")
  }
  netpollBreakRd = uintptr(r)
  netpollBreakWr = uintptr(w)
}

netpollopen 作用是插入事件

  • 传入一个 socket 的 fd 和 pollDesc 指针
  • pollDesc 指针是 socket 相关详细信息
  • pollDesc 中记录了哪个协程休眠在等待此 socket
  • 将 socket 可读,可写,端开事件注册到 epoll 中

go

代码解读
复制代码
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
  var ev syscall.EpollEvent
  // 设置事件
  ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
  tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
  *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
  // 调用底层的 epoll_ctl
  return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

pollDesc 是 go 网络层对 socket 的描述

  • fdsocket id
  • rg:等待读取的协程
  • wg:等待写入的协程

go

代码解读
复制代码
type pollDesc struct {
  fd    uintptr        // constant for pollDesc usage lifetime
  rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
  wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}

netpoll 查询发生了什么事件

  • 调用 epoll_wait 查询有哪些事件发生
  • 根据 socket 相关的 pollDesc 信息,返回哪些协程可以唤醒

go

代码解读
复制代码
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
  var events [128]syscall.EpollEvent
retry:
  // epfd 是 epoll 的 id
  n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
  if errno != 0 {
    if errno != _EINTR {
      println("runtime: epollwait on fd", epfd, "failed with", errno)
      throw("runtime: netpoll failed")
    }
    // If a timed sleep was interrupted, just return to
    // recalculate how long we should sleep now.
    if waitms > 0 {
      return gList{}
    }
    goto retry
  }
  var toRun gList
  // 多少个事件就有多少 n
  for i := int32(0); i < n; i++ {
    ev := events[i]
    if ev.Events == 0 {
      continue
    }

    if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
      if ev.Events != syscall.EPOLLIN {
        println("runtime: netpoll: break fd ready for", ev.Events)
        throw("runtime: netpoll: break fd ready for something unexpected")
      }
      if delay != 0 {
        // netpollBreak could be picked up by a
        // nonblocking poll. Only read the byte
        // if blocking.
        var tmp [16]byte
        read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
        netpollWakeSig.Store(0)
      }
      continue
    }

    var mode int32
    // EPOLLIN:可读事件
    if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
      mode += 'r'
    }
    // EPOLLOUT:可写事件
    if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
      mode += 'w'
    }
    if mode != 0 {
      tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
      pd := (*pollDesc)(tp.pointer())
      tag := tp.tag()
      if pd.fdseq.Load() == tag {
        pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
        netpollready(&toRun, pd, mode)
      }
    }
  }
  // 返回需要唤醒的协程列表
  return toRun
}

Network Poller 是如何工作的

Network Poller 底层有一个多路复用抽象层,用来屏蔽不同平台对多路复用器的实现

Network Poller 初始化

Network Poller 初始化是 poll_runtime_pollServerInit,使用原子操作,保证一个 go 程序只会初始化一次 netpoll

go

代码解读
复制代码
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
  netpollGenericInit()
}

func netpollGenericInit() {
  // 用原子操作初始化,保证只初始化一次
  if netpollInited.Load() == 0 {
    lockInit(&netpollInitLock, lockRankNetpollInit)
    lock(&netpollInitLock)
    if netpollInited.Load() == 0 {
      // 初始化多路复用器
      netpollinit()
      netpollInited.Store(1)
    }
    unlock(&netpollInitLock)
  }
}

pollCache 和 pollDesc

  • pollCache:是一个带锁的链表头
  • pollDesc:链表的成员
  • pollDesc 是 runtime 包对 socket 的详细描述

pollCache 结构体如下

pollCache 实际上是一个链表头,它记录了 pollDesc 中第一个成员的指针,pollCache 的作用是为了放置一个锁,用来锁住 pollDesc

go

代码解读
复制代码
type pollCache struct {
  lock  mutex      // 锁
  first *pollDesc
  // PollDesc objects must be type-stable,
  // because we can get ready notification from epoll/kqueue
  // after the descriptor is closed/reused.
  // Stale notifications are detected using seq variable,
  // seq is incremented when deadlines are changed or descriptor is reused.
}

pollDesc 是一个链表成员,link 属性指向了下一个 pollDesc

  • rg:可能是 pdReadypdWait、等待读取这个 socket 的地址
  • wg:可能是 pdReadypdWait、等待写这个 socket 的地址
    • 写为什么也需要等待,是因为底层的网卡是有发送能力的,写数据时可能需要等待前面的数据发送完毕

go

代码解读
复制代码
type pollDesc struct {
  link  *pollDesc      // in pollcache, protected by pollcache.lock
  fd    uintptr        // constant for pollDesc usage lifetime
  // rg, wg are accessed atomically and hold g pointers.
  // (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
  rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
  wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}

Network Poller 新增监听 socket

Network Poller 新增监听 socket 是 poll_runtime_pollOpen 方法,在 pollCache 中分配一个新的 pollDesc,初始化 pollDescrgwg 为 0),调用 netpollopen 将 fd 和 pd 插入 epoll

go:linkname 是一个 go 语言的关键字,用来将一个函数的实现指向另一个函数 poll_runtime_pollOpen 和 internal/poll.runtime_pollOpen 是同一个函数

go

代码解读
复制代码
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  // 分配一个新的 pollDesc
  pd := pollcache.alloc()
  lock(&pd.lock)
  wg := pd.wg.Load()
  if wg != pdNil && wg != pdReady {
    throw("runtime: blocked write on free polldesc")
  }
  rg := pd.rg.Load()
  if rg != pdNil && rg != pdReady {
    throw("runtime: blocked read on free polldesc")
  }
  pd.fd = fd
  if pd.fdseq.Load() == 0 {
    // The value 0 is special in setEventErr, so don't use it.
    pd.fdseq.Store(1)
  }
  // 初始化 pollDesc
  pd.closing = false
  pd.setEventErr(false, 0)
  pd.rseq++
  pd.rg.Store(pdNil)
  pd.rd = 0
  pd.wseq++
  pd.wg.Store(pdNil)
  pd.wd = 0
  pd.self = pd
  pd.publishInfo()
  unlock(&pd.lock)

  // 将 fd 和 pd 插入 epoll
  errno := netpollopen(fd, pd)
  if errno != 0 {
    pollcache.free(pd)
    return nil, int(errno)
  }
  return pd, 0
}

Network Poller 收发数据

Network Poller 是怎么做到一个协程对应一个 socket

收发数据分为两个场景:

  • 协程需要收发数据时,socket 已经可读可写
    • runtime 循环调用 netpoll 方法(g0 协程)
      • netpoll 被 startTheWorldWithSema 调用,startTheWorldWithSema 被 gcStart 调用。为什么事件循环会被 gcStart 调用呢?因为只是 gcStart 是周期性不断的在调用
    • 发现 socket 可读可写时,给对应的 rg 或者 wg 设置为 pdReady(1)
    • 协程调用 poll_runtime_pollWait 方法
    • 判断 rg 或者 wg 已经被置为 pdReady(1),返回 0
  • 协程需要收发数据时,socket 暂时无法读写
    • runtime 循环调用 netpoll 方法(g0 协程)
    • 协程调用 poll_runtime_pollWait 方法
    • 发现对应的 rg 或者 wg 为 0
    • 给对应的 rg 或者 wg 置为协程地址
    • 休眠等待

发现 socket 可读可写时,查看对应的 rg 和 wg,如果rg 或者 wg 是协程地址的话(不是 012),说明有协程在休眠监听,将协程地址返回给 runtime,然后调度器开始调度对应的协程

go 是如何抽象 socket

go 中 socket 是 net 提供的

  • net 包抽象了 TCP 网络操作
  • 使用 net.Listen() 得到 TCPListener(listen 状态的 socket)
  • 使用 listen.Accept() 得到 TCPConn(established 状态的 socket)
  • TCPConn.Read/Write 进行读写 socket 操作
  • Network poll 作为上述功能的底层支撑

我们在前面知道了 server 监听新连接,会新建一个 socket,新建新连接是由 net.listen 方法完成的

net.listen 方法返回的是一个 net.Listener 接口,net.Listener 接口中有一个 Accept 方法,用来接收新的连接

go

代码解读
复制代码
func Listen(network, address string) (Listener, error) {
  var lc ListenConfig
  return lc.Listen(context.Background(), network, address)
}

Listen 方法中调用了 ListenConfig 的 Listen 方法,ListenConfig 是一个结构体,它的 Listen 方法中根据不同的网络类型,调用不同的 listenTCPlistenUnix 方法

go

代码解读
复制代码
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
  // 解析地址
  addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
  if err != nil {
    return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
  }
  sl := &sysListener{
    ListenConfig: *lc,
    network:      network,
    address:      address,
  }
  var l Listener
  la := addrs.first(isIPv4)
  switch la := la.(type) {
  case *TCPAddr:
    if sl.MultipathTCP() {
      l, err = sl.listenMPTCP(ctx, la)
    } else {
      // 调用 listenTCP 方法
      l, err = sl.listenTCP(ctx, la)
    }
  case *UnixAddr:
    l, err = sl.listenUnix(ctx, la)
  default:
    return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
  }
  if err != nil {
    return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
  }
  return l, nil
}

listenTCP 方法内部会调用 socket 方法

net 包中对 socket 详细的描述是 netFD

netFD 中有一个 pfd 属性,pfd 是 poll.FD 类型,poll.FD 就是上面说的 pollDesc

go

代码解读
复制代码
// Network file descriptor.
type netFD struct {
  pfd poll.FD

  // immutable until Close
  family      int
  sotype      int
  isConnected bool // handshake completed or use of association with peer
  net         string
  laddr       Addr
  raddr       Addr
}

net.Listen

  • 新建 socket,并执行 bind 操作
  • 新建一个 FDnet 包对 socket 详细描述)
  • 返回一个 TCPListener 对象
  • 将 TCPListener 的 FD 信息加入监听
  • TCPListener 对象本质上是一个 LISTEN 状态的 socket

ls.Accept 方法是用来接收新连接的,返回的时候 Conn 接口`

go

代码解读
复制代码
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
  if !l.ok() {
    return nil, syscall.EINVAL
  }
  c, err := l.accept()
  if err != nil {
    return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
  }
  return c, nil
}
  • 直接调用 socket 的 accept 方法
  • 如果失败,休眠等待新的连接
  • 将新的 socket 包装为 TCPConn 变量返回
  • 将 TCPConn 的 FD 信息加入监听
  • TCPConn 本质上是一个 ESTABLISHED 状态的 socket

conn.Read 方法是用来读取数据

go

代码解读
复制代码
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
  if !c.ok() {
    return 0, syscall.EINVAL
  }
  n, err := c.fd.Read(b)
  if err != nil && err != io.EOF {
    err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
  }
  return n, err
}

conn.Write 方法是用来写入数据

go

代码解读
复制代码
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
  if !c.ok() {
    return 0, syscall.EINVAL
  }
  n, err := c.fd.Write(b)
  if err != nil {
    err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
  }
  return n, err
}

TCPConn.Read 和 TCPConn.Write 方法内部调用了 netFD.Read 和 netFD.Write 方法

netFD.Read 和 netFD.Write 方法内部调用了 poll.FD.Read 和 poll.FD.Write 方法

poll.FD.Read 和 poll.FD.Write 方法内部调用了 netpoll 方法

  • 调用 socket 原生读写方法
  • 如果调用失败,休眠等待可读/可写
  • 被唤醒后调用系统 socket

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/golangjc/17285.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论