backToServiceQueue: 有无并发读写;读与读之间;写于写之间,读写之间;放的数据格式
最后需要考虑:
ostoolcli -module legacy -action exec -type script -contents ”{“appName”: “PacketForwardAgent”,“scriptName”: “bootstrap.sh”,“params”: [“gre_dialing”,“start”]}” > {_APP_LOG_DIR}/startOrStopLog.tmp >> {_APP_LOG_DIR}/startOrStopLog.tmp |python -c “import sys,json;print(json.load(sys.stdin)[0].get(“returnCode”))“;rm -f ${_APP_LOG_DIR}/startOrStopLog.tmp)
当前go进程是提权拉起,考虑下是否能用普通权限
要点:
- 发包不(特别)在意延时
- 收包很在意延时,因为收包要打上尽量准确的收包时间戳,如此才能统计准确的用时
因此,这里收到回包之后,直接放池子里,打上准确时间戳。然后起两个协程向处理回包,向java进程上报。
条件变量常用来解决线程相互唤醒的问题(适用与生产者-消费者模型),使用条件变量需要注意一些点:
配置文件监控
Reading and writing from/to a file at the same time is a bad idea.
一个进程读,一个进程写是会有问题的。因此,要保证java进程写完之后再去读。原方法是用一个循环(with sleep)不断调用 os.Stat
方法获取文件的 modtime
,据此决定要不要重新读取配置文件。
参考链接:https://meik2333.com/posts/linux-many-proc-write-file/
cpp中可以使用 stat (2) 系统调用来获取文件元数据,包括最后修改时间。参考链接:https://blog.csdn.net/weixin_38239856/article/details/77543055
Note
事实上,java进程写过一遍配置之后,不会再更改。因此只有go进程读到写入的那份配置即可。后续更改无需关心。
其他方案参考:
- http://bbs.chinaunix.net/archiver/tid-1248308.html
- atomic_queue: https://github.com/max0x7ba/atomic_queue
无锁队列实现参考:
- https://github.com/facebook/folly/blob/main/folly/ProducerConsumerQueue.h
- https://github.com/facebook/folly/blob/main/folly/AtomicHashMap.h
- https://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++.htm
- https://github.com/max0x7ba/atomic_queue/
发包过程
- 建立tls连接,此处参考dpdkko部分建链的方式:大致是机器上存着密文,使用sdk提供的方法解出明文,在进行tls连接
- 接收拨测任务的消息
- 构造拨测报文 → 写入发包队列
- 发包线程读取发包队列,执行发包操作
收包过程
- 收包线程读取回包 → 写入收包队列
- 主线程读取收包队列进行处理
可选三方库
主线程的职责:
- 创建发包/收包线程
- 与java进程建链(是否可以交给发包线程),建链之后是否保持连接,还是下次再建?
- 给java进程回包
发包进程的职责:
- 频繁/单次向java进程建立连接?
Typicality
cf. https://www.cnblogs.com/MaxLij/p/14584187.html
/// server.cpp
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main(){
//创建套接字
int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// AF_INET : 表示使用 IPv4 地址 可选参数
// SOCK_STREAM 表示使用面向连接的数据传输方式,
// IPPROTO_TCP 表示使用 TCP 协议
//将套接字和IP、端口绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr)); //每个字节都用0填充
serv_addr.sin_family = AF_INET; //使用IPv4地址
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址
serv_addr.sin_port = htons(1234); //端口
bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
//进入监听状态,等待用户发起请求
listen(serv_sock, 20);
//接收客户端请求
struct sockaddr_in clnt_addr;
socklen_t clnt_addr_size = sizeof(clnt_addr);
int clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_addr, &clnt_addr_size); // block until connected
//向客户端发送数据
char str[] = "Hello World!";
write(clnt_sock, str, sizeof(str));
//关闭套接字
close(clnt_sock);
close(serv_sock);
return 0;
}
/// client.cpp
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
int main(){
//创建套接字
int sock = socket(AF_INET, SOCK_STREAM, 0);
//向服务器(特定的IP和端口)发起请求
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr)); //每个字节都用0填充
serv_addr.sin_family = AF_INET; //使用IPv4地址
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址
serv_addr.sin_port = htons(1234); //端口
connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
//读取服务器传回的数据
char buffer[40];
read(sock, buffer, sizeof(buffer)-1);
printf("Message form server: %s\n", buffer);
//关闭套接字
close(sock);
return 0;
}
using asio;
//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>
using asio::ip::tcp;
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: client <host>" << std::endl;
return 1;
}
asio::io_context io_context;
tcp::resolver resolver(io_context);
tcp::resolver::results_type endpoints =
resolver.resolve(argv[1], "daytime");
tcp::socket socket(io_context);
asio::connect(socket, endpoints);
for (;;)
{
boost::array<char, 128> buf;
asio::error_code error;
size_t len = socket.read_some(asio::buffer(buf), error);
if (error == asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw asio::system_error(error); // Some other error.
std::cout.write(buf.data(), len);
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
int main()
{
try
{
asio::io_context io_context;
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 13));
for (;;)
{
tcp::socket socket(io_context);
acceptor.accept(socket);
std::string message = make_daytime_string();
asio::error_code ignored_error;
asio::write(socket, asio::buffer(message), ignored_error);
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
工作量输出
- 讲一下为什么从golang切成c++,问题的背景(golang使用了很多不合规的三方件)
- 分析了梳理了拨测业务的流程,了解了拨测业务的背景
- 设计了c++拨测的流程,业务框架
- 开发实现中,自研了日志模块、通信队列,避免引入了spdlog、concurrentqueue两个三方件
这样,这个工作才显丰满。