Socket编程GitHub
基础知识
Socket地址结构
1
2
3
4
5
6
7
8
9struct sockaddr_in
{
uint8_t sin_len; //结构体长度
sa_family_t sin_family; //指定该地址家族,设为AF——INET,表示使用的是IPv4协议
in_port_t sin_port; //端口
struct in_addr sin_addr; //IPv4 地址
char sin_zero[8]; //一般设为0
};字节序(大端/小端)
字节序转换
1
2
3
4
5
6
7
//h代表host;n代表network;s代表short;l代表long
uint32_t htonl(uint32_t hostlong);
uint16_t htons(uint16_t hostshort);
uint32_t ntohl(uint32_t netlong);
uint16_t ntohs(uint16_t netshort);地址转换函数
1
2
3
4
5
6
7
8
9
10
11
int inet_aton(const char *cp, struct in_addr *inp); //将点分十进制转换为地址结构
in_addr_t inet_addr(const char *cp); //点分十进制->32位整数
in_addr_t inet_network(const char *cp);
char *inet_ntoa(struct in_addr in); //地址结构转换成点分十进制IP
struct in_addr inet_makeaddr(in_addr_t net, in_addr_t host);
in_addr_t inet_lnaof(struct in_addr in);
in_addr_t inet_netof(struct in_addr in);套接字类型
- 流式套接字:TCP
- 数据包套接字:UDP
- 原始套接字
测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main(void)
{
unsigned int val = 0x12345678;
unsigned char *p = (unsigned char*)&val;
printf("%0x %0x %0x %0x\n", p[0], p[1], p[2], p[3]);
unsigned int val2 = htonl(val);
p = (unsigned char*)&val2;
printf("%0x %0x %0x %0x\n", p[0], p[1], p[2], p[3]);
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
int main(void)
{
const char* cp = "192.168.0.1";
unsigned long num = inet_addr(cp);
printf("%u\n", ntohl(num));
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//Makefile 多文件编译
CXX = gcc
CXX_FLAGS = -Wall -g
SRC = $(wildcard *.c)
NOTDIR_SRC = $(notdir $(SRC))
OBJS = $(patsubst %.c, ./bin/%.o, $(NOTDIR_SRC))
# 多目标
TARGET_LIST = $(patsubst %.c, %, $(NOTDIR_SRC))
all:$(TARGET_LIST)
$(TARGET_LIST):$(OBJS)
@echo $@
$(CXX) -o ./bin/$@ ./bin/$@.o
@echo $(TARGET_LIST)
./bin/%.o:%.c
$(CXX) -c $(CFLAGS) $< -o $@
clean:
-rm -f ./bin/*结果
TCP cs模型
简单的客户端和服务端
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0);
int main(void)
{
//创建套接字
int listenfd;
if((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0 )
{
ERR_EXIT("socket");
}
//创建套接字地址
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(6666);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
//servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
//inet_aton()"127.0.0.1", &servaddr.sin_addr);
//绑定地址
if(bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
//监听,等待连接请求
if(listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
//从已完成连接中会的第一个连接,若无连接,则阻塞(被动套接字)
if((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
char recvbud[1024];
//数据交换
while(1)
{
memset(recvbud, 0, sizeof(recvbud));
int ret = read(conn, recvbud, sizeof(recvbud));
fputs(recvbud, stdout);
write(conn, recvbud, ret);
}
//断开连接
close(conn);
close(listenfd);
return 0;
}
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0);
int main(void)
{
int sock;
if((sock = socket(AF_INET, SOCK_STREAM, 0) )< 0 )
{
ERR_EXIT("socket");
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(6666);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
//向服务端发起连接(主动套接字)
if(connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr) )< 0)
ERR_EXIT("Connect");
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
write(sock, sendbuf, strlen(sendbuf));
read(sock, recvbuf, sizeof(recvbuf));
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
close(sock);
return 0;
}结果
REUSEADDR
服务端断开连接的一段时间内会处于TIME_WAIT状态,再此状态下无法进行绑定连接。
服务端尽可能使用REUSERADDR
绑定前尽可能调用setsockopt来设置REUSERADDR套接字选项
使用REUSERADDR选项可以使得不必等待TIME_WAIT状态结束就可以重启服务器
1
2
3
4//set REUSEADDR opt
int on = 1;
if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsocektopt");
处理多个客户端的连接:通过一个连接一个进程来处理并发
服务端在获得连接时创建子进程进行消息处理
客户端关闭后退出循环,并关闭进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void do_service(int conn)
{
char recvbud[1024];
//数据交换
while (1)
{
memset(recvbud, 0, sizeof(recvbud));
int ret = read(conn, recvbud, sizeof(recvbud));
if(ret == 0)
{
printf("Client closed.");
break;
}
fputs(recvbud, stdout);
write(conn, recvbud, ret);
}
}
int conn;
pid_t pid;
while (1)
{
//从已完成连接中会的第一个连接,若无连接,则阻塞(被动套接字)
if ((conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
//创建子进程
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
//子进程不需要关注listenfd,父进程不需要关注conn
if (pid == 0)
{
close(listenfd);
do_service(conn);
exit(EXIT_SUCCESS);
}
else
{
close(conn);
}
//断开连接
}服务端发送消息(一对一)
使用多进程向处理消息发送和接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51//主要逻辑
//子进程处理消息接收,父进程处理消息发送
//服务端和客户端父子进程逻辑对调
//若接收进程为子进程时,则接收到断开连接消息时,只会结束子进程,因此采用信号,来发送消息,结束父进程
pid_t pid;
pid = fork();
if(pid == -1)
ERR_EXIT("fork");
if(pid == 0)
{
char recvbud[1024];
while (1)
{
memset(recvbud, 0, sizeof(recvbud));
int ret = read(sock, recvbud, sizeof(recvbud));
if(ret == -1)
{
ERR_EXIT("read");
}
else if(ret == 0)
{
printf("peer closed\n");
break;
}
fputs(recvbud, stdout);
}
kill(pid, SIGUSR1);
exit(EXIT_SUCCESS);
}
else
{
signal(SIGUSR1, handler);
char sendbuf[1024] = {0};
while(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
write(sock, sendbuf, strlen(sendbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
exit(EXIT_SUCCESS);
}
//接收信号后的回调
void handler(int sign)
{
printf("received a sign = %d\n", sign);
exit(EXIT_SUCCESS);
}
流协议与粘包
流式协议:基于字节流,无边界
粘包问题
粘包解决办法
- 本质是在应用层维护消息与消息的边界
- 固定包的长度
- 包结尾加上分隔符 \r\n (ftp)
- 包头加上包的长度
- 其他复杂应用层协议
- 本质是在应用层维护消息与消息的边界
readn & writen
对 read 和 wirte 函数进行封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57//读取消息,只有读取了count大小的数据或无数据可读时才会退出循环
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*) buf;
while(nleft > 0)
{
nread = read(fd, bufp, nleft);
if(nread < 0)
{
if(errno == EINTR)
continue;
return -1;
}
else if(nread == 0)
{
return count - nleft;
}
else
{
bufp += nread;
nleft -= nread;
}
}
return count;
}
//发送数据,只有在发送了count大小的数据才退出
ssize_t written(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*) buf;
while(nleft > 0)
{
nwritten = write(fd, bufp, nleft);
if(nwritten < 0)
{
if(errno == EINTR)
continue;
return -1;
}
else if(nwritten == 0)
{
continue;
}
else
{
bufp += nwritten;
nleft -= nwritten;
}
}
return count;
}包头添加长度解决粘包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46//数据包结构,包含数据长度和数据体
struct packet
{
int len;
char buf[1024];
};
void do_service(int conn)
{
struct packet recvbuf;
int len;
while (1)
{
memset(&recvbuf, 0, sizeof(recvbuf));
//读取包头的长度消息
int ret = readn(conn, &recvbuf.len, 4);
if(ret == -1)
{
ERR_EXIT("read");
}
else if(ret < 4)
{
printf("Client closed.\n");
break;
}
len = ntohl(recvbuf.len);
//读取包体内容
ret = readn(conn, recvbuf.buf, len);
if(ret == -1)
{
ERR_EXIT("read");
}
else if(ret < len)
{
printf("Client closed.\n");
break;
}
fputs(recvbuf.buf, stdout);
written(conn, &recvbuf, 4 + len);
}
}read、write与recv、send
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
recv与read一样,都能从套接口缓冲区中获得数据
recv只能用于套接口IO,read能用于所有的IO
recv可以通过选项指定接收数据的行为
- MSG_OOB:接收紧急指针发送的数据(out-of-band)
- MSG_PEEK:接收缓冲区的数据,但不清除缓冲区的数据
封装recv_peek函数
1
2
3
4
5
6
7
8
9
10
11//读取到数据就返回
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while(1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if(ret == -1 & errno == EINTR)
continue;
return ret;
}
}
通过包尾添加对应字符解决粘包问题(\n)
使用recv_peek实现readline
使用recv_peek读取缓存区中的内容,直到读取到
\n
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while(1)
{
//读取一行数据
ret = recv_peek(sockfd, bufp, nleft);
if(ret <= 0)
{
return ret;
}
nread = ret;
//判断读取的数据中是否存在\n
int i;
for(i = 0; i < nread; i++)
{
//存在结尾标志,清空缓存区,返回
if(bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i + 1);
if(ret != i + 1)
{
exit(EXIT_FAILURE);
}
return ret;
}
}
if(nread > nleft)
{
exit(EXIT_FAILURE);
}
//不存在结尾标志,继续往后读取
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if(ret != nread)
{
exit(EXIT_FAILURE);
}
bufp += nread;
nleft -= nread;
}
return -1;
}
Socket相关函数
getsockname :获得连接套接字本地的sockaddr
int getsockname(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
1
2
3
4
5struct sockaddr_in loacladdr;
socklen_t addrlen = sizeof(loacladdr);
if(getsockname(sock, (struct sockaddr*)&loacladdr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip = %s port = %d\n", inet_ntoa(loacladdr.sin_addr), ntohs(loacladdr.sin_port));
getpeername:获得连接套接字对方的sockaddr
int getpeername(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
gethostname
int gethostname(char *name, size_t len);
0 for success
-1 for failure
gethostbyname
struct hostent *gethostbyname(const char *name);
1
2
3
4
5
6
7struct hostent {
char *h_name; /* official name of host */
char **h_aliases; /* alias list */
int h_addrtype; /* host address type */
int h_length; /* length of address */
char **h_addr_list; /* list of addresses */
}h_addr equals h_addr_list[0]
gethostbyaddr
struct hostent *gethostbyaddr(const void *addr, socklen_t len, int type);
close和shutdown
- close:close终止了数据传送的两个方向。
- shutdown:可以有选择的终止某个方向的数据传送或终止数据传送的两个方向。
- shutdown how = 1 可以保证对等方接收到一个EOF字符,而不管其他进程是否已经打开了套接字,而close不能保证。close只有等套接字的引用计数变为零,即所有的进程都关闭了才发送。
- 使用
shutdown(sock, SHUT_WR);
代替close(sock);
可以保证接受完缓冲区中的数据
getsockopt
int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen);
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
- 功能:获取/设置一个套接字的选项
参数:
- socket:文件描述符
- level:协议层次
- SOL_SOCKET 套接字层次
- IPPROTO_IP ip层次
- IPPROTO_TCP TCP层次
- option_name:选项的名称(套接字层次)
- SO_BROADCAST 是否允许发送广播信息
- SO_REUSEADDR 是否允许重复使用本地地址
- SO_SNDBUF 获取发送缓冲区长度
- SO_RCVBUF 获取接收缓冲区长度
- SO_RCVTIMEO 获取接收超时时间
- SO_SNDTIMEO 获取发送超时时间
- option_value:获取到的选项的值
- option_len:value的长度
返回值:
- 成功:0
- 失败:-1
recv和recvfrom
- 两者可以相互替换,知识recvfrom可以接收对端的地址信息,在udp中可以很方便的进行回复操作
僵尸进程
进程中的指令已经执行完成,但是进程PCB结构还没有回收。
即子进程先于父进程退出后,子进程的PCB需要其父进程释放,但是父进程并没有释放子进程的PCB,这样的子进程就称为僵尸进程。父进程未结束,子进程结束,但父进程没有处理子进程的退出状态(当子进程先于父进程结束,父进程没有获取子进程的退出码,子进程的PCB会保留一段时间等待父进程在PCB中获取退出码,,且该进程不可执行,此时子进程变成僵尸进程)。
直到父进程获取到子进程的退出状态,子进程的PCB才会被移除。采用信号的方式对僵尸进程进行忽略
signal(SIGCHLD, SIG_IGN)
捕捉SIGCHLD信号来避免僵尸进程
signal(SIGCHLD, handle_sigchld)
1
2
3
4
5
6void handle_sigchld(int sig)
{
//捕捉到SIGCHLD状态后调用wait等待子进程状态
//wait(NULL); //只等待一个信号就会返回
while(waitpid(-1, NULL, WNOHANG) > 0); //接收到信号后,循环等待子进程,直到没有子进程
}
TCP
TCP的十一种状态
还有一种状态为 CLOSING,其产生原因是双方同时关闭
SIGPIPE信号
收到FIN信号的套接字还可以接收数据,接收到FIN只代表对方不在发送数据
在收到RST段之后,即在接收到FIN之后,再调用wirte就会产生SIGPIPE信号,对于这个信号,一般忽略处理
signal(SIGPIPE, GIS_IGN)
五种IO模型
阻塞IO
非阻塞IO
将套接口设置为非阻塞模式
fcntl(fd, F_SETFL, flag|0_NCNBLOCK)
I/O复用(select和poll)
select
信号驱动I/O
异步I/O:效率最高
I/O超时设置方法
alarm
- 使用alarm函数,超时之后会发送一个SIGALRM信号,捕捉信号就可以将阻塞函数打断,进行后续处理
套接字选项
SO_SNDTIMEO
SO_RCVTIMEO
setsockopt(sock, SOL_SOCKET, SO_RECVTIMEO, 5)
设置接收时间为5,超时后会产生错误代码 errno = EWOULDBLOCK
Select
select模型
1
2
3
4
5
6int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set); //将fd从set中一处
int FD_ISSET(int fd, fd_set *set); //判断fd是否再set中
void FD_SET(int fd, fd_set *set); //将fd放入set中
void FD_ZERO(fd_set *set); //将集合清空用select来管理多个I/O,一旦其中一个或多个I/O检测到感兴趣时间,select返回,返回值为事件个数和哪些事件,然后遍历并处理这些事件。
nfds:读、写、异常集合中的文件描述符的最大值+1(select为遍历所有事件来检测)
readfds:可读fd集合
writefds:可写fd集合
exceptfds:异常fd集合
timeout:超时时间结构体
1
2
3
4struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
select模型处理标准输入和sock I/O
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61void echo_cli(int sock)
{
//初始化ready集合
fd_set rset;
FD_ZERO(&rset);
int nready;
int maxfd;
int fd_stdin = fileno(stdin);
maxfd = fd_stdin > sock ? fd_stdin : sock;
char sendbuf[1204] = {0};
char recvbuf[1024] = {0};
while(1)
{
//将标准输入I/O和sock加入集合
FD_SET(fd_stdin, &rset);
FD_SET(sock, &rset);
//调用select,获得ready集合
nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
//错误和未有ready可以处理
if(nready == -1)
{
ERR_EXIT("select");
}
else if(nready == 0)
{
continue;
}
//sock I/O 有数据处理
if(FD_ISSET(sock, &rset))
{
int ret = readline(sock, recvbuf, sizeof(recvbuf));
if (ret == -1)
{
ERR_EXIT("read");
}
else if (ret == 0)
{
printf("server closed.\n");
break;
}
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
}
//标准输入有数据处理
if(FD_ISSET(fd_stdin, &rset))
{
if(fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
break;
written(sock, sendbuf, strlen(sendbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
}
close(sock);
}select模型 读、写、异常事件发生的条件
- 可读
- 套接口缓冲区有数据可读
- 连接的读一半关闭,即接收到FIN,读操作返回0
- 监听套接口,已完成链接队列不为空
- 套接口上发生了一个错误待处理,错误可以通过getsockopt指定SO_ERROR选项来处理
- 可写
- 套接口发送缓冲区有数据
- 连接的写一半关闭,即接收到RST,再次调用write
- 套接口上发生了一个错误待处理,错误可以通过getsockopt指定SO_ERROR选项来处理
- 异常
- 套接口存在带外数据(out-of-band)
- 可读
使用select实现单进程处理服务器并发。
即使用select来监听所有套接口是否有事件待处理,当套接口存在事件待处理才调用相关函数(accept、read等),这样就不会产生阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
int i;
int maxidx = 0; //当前client的最大下标
int client[FD_SETSIZE]; //保存所有的已连接套接口,即客户端
for(i = 0; i < FD_SETSIZE; i++) //初始化
{
client[i] = -1;
}
int nready; //检测到的需要处理的个数
int maxfd = listenfd; //最大描述值
fd_set rset; //接收返回事件集合
fd_set allset; //总事件集合
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
while(1)
{
//因为selcet会改变rset的值,所以使用allset记录所有的需要监听的事件
rset = allset;
nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
if(nready == -1)
{
if(errno == EINTR)
continue;
ERR_EXIT("select");
}
//未检测到ready事件(即select超时)
if(nready == 0)
continue;
//若监听套接口存在事件处理,即有客户端发起连接
if(FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
//获得连接套接口
if ((conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
//遍历客户端数组,保存至空位,并更新最大下标
for(i = 0; i < FD_SETSIZE; i++)
{
if(client[i] == -1)
{
client[i] = conn;
maxidx = i > maxidx ? i : maxidx;
break;
}
}
//客户端数组满了,输出错误信息
if(i == FD_SETSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
//将套接口加入待处理集合中
FD_SET(conn, &allset);
//更新最大描述符
maxfd = conn > maxfd ? conn : maxfd;
printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
//更新待处理事件数
if(--nready <= 0)
continue;
}
//遍历客户端数组,处理所有接收数据
for(i = 0; i <= maxidx; i++)
{
conn = client[i];
if(conn == -1)
continue;
if(FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if(ret == -1)
{
ERR_EXIT("client readline");
}
if(ret == 0)
{
printf("client %d close\n", conn);
FD_CLR(conn, &allset);
client[i] = -1;
close(conn);
}
fputs(recvbuf, stdout);
written(conn, recvbuf, strlen(recvbuf));
if(--nready <= 0)
break;
}
}
}select 实现超时
read_timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36//检测超时,不含读操作
//未超时返回0, 失败返回-1,超时返回-1 && errno = ETIMEDOUT
int read_timeout(int fd, unsigned int wait_seconds)
{
int ret = 0;
if(wait_seconds > 0)
{
//初始化
fd_set read_fdset;
struct timeval timeout;
FD_ZERO(&read_fdset);
FD_SET(fd, &read_fdset);
timeout.tv_sec = wait_seconds;
timeout.tv_usec = 0;
//检测读集合中是否有事件
do
{
ret = select(fd + 1, read_fdset, NULL, NULL, timeout);
}
while(ret < 0 && errno == EINTR);
//超时
if(ret == 0)
{
ret = -1;
errno = ETIMEDOUT;
}
//fd产生可读事件
else if(ret == -1)
ret = 0;
}
return ret;
}
- write_timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int write_timeout(int fd, unsigned int wait_seconds)
{
int ret = 0;
if(wait_seconds > 0)
{
fd_set write_fdset;
struct timeval timeout;
FD_ZERO(&write_fdset);
FD_SET(fd, &write_fdset);
timeout.tv_sec = wait_seconds;
timeout.tv_usec = 0;
do
{
ret = select(fd + 1, NULL, &write_fdset, NULL, &timeout);
}
while(ret < 0 && errno == EINTR);
if(ret == 0)
{
ret = -1;
errno = ETIMEDOUT;
}
else if(ret == 1)
ret = 0;
}
return ret;
}
- accept_timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//存在超时检测的连接功能
//wait_seconds 为0时,为正常的阻塞accpet
//成功返回连接套接字, 超时返回-1 && errno == ETIMEOUT, 否则连接失败
int accept_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
int ret = 0;
socklen_t addrlen = sizeof(struct sockaddr_in);
if(wait_seconds > 0)
{
fd_set accept_fdset;
struct timeval timeout;
FD_ZERO(&accept_fdset);
FD_SET(fd, &accept_fdset);
timeout.tv_sec = wait_seconds;
timeout.tv_usec = 0;
do
{
ret = select(fd + 1, &accept_fdset, NULL, NULL, &timeout);
}
while(ret < 0 && errno == EINTR);
if(ret == 0)
{
errno = ETIMEDOUT;
return -1;
}
else if(ret == -1)
return -1;
if(addr != NULL)
{
ret = accept(fd, (struct sockaddr*)addr, &addrlen);
}
else
{
ret = accept(fd, NULL, NULL);
}
if(ret == -1)
ERR_EXIT("accept");
}
return ret;
}
- connect_timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//将fd设置为非阻塞模式
//1 for active
//0 for enable
void set_nonblock(int fd, int set)
{
int ret;
//获取之前的标记
int flags = fcntl(fd, F_GETFL);
if(flags == -1)
{
ERR_EXIT("fcntl");
}
//根据参数设置更改标记
if(set)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
//设置
ret = fcntl(fd, F_SETFL, flags);
if(ret == -1)
{
ERR_EXIT("fcntl");
}
}
//连接超时
//成功返回0, 超时返回-1 && errno == ETIMEOUT, 否则连接失败
int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
int ret;
socklen_t addrlen = sizeof(struct sockaddr_in);
//存在超时事件,将fd设置为非阻塞模式
if(wait_seconds > 0)
{
set_nonblock(fd, 1);
}
ret = connect(fd, (struct sockaddr*) addr, addrlen);
//连接正在处理中
if(ret < 0 && errno == EINPROGRESS)
{
fd_set connect_fdset;
struct timeval timeout;
FD_ZERO(&connect_fdset);
FD_SET(fd, &connect_fdset);
timeout.tv_sec = wait_seconds;
timeout.tv_usec = 0;
do
{
//建立连接,套接字即为可写
ret = select(fd + 1, NULL, &connect_fdset, NULL, &timeout);
}
while(ret < 0 && errno == EINTR);
if(ret == 0)
{
ret = -1;
errno = ETIMEDOUT;
}
else if(ret < 0)
{
return -1;
}
else if(ret == 1)
{
//ret等于0有两种情况
//一种是简历连接成功,一种是套接字产生错误
//所以需要调用getsockopt来获取
int err;
socklen_t socklen = sizeof(err);
int sockoptret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &socklen);
if(sockoptret == -1)
{
return -1;
}
if(err == 0)
{
ret = 0;
}
else
{
errno = err;
ret = -1;
}
}
}
//改回阻塞模式
if(wait_seconds > 0)
{
set_nonblock(fd, 0);
}
return ret;
}
select限制
用select实现并发服务器,最大并发数受以下限制
一个进程能打开的最大文件描述符限制,可以通过调整内核参数更改。最大值和内存大小有关
int getrlimit(int resource, struct rlimit *rlim); int setrlimit(int resource, const struct rlimit *rlim);
指令更改
ulimit -n number
select中的fd_set集合容量的限制(FD_SIZE),需要重新编译内核才能修改
Poll
poll函数
1
2
3
4
5
6
7int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};- 参数列表
- fds :pollfd数组
- nfds:数组中数据的个数
- timeout:超时事件,-1为永远等待
- events的类型定义在头文件 poll.h 中
- POLLIN / POLLRDNORM :可读
- POLLPRI:特殊事件可读(exceptional condition)(存在带外数据等)
- POLLOUT / POLLWANORM:可写
- POLLRDHUP:流套接字对等端关闭连接
- POLLERR:错误
- POLLHUP:关闭
- POLLNVAL:非法请求
- POLLRDBAND / POLLWRBAND:优先数据可读 / 写
- 参数列表
poll的使用与select基本一致
区别是poll不需要额外的集合来保存需要检测的事件和返回的文件描述符,所有的相关数据都保存在响应的结构体中。
且
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77int nready;
client[0].fd = listenfd;
client[0].events = POLLIN;
while(1)
{
nready = poll(client, maxidx + 1, -1);
if(nready == -1)
{
if(errno == EINTR)
continue;
ERR_EXIT("poll");
}
if(nready == 0)
continue;
//
if(client[0].revents & POLLIN)
{
peerlen = sizeof(peeraddr);
if ((conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
for(i = 0; i < POLLSIZE; i++)
{
if(client[i].fd == -1)
{
client[i].fd = conn;
client[i].events = POLLIN;
maxidx = i > maxidx ? i : maxidx;
break;
}
}
if(i == POLLSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
if(--nready <= 0)
continue;
}
//
for(i = 1; i < POLLSIZE; i++)
{
conn = client[i].fd;
if(conn == -1)
continue;
if(client[i].revents & POLLIN)
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if(ret == -1)
{
ERR_EXIT("client readline");
}
if(ret == 0)
{
printf("client %d close\n", conn);
client[i].fd = -1;
close(conn);
}
fputs(recvbuf, stdout);
// sleep(3);
written(conn, recvbuf, strlen(recvbuf));
if(--nready <= 0)
break;
}
}
}
epoll
epoll的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};- epoll_event.events事件定义:与poll大部分一致
- EPOLLET:设置关联文件描述符的边缘触发行为。epoll的默认行为是级别触发的。有关边缘和级别触发事件分布体系结构的更多详细信息
- epoll_create 和 epoll_create1。底层 数据结构更改,现在的epoll不使用哈希表,而是用红黑树,所以不需要size的参数
- flags
- EPOLL_CLOEXEC:当进程执行exec后 ,文件描述符会被关闭(close-on-exec)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- event:文件描述符 fd 所关心的事件
- op:存在三种方式
- EPOLL_CTL_ADD:向epoll添加文件描述符
- EPOLL_CTL_MOD:修改文件描述符的event
- EPOLL_CTL_DEL:删除epoll中的文件描述符
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- events:返回值,包含可用事件
- maxevents:最大容量
- timeout:超时时间,-1 表示不超时
- epoll_event.events事件定义:与poll大部分一致
epoll的两种模式
- EPOLLLT : 电平触发
- 完全靠kernel epoll驱动,应用程序只需要处理从epoll_wait返回的fds,这些fds我们认为它们处于就绪状态。
- EPOLLET : 边沿触发
- 此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关
注这个fd的任何状态信息,(从epoll队列移除)直到应用程序通过读写操作触发EAGAIN状态,
epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列) - 随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势。但是对程序员的要求也更高。
- 此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关
- EPOLLLT : 电平触发
epoll使用案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100//定义数组保存客户端文件描述符
vector<int> clients;
//创建epoll实例
int epollfd;
epollfd = epoll_create1(EPOLL_CLOEXEC);
//将连接套接字加入epoll中
struct epoll_event event;
event.data.fd = listenfd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event);
//保存所有ready的列表
vector<struct epoll_event> events(16);
int i;
int conn;
int nready;
int count = 0;
socklen_t peerlen;
struct sockaddr_in peeraddr;
while(1)
{
//使用epoll_wait返回所有ready文件描述符,保存在events数组中
//static_cast为强制类型转换
nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);
if(nready == -1)
{
if(errno == EINTR)
continue;
ERR_EXIT("epoll_wait");
}
if(nready == 0)
continue;
//数组扩容
if((size_t)nready == events.size())
{
events.resize(events.size() * 2);
}
//遍历events数组中的所有ready文件描述符
for(i = 0; i < nready; i++)
{
//是监听套接口
if(events[i].data.fd == listenfd)
{
peerlen = sizeof(peeraddr);
if ((conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
printf("ip = %s port = %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count = %d\n", ++count);
clients.push_back(conn);
//设置该套接口为非阻塞模式
set_nonblock(conn, 1);
//将新连接的套接口放入epoll中
event.data.fd = conn;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event);
}
//非监听套接口
else if(events[i].events & EPOLLIN)
{
conn = events[i].data.fd;
if(conn == -1)
continue;
//存在可读事件
if(events[i].events & POLLIN)
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if(ret == -1)
{
ERR_EXIT("client readline");
}
//对方断开连接
if(ret == 0)
{
printf("client %d close\n", conn);
close(conn);
//条用ctl将该文件描述符从epoll中删除,并且从clients数组中删除
event = events[i];
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event);
clients.erase(remove(clients.begin(), clients.end(), conn), clients.end());
}
//回复
fputs(recvbuf, stdout);
written(conn, recvbuf, strlen(recvbuf));
}
}
}
}
epoll和poll、select的区别
- epoll不会随着监听fd数目的增长而降低效率
- select、poll的实现是采用遍历查询的方式处理;epoll是基于回调来实现的,当fd有期望的事件发生就通过回调函数将其加入epoll的就绪队列中。
- 内核/用户空间内存拷贝问题,如何让内核把fd消息通知给用户空间呢?在这个问题上select/poll采取了内存拷贝方法。而epoll采用了共享内存的方式。
- epoll的epoll_event结构体会保存事件的相关信息,不用遍历所有的fd集合查询
UDP 模型
简单的客户端和服务端实现
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54void echo_srv(int sock)
{
char recvbuf[1024] = {0};
struct sockaddr_in peeraddr;
socklen_t peerlen;
int n;
while(1)
{
peerlen = sizeof(peeraddr);
memset(recvbuf, 0, sizeof(recvbuf));
//接收数据,并包含地址信息
n = recvfrom(sock, recvbuf, sizeof(recvbuf), 0, (struct sockaddr*)&peeraddr, &peerlen);
if(n == -1)
{
if(errno == EINTR)
continue;
ERR_EXIT("reacvfrom");
}
//通过接收的地址信息将消息发回
else if(n > 0)
{
fputs(recvbuf, stdout);
sendto(sock, recvbuf, n, 0, (struct sockaddr*)&peeraddr, peerlen);
}
//n = 0 不代表连接关闭
}
close(sock);
}
int main(void)
{
int sock;
if((sock = socket(PF_INET, SOCK_DGRAM, 0)) < 0)
{
ERR_EXIT("socket");
}
//初始化端口和地址
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(6666);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
//绑定地址
if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
echo_srv(sock);
return 0;
}
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42void echo_cli(int sock)
{
struct sockaddr_in servaddr;
//初始化服务器的地址
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(6666);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
//获取输入
while(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
//发送给服务端
sendto(sock, sendbuf, strlen(sendbuf), 0, (struct sockaddr*)&servaddr, sizeof(servaddr));
//接收数据
recvfrom(sock, recvbuf, sizeof(recvbuf), 0, NULL, NULL);
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
close(sock);
}
//udp 客户端不需要对端口进行绑定
int main(void)
{
int sock;
if((sock = socket(PF_INET, SOCK_DGRAM, 0)) < 0)
{
ERR_EXIT("socket");
}
echo_cli(sock);
return 0;
}udp发送失败回得到一个异步错误(icmp错误),在接收时才能知道
通过connect解决:使套接字连接到远程地址,此时调用sendto不需要远程地址,即可以直接调用send