本篇中所有代码分析,都是基于Jewel 10.2.9版本。本篇都是个人理解,其中有些理解或者解释有不合理的,还请指正。
在Ceph的使用中,运行时调整参数值是个高频的操作,使用起来简单方便,最重要的是不用重启服务即可生效。
Ceph动态调整参数有两种方式:
第一种:
1. ceph daemon <mon/osd/mds>.<id> config set <参数名> <参数值>
比如,设置OSD 1的heartbeat超时时间:
ceph daemon osd.1 config set osd_heartbeat_grace 60
第二种:
2. ceph tell <mon/osd/mds>.<id> injectargs '--<参数名> <参数值>'
设置OSD 1的heartbeat超时时间:
ceph tell osd.1 injectargs '--osd_heartbeat_grace 60'
第二种还有两个比较好用的地方:
1. 单条命令可以改变所有的实例的某个参数值:
ceph tell
设置所有OSD的heartbeat超时时间:
ceph tell osd.* injectargs ‘—osd_heartbeat_grace 60’
2. 单条命令可以改变多个参数:
ceph tell
设置OSD 1的heartbeat超时时间,及发起heartbeat的时间间隔
ceph tell osd.1 injectargs ‘—osd_heartbeat_grace 60 —osd_heartbeat_interval 10’
3. 当然,上面两个可以结合使用:
ceph tell
设置所有OSD的heartbeat超时时间,及发起heartbeat的时间间隔
ceph tell osd.* injectargs ‘—osd_heartbeat_grace 60 —osd_heartbeat_interval 10’
那么Ceph内部是如何实现上面提到的动态更新呢?我们来深入代码中一探究竟。
我们先看tell…injectargs方式的实现:
ceph命令行的输入,源码入口都是ceph.in文件,是python文件,也就是/usr/bin/下的可执行文件ceph。
ceph.in中有关tell的代码:
def main():
...
// 如果命令行中有'injectargs'字串,就进行切分成两个:'injectargs'之前的部分是childargs,之后的部分是injectargs。
if 'injectargs' in childargs:
position = childargs.index('injectargs')
injectargs = childargs[position:]
childargs = childargs[:position]
if verbose:
print('Separate childargs {0} from injectargs {1}'.format(childargs, injectargs),
file=sys.stderr)
else:
injectargs = None
...
if injectargs and '--' in injectargs
injectargs.remove('--')
...
// 如果childargs有'tell'命令,或者前面已经确定有injectargs,就将childargs重新赋值,也就是说childargs是真正要去发送给server端的命令。
// 在这里,和tell就没关系了,tell关键字并不会发到后端。
is_tell = False
if len(childargs) and childargs[0] == 'tell':
childargs = childargs[2:]
is_tell = True
if is_tell:
if injectargs:
childargs = injectargs
if not len(childargs):
print('"{0} tell" requires additional arguments.'.format(sys.argv[0]),
'Try "{0} tell <name> <command> [options...]" instead.'.format(sys.argv[0]),
file=sys.stderr)
return errno.EINVAL
...
// 每个命令执行,都有目标端,如果命令行指明了目标server,就向具体的server发;如果是'*',会先把所有的instance id拿到,然后在后面的for循环中,逐个去发送命令。
if target[1] == '*':
if target[0] == 'osd':
targets = [(target[0], o) for o in osdids()]
elif target[0] == 'mon':
targets = [(target[0], m) for m in monids()]
...
for target in targets:
...
从上面的代码,可知,tell命令,并没有走daemon的admin socket来进行通信,而是走正常的client->server的通信,而且,’*’也没有那么高效,就是拿到所有的id,然后逐个发送消息。
然后再经过librados、osdc等模块,将消息发送给具体的Monitor、OSD、MDS。
我们接下来直接看OSD端收到injectargs命令后,如何处理。
在OSD.cc中,do_command函数就是专门处理各种命令的,我们直接看injectargs分支:
void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data)
{
...
else if (prefix == "injectargs") {
vector<string> argsvec;
cmd_getval(cct, cmdmap, "injected_args", argsvec);
if (argsvec.empty()) {
r = -EINVAL;
ss << "ignoring empty injectargs";
goto out;
}
string args = argsvec.front();
// 获取所有的要进行更新的参数
for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
args += " " + *a;
osd_lock.Unlock();
cct->_conf->injectargs(args, &ss);
osd_lock.Lock();
}
...
}
拿到client想要更新的所有参数,然后调用了injectargs,每个OSD都有一个CephContext类变量cct,md_config_t类型的_conf变量也是CephContext类的成员。
int md_config_t::injectargs(const std::string& s, std::ostream *oss)
{
...
// 解析参数
ret = parse_injectargs(nargs, oss);
...
// 生效更新的参数
_apply_changes(oss);
}
int md_config_t::parse_injectargs(std::vector<const char*>& args, std::ostream *oss)
{
...
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
// 逐个解析参数,并更新参数。
int r = parse_option(args, i, oss);
if (r < 0)
ret = r;
}
...
}
int md_config_t::parse_option(std::vector<const char*>& args,
std::vector<const char*>::iterator& i,
ostream *oss)
{
...
// 检查更新的参数中,是否有子系统(subsystem)的log级别参数,也就是以debug_开头的日志级别参数。
// 但是下面的这种遍历,似乎很低效啊,如果根本就没有debug_类参数,还是会检查一遍。
// 再看了一下L版的代码,已经修改了,在parse_option中没有这种遍历式检查subsystem,而是直接在md_config_t类的构造函数中把所有参数vector抓换为map,这样在运行时,能更高效的找到某个参数。
// subsystems?
for (o = 0; o < subsys.get_num(); o++) {
std::string as_option("--");
as_option += "debug_";
as_option += subsys.get_name(o);
if (ceph_argparse_witharg(args, i, &val,
as_option.c_str(), (char*)NULL)) {
int log, gather;
int r = sscanf(val.c_str(), "%d/%d", &log, &gather);
...
// ceph的日志级别参数调整,直接就通过下面的两个函数调整了,和其他参数的处理是有区别的。
subsys.set_log_level(o, log);
subsys.set_gather_level(o, gather);
...
}
}
// 对于非log level参数,区分了Bool型和其他类型,然后调用的set_val_impl进行设置
for (o = 0; o < NUM_CONFIG_OPTIONS; ++o) {
ostringstream err;
const config_option *opt = config_optionsp + o;
std::string as_option("--");
as_option += opt->name;
if (opt->type == OPT_BOOL) {
if (ceph_argparse_binary_flag(args, i, &res, oss, as_option.c_str(),
(char*)NULL)) {
if (res == 0)
set_val_impl("false", opt);
else if (res == 1)
set_val_impl("true", opt);
else
ret = res;
...
}
else if (ceph_argparse_witharg(args, i, &val, err,
as_option.c_str(), (char*)NULL)) {
int res = set_val_impl(val.c_str(), opt);
}
...
}
int md_config_t::set_val_impl(const char *val, const config_option *opt)
{
assert(lock.is_locked());
// 先设置参数值,具体看下面的函数原型
int ret = set_val_raw(val, opt);
if (ret)
return ret;
// 参数通过上面的调用实现了,但是参数并不一定生效,对于大部分参数,已经在进程启动的时候加载了,所以现在更重要的是将这些参数生效。
// changed是一个set类型变量,主要记录的是从上一次apply_changes后改变的参数。
// 在injectargs函数中,parse_injectargs解析之后,调用了_apply_changes函数,函数的参数就是changed变量:
changed.insert(opt->name);
return 0;
}
// 设置参数值,根据参数类型来设置,做一些强制类型转换。
int md_config_t::set_val_raw(const char *val, const config_option *opt)
{
switch (opt->type) {
case OPT_INT: {
*(int*)opt->conf_ptr(this) = f;
}
case OPT_STR:
*(std::string*)opt->conf_ptr(this) = val ? val : "";
...
}
}
上面代码完成了参数的更新,主要有两点:
debug_类的参数(日志级别参数)单独走的一套,和其他参数不是同一种处理。
其他参数,先更新内存中的参数值;然后更重要的是:需要使其生效,即能够让系统真实的使用这些参数了。
后续的apply_changes是如何实现的?我们先暂放一下,先来看看daemon方式修改参数,因为这两种最终都会通过_apply_changes来使参数生效。
我们继续看看daemon…config set方式的实现:
ceph.in中有关daemon的代码:
def main():
...
// 如果命令中有'daemon'字符串,先尝试看有没有socket path,如果没有就从配置文件中去找具体的instance的admin_socket参数值,也就是socket文件路径
sockpath = None
if parsed_args.admin_socket:
sockpath = parsed_args.admin_socket
elif len(childargs) > 0 and childargs[0] in ["daemon", "daemonperf"]:
...
if childargs[1].find('/') >= 0:
sockpath = childargs[1]
else:
# try resolve daemon name
try:
sockpath = ceph_conf(parsed_args, 'admin_socket',
childargs[1])
...
# for both:
childargs = childargs[2:]
...
if sockpath and daemon_perf:
...
elif sockpath:
try:
// 尝试连接admin_socket,并发送命令
print(admin_socket(sockpath, childargs, format))
...
}
def admin_socket(asok_path, cmd, format=''):
def do_sockio(path, cmd_bytes):
""" helper: do all the actual low-level stream I/O """
// 创建socket,并连接,然后发送相关命令
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(path)
try:
sock.sendall(cmd_bytes + '\0')
len_str = sock.recv(4)
if len(len_str) < 4:
raise RuntimeError("no data returned from admin socket")
l, = struct.unpack(">I", len_str)
sock_ret = ''
got = 0
while got < l:
bit = sock.recv(l - got)
sock_ret += bit
got += len(bit)
except Exception as sock_e:
raise RuntimeError('exception: ' + str(sock_e))
return sock_ret
...
try:
ret = do_sockio(asok_path, json.dumps(valid_dict))
except Exception as e:
raise RuntimeError('exception: ' + str(e))
return ret
Admin socket是什么呢?其实就是UNIX Domain Socket,是在socket架构上发展出的一种IPC机制,虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。
从上面的代码可以看到,我们现在是一个client,尝试进行socket连接、通信,那server端在哪里?
每个OSD实例在启动的时候,都有一个CephContext类变量cct变量,这个变量中会new AdminSocket对象,在ceph_osd.cc的main函数中会有cct的初始化操作:common_init_finish(g_ceph_context)。
我们先来看看common_init_finish函数:
common/common_init.cc:
void common_init_finish(CephContext *cct)
{
cct->init_crypto();
// 启动一些线程,包括log线程、admin socket的初始化
int flags = cct->get_init_flags();
if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS))
cct->start_service_thread();
// 修改admin socket path的权限
if ((flags & CINIT_FLAG_DEFER_DROP_PRIVILEGES) &&
(cct->get_set_uid() || cct->get_set_gid())) {
cct->get_admin_socket()->chown(cct->get_set_uid(), cct->get_set_gid());
}
}
common/ceph_context.cc:
void CephContext::start_service_thread()
{
...
if (_conf->admin_socket.length())
_admin_socket->init(_conf->admin_socket);
}
common/admin_socket.cc:
bool AdminSocket::init(const std::string &path)
{
...
// 绑定socket文件,并开始监听sock fd
err = bind_and_listen(path, &sock_fd);
...
}
可以看到:UNIX Domain Socket server已经准备就绪,开始监听网络
在CephContext类构造函数中,创建了一个AdminSocket类变量_admin_socket,注册了一些_admin_socket可以处理的command:
CephContext::CephContext(uint32_t module_type_, int init_flags_)
{
...
_admin_socket = new AdminSocket(this);
_admin_hook = new CephContextHook(this);
_admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
_admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <field> <val> [<val> ...]: set a config variable");
_admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get <field>: get the config value");
...
}
从上面的代码可以看到,配置参数相关的这几个都已经注册在admin_socket中,而且这些命令相关的hook是CephContextHook类。
同时,在OSD启动时,admin_socket初始化成功后,调用了final_init,也注册了一些admin_socket可以识别的command。
我们再看看OSD的inal_init函数:
void OSD::final_init()
{
int r;
// 这个admin_socket变量是从CephContext中获取的,并没new对象。这样CephContext和OSD中的admin_socket就是同一个,
AdminSocket *admin_socket = cct->get_admin_socket();
asok_hook = new OSDSocketHook(this);
r = admin_socket->register_command("status", "status", asok_hook,
"high-level status of OSD");
assert(r == 0);
r = admin_socket->register_command("flush_journal", "flush_journal",
asok_hook,
"flush the journal to permanent store");
...
}
在这里继续注册了一些可以通过admin socket来执行的命令,这里的hook是OSDSocketHook类。
AdminSocket类继承的Thread类,在线程入口函数entry中,会通过poll方式等待event,然后有connection的时候,会进行do_accept,然后进行正常的网络stream读写。
当client通过admin socket向server端发送了命令后,admin socket server会接收消息,在do_accept函数中,会判断这个command是否注册,如果注册了,调用相应的hook->call处理,最后将结果回复给client。
在CephContext类中,hook是CephContextHook类变量:
common/ceph_context.cc:
class CephContextHook : public AdminSocketHook {
CephContext *m_cct;
public:
explicit CephContextHook(CephContext *cct) : m_cct(cct) {}
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
bufferlist& out) {
m_cct->do_command(command, cmdmap, format, &out);
return true;
}
}
在OSD类中,hook是一个OSDSocketHook类变量:
osd/OSD.cc:
class OSDSocketHook : public AdminSocketHook {
OSD *osd;
public:
explicit OSDSocketHook(OSD *o) : osd(o) {}
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
bufferlist& out) {
stringstream ss;
bool r = osd->asok_command(command, cmdmap, format, ss);
out.append(ss);
return r;
}
}
在这里,我们先只看config set相关的处理,在CephContext的do_command函数中:
common/ceph_context.cc:
void CephContext::do_command(std::string command, cmdmap_t& cmdmap,
std::string format, bufferlist *out)
{
...
else if (command == "config set") {
std::string var;
std::vector<std::string> val;
if (!(cmd_getval(this, cmdmap, "var", var)) ||
!(cmd_getval(this, cmdmap, "val", val))) {
f->dump_string("error", "syntax error: 'config set <var> <value>'");
} else {
// val may be multiple words
string valstr = str_join(val, " ");
int r = _conf->set_val(var.c_str(), valstr.c_str());
if (r < 0) {
f->dump_stream("error") << "error setting '" << var << "' to '" << valstr << "': " << cpp_strerror(r);
} else {
ostringstream ss;
// 调用apply_changes, 生效参数
_conf->apply_changes(&ss);
f->dump_string("success", ss.str());
}
}
}
...
}
common/config.cc:
int md_config_t::set_val(const char *key, const char *val, bool meta, bool safe)
{
...
// 下面的代码结构是不是很熟悉,对,和前面的tell方式中的parse_option很像,先设置了debug级别,然后再设置其他的参数。
if (strncmp(k.c_str(), "debug_", 6) == 0) {
...
subsys.set_log_level(o, log);
subsys.set_gather_level(o, gather);
...
}
...
for (int i = 0; i < NUM_CONFIG_OPTIONS; ++i) {
...
return set_val_impl(v.c_str(), opt);
...
}
...
}
可以看到,和tell方式一样,也是先改变参数值,然后再调用_apply_changes来生效之。
现在,我们来看看_apply_changes的实现:
void md_config_t::_apply_changes(std::ostream *oss)
{
// 从变量命名来看,是reverse obs_map_t,也就是obs_map_t结构体的反转结构,obs_map_t是这样的结构:typedef std::multimap <std::string, md_config_obs_t*> obs_map_t;
// rev_obs_map_t除了反转外,map value type成了set。
// obs_map_t是个multimap,存的是配置参数和config observer的映射关系。
// 现在的rev_obs_map_t是config observer和配置参数的映射关系,可以直接通过observer来查看都有哪些参数。
typedef std::map < md_config_obs_t*, std::set <std::string> > rev_obs_map_t;
expand_all_meta();
rev_obs_map_t robs;
std::set <std::string> empty_set;
char buf[128];
char *bufptr = (char*)buf;
// 这个循环就是要完成obs_map_t结构的反转,以及参数option组成的string set。
for (changed_set_t::const_iterator c = changed.begin();
c != changed.end(); ++c) {
const std::string &key(*c);
// multimap::equal_range是从map中查找某个key,返回第一个匹配的key对应的pair,和最后一个匹配key的下一个pair
// 所以,这里就找到了某个参数的所有observers
pair < obs_map_t::iterator, obs_map_t::iterator >
range(observers.equal_range(key));
if ((oss) &&
(!_get_val(key.c_str(), &bufptr, sizeof(buf))) &&
!_internal_field(key)) {
(*oss) << key << " = '" << buf << "' ";
// 如果observers这个map中没有这个key,range的first和second都是指向同一个pair,或者都是NULL。
// 所以,这里就说明:没有任何observer关注这个参数。
// 这里的unchangeable是不是在设置参数的经常看到,其实就标明没有observer,也就更不能谈生效了。但是我们在检查参数的时候,确实看到了参数变了,就是因为之前看到的先设置了参数。
// 在L版的代码中,这个"unchangeable"输出已经换成了"not observed, change may require restart",也更清楚的说明了你修改的参数没人关注,没生效。
if (range.first == range.second) {
(*oss) << "(unchangeable) ";
}
}
// 生成rev_obs_map_t类型的robs
for (obs_map_t::iterator r = range.first; r != range.second; ++r) {
rev_obs_map_t::value_type robs_val(r->second, empty_set);
pair < rev_obs_map_t::iterator, bool > robs_ret(robs.insert(robs_val));
std::set <std::string> &keys(robs_ret.first->second);
keys.insert(key);
}
}
// 遍历所有的observers,然后依次调用它的handle_conf_change,来应用参数。
// Make any pending observer callbacks
for (rev_obs_map_t::const_iterator r = robs.begin(); r != robs.end(); ++r) {
md_config_obs_t *obs = r->first;
obs->handle_conf_change(this, r->second);
}
changed.clear();
}
上面代码中注释挺详细的了,我们在这里说一下observers是什么?就是具体的MON、OSD、MDS实例,这里用的是观察者模式,观察者关注自己关心的参数,这些参数一旦有更新,观察者会通过handle_conf_change使其生效。
那么,这些observer是什么时候开始watch的?我们这里只说OSD,在OSD::pre_init中,调用了cct->_conf->add_observer(this); pre_init也是在OSD启动时调用的。
// 具体的MON、OSD、MDS实例在启动时,调用本函数,将自己将入到config的observers中
void md_config_t::add_observer(md_config_obs_t* observer_)
{
Mutex::Locker l(lock);
// 每个observer都watch哪些config option呢?就是通过自己的get_tracked_conf_keys来定义的
const char **keys = observer_->get_tracked_conf_keys();
for (const char ** k = keys; *k; ++k) {
obs_map_t::value_type val(*k, observer_);
observers.insert(val);
}
}
我们先看看OSD都关注了哪些config options:(其实很少的,除了一些log的,就10几个)
const char** OSD::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"osd_max_backfills",
"osd_min_recovery_priority",
"osd_op_complaint_time",
"osd_op_log_threshold",
"osd_op_history_size",
"osd_op_history_duration",
"osd_enable_op_tracker",
"osd_map_cache_size",
"osd_map_max_advance",
"osd_pg_epoch_persisted_max_stale",
"osd_disk_thread_ioprio_class"
"osd_disk_thread_ioprio_priority",
// clog & admin clog
"clog_to_monitors",
"clog_to_syslog",
"clog_to_syslog_facility",
"clog_to_syslog_level",
"osd_objectstore_fuse",
"clog_to_graylog",
"clog_to_graylog_host",
"clog_to_graylog_port",
"host",
"fsid",
"osd_client_message_size_cap",
"osd_client_message_cap",
NULL
};
return KEYS;
}
看了上面的参数列表,是不是很惊讶,就这些?难道我经常设置的参数,貌似生效了(参数值确实改变了),其实根本没有真正的生效。
我们再来看OSD的handle_conf_changes:
void OSD::handle_conf_change(const struct md_config_t *conf,
const std::set <std::string> &changed)
{
if (changed.count("osd_max_backfills")) {
service.local_reserver.set_max(cct->_conf->osd_max_backfills);
service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
}
...
}
这个函数就会根据不同的参数,调用具体的函数,将新的参数值应用到运行的系统中。
到这里,我们也基本搞清了动态更新参数的实现过程,当然,还有很多细节并没有研究。我们还是先搞清整个流程、原理,然后遇到问题,再深入细节分析。
通过daemon admin socket修改参数走的是UNIX Domain Socket模式。所以,每次只能设置一个实例的参数;并且,只能在本地设置本地实例的参数。
通过tell修改参数走的是正常的命令发送流程。可以通过’*’的方式设置所有的实例的参数。
非log级别debug_类参数的动态更新,都经历了两步:设置进程内存中的参数值;调用相应的observer生效之。
log级别的debug_类参数,是直接设置参数并生效的(当然,依赖于ceph中log的实现)。
Ceph使用过程中,很多动态更新的参数,其实并没有真正的生效,修改参数时,返回的结果中有”unchangeable”,或“not observed, change may require restart”这些字串的都没生效。
关于更多文章可以阅读原文查看作者博客。
姚国涛,有多年的OpenStack、Ceph运维、开发经验。目前就职于北京多来点科技有限公司,主要负责私有云平台中Ceph相关开发、运维工作。关注Ceph、Glusterfs等分布式存储开源技术。
Ceph中国社区
是国内唯一官方正式授权的社区,
为广大Ceph爱好者提供交流平台!
开源-创新-自强
官方网站:www.ceph.org.cn
合作邮箱:devin@ceph.org.cn
投稿地址:tougao@ceph.org.cn
长期招募热爱翻译人员,
参与社区翻译外文资料工作。