
Linux网络编程--Udp套接字+实战 (万字详解,超详细!!)
----------------------------------------------------分割线-----------------------------------------------------2025.2.10。如:2个人都是用网络进行通信,或者都是用书信交流,不可能存在一方使用书信,另一方使用手机这种“荒谬”的事情发生;套接字创建成功后,需要绑定自己的地址信息,使用系统调
目录
套接字协议:
协议(protocol):
如果2个距离很远的人想要进行交流,首先要决定的就是"我们该用什么工具来进行通话?" :写信?手机?还是电脑?首先明确一点:能成功通信的双方一定是达成了某种共识的!如:2个人都是用网络进行通信,或者都是用书信交流,不可能存在一方使用书信,另一方使用手机这种“荒谬”的事情发生;换句话说,协议就是为了完成数据交换而定好的约定!
创建套接字(Create Socket):
在Linux中一切皆文件,使用系统调用socket函数创建返回一个套接字文件描述符(如果成功)
创建失败返回-1,错误码被设置。
函数原型:
使用socket函数需要包含对应头文件;
参数详解:
- domian:套接字中使用的协议族,什么是协议族?说人话就是:番茄炒蛋和番茄炒米(参考卢老爷名言)都是属于用番茄做的菜一类,在套接字中,番茄就是协议族,番茄炒蛋就是协议族的一种类型,在套接字中,协议族常用的就2种:AF_INET和AF_INET6,分别对应ipv4和ipv6
- type:套接字遵守的协议:udp还是tcp,设置为SOCK_DGRAM对应遵守UDP协议,设置为SOCK_STREAM对应遵守TCP协议
- protocol:设置套接字的类型,非阻塞还是阻塞,是否能被子进程继承,设置为SOCK_NONBLOCK表示设置描述符非阻塞,设置SOCK_CLOEXEC表示不能被子进程继承
绑定服务器地址
套接字创建成功后,需要绑定自己的地址信息,使用系统调用的bind函数
函数原型:
参数详解:
- sockfd:之前使用的创建套接字返回的描述符fd
- addr: 需要手动填充的服务器的地址信息,sockaddr本质上c语言的“拟基类”,我们要填充的其实不是sockaddr,而是它的"子类":struct sockaddr_in,结构体原型:
sin_addr中还有一个成员:s_addr,用于填充要绑定的ip,注意!在云服务器上只能绑定所有地址,即"0.0.0.0"; sin_family:地址族;sin_port:端口号,注意要从本地序列(uint16_t)转换成网络序列,可以使用函数::htons进行转换;sin_zero一般都填充为0即可,所以可以在刚创建出sockaddr_in时使用memset将结构体清零
- addrlen:第二个参数的大小,使用sizeof计算即可
开始通信
UDP套接字较为简单,不需要其他操作,只要bind成功就可以直接进行通信
使用recvfron接收信息,sendto发送信息
Udp服务器设计--V1
目标:实现一个简单的Echo服务器,客户端发送什么info就返回什么样的info
先引入日志Log.hpp:
/*Log.hpp*/
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <fstream>
#include <mutex>
#include <pthread.h>
#define _SCREEN_TYPE_ 1
#define _FILE_TYPE 2
namespace ns_log
{
const std::string DEFAULTSTR = "Log.txt";
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
struct Logmessage
{
std::string _level; // 等级
pid_t _pid;
std::string _filename; // 文件名
int _filenumber; // 行号
std::string _curr_time; // 获取日志信息出现的时间
std::string _logmessage; // 信息
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
break;
case INFO:
return "INFO";
break;
case WARNING:
return "WARNING";
break;
case ERROR:
return "ERROR";
break;
case FATAL:
return "FATAL";
break;
default:
return "UNKNOW";
break;
}
}
std::string GetCurrTime()
{
time_t now_time = time(nullptr);
struct tm *curr_time = localtime(&now_time);
// 转换为string格式
char buf[128];
snprintf(buf, sizeof(buf), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900, curr_time->tm_mon + 1, curr_time->tm_mday, curr_time->tm_hour, curr_time->tm_min, curr_time->tm_sec);
return buf;
}
class Log
{
void LogMsgToScreen(const Logmessage &logmsg)
{
printf("[%s][%d][%s][%d][%s] %s",
logmsg._level.c_str(), logmsg._pid, logmsg._filename.c_str(), logmsg._filenumber, logmsg._curr_time.c_str(), logmsg._logmessage.c_str());
}
void LogMsgToFile(const Logmessage &logmsg)
{
char buf_info[2048] = "\0";
snprintf(buf_info, sizeof(buf_info), "[%s][%d][%s][%d][%s] %s",
logmsg._level.c_str(), logmsg._pid, logmsg._filename.c_str(), logmsg._filenumber, logmsg._curr_time.c_str(), logmsg._logmessage.c_str());
/*--系统调用版本--*/
// int fd = open(_logfile.c_str(),O_CREAT | O_WRONLY | O_APPEND,0666);
// if(fd < 0)
// {
// perror("open");
// return;
// }
// write(fd,buf_info,sizeof(buf_info));
/*--c++提供的fstream--*/
std::ofstream out;
out.open(_logfile, std::ios::out | std::ios::app | std::ios::binary);
if (!out.is_open())
return;
out.write(buf_info, sizeof(buf_info));
out.close();
}
void FlushLogMsg(int level, const Logmessage &logmsg)
{
// c++11的锁
// _mutex.lock();
// RAII类型的锁
std::unique_lock<std::mutex> lock(_mtx);
if (_isopen && level == DEBUG)
return;
switch (_type)
{
case _SCREEN_TYPE_:
LogMsgToScreen(logmsg);
break;
case _FILE_TYPE:
LogMsgToFile(logmsg);
break;
}
}
public:
Log(const std::string &logfile = DEFAULTSTR) : _logfile(logfile), _type(_SCREEN_TYPE_)
{
}
void ModPrintFormat(int type)
{
std::unique_lock<std::mutex> lock(_mtx);
_type = type;
}
void LogMessage(int level, std::string filename, int filenumber, const char *format, ...) // 注意-->可变函数参数
{
Logmessage msg;
msg._level = LevelToString(level);
msg._filename = filename;
msg._pid = getpid();
msg._filenumber = filenumber;
msg._curr_time = GetCurrTime();
// 注意:取出可变参数的固定写法
va_list _ap; // 创建变量,本质是一个指针
va_start(_ap, format); // 将参数列表中离...最近的确定的参数传入
char log_info[512];
vsnprintf(log_info, sizeof(log_info), format, _ap);
va_end(_ap); // 销毁_ap
msg._logmessage = log_info;
FlushLogMsg(level, msg);
}
~Log()
{
}
void EnableFiltration(bool flag)
{
_isopen = flag;
}
private:
int _type;
std::string _logfile;
std::mutex _mtx;
bool _isopen = false;
};
Log lg;
// 打开过滤器
#define EnabelFILTRATION() \
do \
{ \
lg.EnableFiltration(true); \
} while (0)
// 关闭过滤器
#define ClOSEFILTRATION \
do \
{ \
lg.EnableFiltration(false); \
} while (0)
#define LOG(level, format, ...) \
do \
{ \
lg.LogMessage(level, __FILE__, __LINE__, format, ##__VA_ARGS__); \
} while (0)
#define LOGTOSCREEN(level, format, ...) \
do \
{ \
lg.ModPrintFormat(_SCREEN_TYPE_); \
lg.LogMessage(level, __FILE__, __LINE__, format, ##__VA_ARGS__); \
} while (0)
#define LOGTOFILE(level) \
do \
{ \
lg.ModPrintFormat(_FILE_TYPE); \
} while (0)
}
引入日志后,正式开始编写V1版本echo服务器:
V1--Udp_Server:
根据上文的udp套接字接口介绍,下面就直接上代码:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
using namespace ns_log;
#define MAX_BUFFER_SIZE 4096
// Udp--无连接|不可靠|高性能|广播
// 云服务器bind--->0.0.0.0
/*这是一个echo服务器*/
class UdpServer
{
private:
int _sockfd;
uint16_t _port;
bool _isrunning;
public:
UdpServer(uint16_t port) : _port(port), _isrunning(false)
{
_sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "create socketfd failed!\n");
abort();
}
LOG(DEBUG, "---create sockfd succsee,sockfd = %d---\n", _sockfd);
// bind
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = ::htons(_port); // 本地序列转网络序列
// bind任意ip
addr.sin_addr.s_addr = INADDR_ANY;//INADDR_ANY--表示bind任意地址"0.0.0.0"
int res = ::bind(_sockfd, (struct sockaddr *)&addr, sizeof(addr));
if (res < 0)
{
LOG(FATAL, "server bind error!\n");
abort();
}
LOG(DEBUG, "server bind success!\n");
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
char buff[MAX_BUFFER_SIZE];
// 接收信息
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
ssize_t n = ::recvfrom(_sockfd, buff, sizeof(buff) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
buff[n] = 0;
LOG(INFO, "client info#: %s\n", buff);
// 发送
std::string info = "[server][info]# ";
info += buff;
ssize_t n = ::sendto(_sockfd, info.c_str(), info.size(), 0,(struct sockaddr *)&peer, len);
}
else
{
if(n == 0) continue;
if(n < 0)
{
LOG(ERROR,"recv error!\n");
break;
}
}
}
}
void Close()
{
if(_sockfd > 0) ::close(_sockfd);
}
void Stop()
{
_isrunning = false;
}
~UdpServer()
{
}
};
编写客户端代码:client.cc
对于客户端来说,需要bind要连接的服务器的ip地址和端口号,不能bind任意地址了
实现:
#include "Udp_Server.hpp"
bool Usage(int argc, char *argv[])
{
if (argc != 3)
{
LOG(ERROR, "Usage:<./exe> <ip> <port>\n");
return false;
}
return true;
}
int main(int argc, char *argv[])
{
if (!Usage(argc, argv))
return -1;
std::string ip = argv[1];
uint16_t port = atoi(argv[2]);
int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
LOG(FATAL, "Create socket error!\n");
return -2;
}
struct sockaddr_in server;
// 将结构体清零
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
::inet_pton(AF_INET, ip.c_str(), &server.sin_addr.s_addr);
server.sin_port = ::htons(port);
while(true)
{
std::string buffer;
std::getline(std::cin,buffer);
ssize_t n = ::sendto(sockfd,buffer.c_str(),buffer.size(),0,(struct sockaddr*)&server,sizeof(server));
//接收信息
char recvbuff[1024];
struct sockaddr_in client;
memset(&client,0,sizeof(client));
socklen_t len = sizeof(client);
n = ::recvfrom(sockfd,recvbuff,sizeof(recvbuff)-1,0,(struct sockaddr*)&client,&len);
if(n > 0)
{
recvbuff[n] = 0;
LOG(INFO,"%s\n",recvbuff);
continue;
}
else
{
if(n == 0) continue;
if(n < 0)
{
LOG(ERROR,"client recv error!\n");
break;
}
}
}
return 0;
}
编写serevr.cc
#include"Udp_Server.hpp"
#include<memory>
bool Usage(int argc, char *argv[])
{
if (argc != 2)
{
LOG(ERROR, "Usage:<./exe> <port>\n");
return false;
}
return true;
}
int main(int argc, char *argv[])
{
if (!Usage(argc, argv))
return -1;
uint16_t port = atoi(argv[1]);
std::unique_ptr<UdpServer> server = std::make_unique<UdpServer>(port);
server->Start();
server->Stop();
server->Close();
}
编写Makefile:
.PHONY:ALL
ALL:server client
server:server.cc
g++ -o $@ $^ -std=c++14
client:client.cc
g++ -o $@ $^ -std=c++14
.PHONY:clean
clean:
rm -f server client
make编译即可;
Udp服务器设计--V2 引入线程池 --在线群聊服务器实现
设计图:
Udp_Server模块只负责进行IO!
Udp_Server.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Inetaddr.hpp"
#include "Route.hpp"
using namespace ns_log;
#define MAX_BUFFER_SIZE 4096
class nocopy
{
public:
nocopy(){}
~nocopy(){}
nocopy(const nocopy&) = delete;
const nocopy& operator=(const nocopy&) = delete;
};
using server_t = std::function<void(int sockfd, const std::string &info, InetAddr& addr)>;
class UdpServer : public nocopy
{
private:
int _sockfd;
uint16_t _port;
bool _isrunning;
server_t _route_call_back;
public:
UdpServer(uint16_t port,const server_t& task) : _port(port), _isrunning(false),_route_call_back(task)
{
_sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "---create socketfd failed !----\n");
abort();
}
LOG(DEBUG, "---create sockfd succsee,sockfd = %d---\n", _sockfd);
// bind
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = ::htons(_port); // 本地序列转网络序列
// bind任意ip
addr.sin_addr.s_addr = INADDR_ANY;//INADDR_ANY--表示bind任意地址"0.0.0.0"
int res = ::bind(_sockfd, (struct sockaddr *)&addr, sizeof(addr));
if (res < 0)
{
LOG(FATAL, "server bind error!\n");
abort();
}
LOG(DEBUG, "server bind success!\n");
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
char buff[MAX_BUFFER_SIZE];
// 接收信息
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
ssize_t n = ::recvfrom(_sockfd, buff, sizeof(buff) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
buff[n] = 0;
InetAddr addr(peer);
std::cout<<addr.GetInetAddrStr()<<"#: "<<buff<<std::endl;
//回调进行处理
_route_call_back(_sockfd,buff,addr);
}
else
{
LOG(ERROR,"recv error!\n");
}
}
}
void Close()
{
if(_sockfd > 0) ::close(_sockfd);
}
void Stop()
{
_isrunning = false;
}
~UdpServer()
{
}
};
引入线程池(同步线程池)--并使用双重检查锁定模式构造单例线程池对象
ThreadPool.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <vector>
#include <mutex>
#include <queue>
#include <thread>
#include <memory>
#include <condition_variable>
#include <unistd.h>
#include <pthread.h>
#include "Log.hpp"
const int default_thread_num = 10;
using namespace ns_log;
template <class T>
class ThreadPool
{
private:
// 任务队列
std::queue<T> _task_queue;
// 锁
std::mutex _mtx;
// 条件变量
std::condition_variable _con;
// 管理线程的结构
std::vector<std::thread> _threads;
bool _isrunning;
// 睡眠的线程数量
int _sleep_threads;
int _threadsNums;
static ThreadPool<T>* _pool;
static std::mutex _mutex;
private:
void HandlerTask()
{
while (true)
{
std::unique_lock<std::mutex> lock(_mtx);
while (_task_queue.empty() && _isrunning)
{
_con.wait(lock);
// _sleep_threads--;
break;
}
if ((!_isrunning) && _task_queue.empty())
{
lock.unlock();
LOG(DEBUG, "thread pool is quit!\n");
break;
}
T task = _task_queue.front();
_task_queue.pop();
lock.unlock();
task();
}
}
public:
static ThreadPool<T>* GetThreadPoolInstance(int threadnum = default_thread_num)
{
//双重检查锁定模式
if(_pool == nullptr)
{
std::unique_lock<std::mutex> lock(_mutex);
if(_pool == nullptr)
_pool = new ThreadPool<T>(threadnum);
}
return _pool;
}
ThreadPool(int threadnum)
: _isrunning(true),
_sleep_threads(threadnum),
_threadsNums(default_thread_num)
{
for (int i = 0; i < _threadsNums; i++)
{
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask,this));
}
}
//向任务队列插入任务
void Push(const T &task)
{
std::unique_lock<std::mutex> lock(_mtx);
_task_queue.push(task);
LOG(DEBUG,"Load once Task!,队列中的任务数量:%d\n",_task_queue.size());
// 有任务立即通知消费者
_con.notify_one();
}
//停止线程池
void Stop()
{
std::unique_lock<std::mutex> lock(_mtx);
_isrunning = false;
_con.notify_all();
}
~ThreadPool()
{
for(auto& thread : _threads)
{
LOG(DEBUG,"thread join done!\n");
thread.join();
}
}
};
//类外初始化
template<class T>
ThreadPool<T>* ThreadPool<T>::_pool = nullptr;
template<class T>
std::mutex ThreadPool<T>::_mutex;
消息转发模块设计:Route.hpp:
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "ThreadPool.hpp"
using namespace ns_log;
using task_t = std::function<void()>;
class Route
{
private:
// 在线用户
std::vector<InetAddr> _online;
std::mutex _mtx;
public:
Route()
{
}
~Route()
{
}
// 添加用户模块
void CheckOnlineUser(InetAddr &who)
{
std::unique_lock<std::mutex> lock(_mtx);
for (auto &user : _online)
{
if (user == who)
{
return;
}
}
_online.push_back(who);
LOG(INFO, "用户[%s][%d]已上线!\n", who.IP().c_str(), who.Port());
}
// 移除下线用户模块
void DeleteOnlineUser(InetAddr &who)
{
std::unique_lock<std::mutex> lock(_mtx);
auto iter = _online.begin();
while (iter != _online.end())
{
if (*iter == who)
{
_online.erase(iter);
printf("用户[%s][%d]已下线!\n", who.IP().c_str(), who.Port());
break;
}
++iter;
}
}
void ForWard(int sockfd, const std::string info, InetAddr& addr)
{
std::unique_lock<std::mutex> lock(_mtx);
// 转发信息--观察者模式
std::string message = addr.GetInetAddrStr() + info;
for (auto &user : _online)
{
struct sockaddr_in peer = user.GetAddr();
// std::cout<<"message = "<<message<<"sockfd = "<<sockfd<<std::endl;
ssize_t n = ::sendto(sockfd, message.c_str(), message.size(), 0, (struct sockaddr *)&(peer), sizeof(peer));
if (n < 0)
LOG(ERROR, "send error!,address = %s\n", addr.GetInetAddrStr().c_str());
std::cout<<strerror(errno)<<std::endl;
}
}
void RouteHandler(int sockfd, const std::string &info, InetAddr &who)
{
CheckOnlineUser(who);
task_t task;
if (info == "Q" || info == "quit" || info == "q")
{
DeleteOnlineUser(who);
std::string str = who.GetInetAddrStr();
str += "已下线!\n";
task = std::bind(&Route::ForWard, this, sockfd, str, who);
}
else
{
task = std::bind(&Route::ForWard, this, sockfd, info, who);
}
ThreadPool<task_t>::GetThreadPoolInstance()->Push(task);
}
};
server.cc:
#include "ThreadPool.hpp"
#include "udp_server.hpp"
#include "Route.hpp"
#include <memory>
int main(int argc, char *argv[])
{
if (argc != 2)
{
LOG(INFO, "Usage: <./exe><port>\n");
return -1;
}
uint16_t port = atoi(argv[1]);
Route ro;
// 业务todo
std::unique_ptr<UdpServer> server = std::make_unique<UdpServer>(port,
std::bind(&Route::RouteHandler,
&ro,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)
);
server->Start();
server->Close();
return 0;
}
client.cc:
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <functional>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "udp_server.hpp"
#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Route.hpp"
int InitClient()
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
LOG(FATAL, "create socket errno!\n");
exit(-1);
}
return sockfd;
}
void RecvMessage(int sockfd)
{
while (true)
{
struct sockaddr_in peer;
char buf[1024];
socklen_t len = sizeof(peer);
ssize_t n = ::recvfrom(sockfd, buf, sizeof(buf) - 1, 0, (struct sockaddr *)(&peer), &len);
if (n > 0)
{
buf[n] = 0;
InetAddr who(peer);
std::cout <<who.GetInetAddrStr()<<buf << std::endl;
}
else
{
std::cerr << "recvfrom error!\n"
<< std::endl;
break;
}
}
}
void SendMessage(int sockfd, std::string &ip, uint16_t port)
{
struct sockaddr_in src;
memset(&src, 0, sizeof(src));
src.sin_family = AF_INET;
src.sin_addr.s_addr = ::inet_addr(ip.c_str());
src.sin_port = ::htons(port);
std::string client_prev = "SendThread# ";
while (true)
{
std::string line;
std::cout << client_prev;
std::getline(std::cin, line);
// send message;
int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)(&src), sizeof(src));
if (n <= 0)
{
std::cerr << "sendthread send error! \n"<< std::endl;
break;
}
}
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cerr << "usage 3 nums_command-->[./name ip port]" << std::endl;
exit(-1);
}
int sockfd = InitClient();
std::string s_ip = argv[1];
uint16_t port = std::stoi(argv[2]);
// start threads
std::thread recever(std::bind(&RecvMessage, sockfd));
std::thread Sender(std::bind(&SendMessage, sockfd, s_ip, port));
recever.join();
Sender.join();
::close(sockfd);
return 0;
}
Makefile:
.PHONY:ALL
ALL:server client
server:server.cc
g++ -o $@ $^ -std=c++14 -lpthread -g
client:Client.cc
g++ -o $@ $^ -std=c++14 -lpthread -g
.PHONY:clean
clean:
rm -f server client
make编译即可
done
更多推荐
所有评论(0)