cinder服务启动源码分析

Posted by xue on May 21, 2017

本文以Newton版本的cinder为例

一、cinder api服务

根据setup.cfg找到cinder-api服务的启动入口

#setup.cfg
[entry_points]
console_scripts =
    cinder-api = cinder.cmd.api:main
#cinder/cmd/api.py
def main():
    # 加载辅助对象,封装与数据库相关的操作
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    # 加载配置并设置日志
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    config.set_middleware_defaults()
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()

    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    # 初始化rpc
    rpc.init(CONF)
    launcher = service.process_launcher()
    
    # 创建一个WSGIService对象
    server = service.WSGIService('osapi_volume')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()

WSGIService关键代码如下:

#cinder/service.py
class WSGIService(service.ServiceBase):
    def _get_manager(self):
        
        fl = '%s_manager' % self.name
        if fl not in CONF:
            return None

        manager_class_name = CONF.get(fl, None)
        if not manager_class_name:
            return None

        manager_class = importutils.import_class(manager_class_name)
        return manager_class()
	def __init__(self, name, loader=None):
       
        ...
        self.name = name
        # 会加载名为`osapi_volume_manager`的管理器(或None)
        self.manager = self._get_manager()
        """创建WSGI应用加载器
    	并根据配置文件(`cinder.conf`)设置应用配置路径:
    	`config_path` = `/etc/cinder/paste-api.ini`
    	"""
        self.loader = loader or wsgi.Loader(CONF)
        """ 加载‘/etc/cinder/paste-api.ini’文件中‘[composite:osapi_volume]’中定义的app,
        具体的代码看下面
        self.app = self.loader.load_app(name)
        """根据配置文件(`cinder.conf`)设置监听地址及工作线程数
        如果未指定监听ip及端口就分别设置为`0.0.0.0`及`0`
        如果为指定工作线程数就设置为cpu个数
        如果设置的工作线程数小于1,则抛异常
        """
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        self.workers = (getattr(CONF, '%s_workers' % name, None) or
                        processutils.get_worker_count())
        if self.workers and self.workers < 1:
            worker_name = '%s_workers' % name
            msg = (_("%(worker_name)s value of %(workers)d is invalid, "
                     "must be greater than 0.") %
                   {'worker_name': worker_name,
                    'workers': self.workers})
            raise exception.InvalidInput(msg)
            
        """如果CONF.profiler.profiler_enabled = True就开启性能分析  
        创建一个类型为`Messaging`的通知器(`_notifier`),将性能数据发送给
        ceilometer
        """
        setup_profiler(name, self.host)
          
        # 创建WSGI Server对象
        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)

load_app的代码如下所示:

#oslo.service/wsgi.py
class Loader(object):
    """Used to load WSGI applications from paste configurations."""

    def __init__(self, conf):
        """Initialize the loader, and attempt to find the config.
        :param conf: Application config
        :returns: None
        """
        conf.register_opts(_options.wsgi_opts)
        self.config_path = None

        #根据配置文件获得config_path的路径
        config_path = conf.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = conf.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path

        if not self.config_path:
            raise ConfigNotFound(path=config_path)

    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.
        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound
        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception("Couldn't lookup app: %s", name)
            raise PasteAppNotFound(name=name, path=self.config_path)

load_app完成了如下操作:
1.根据/etc/cinder/api-paste.ini中的Paste规则加载相应的过滤器和app
2.设置路由映射(Router)

这些具体的过程可以参考我的另一篇博客: 深入理解nova api服务

上面的内容是创建创建一个WSGIService对象并初始化的过程,接着往下分析:

#cinder/cmd/api.py
def main():
    ...
    #通过进程启动器 - 在新的进程中启动服务程序,并等待服务启动结束
    #最终会调用`WSGIService.start`方法启动服务
    launcher.launch_service(server, workers=server.workers)
    
#cinder/service.py
from oslo_service import wsgi

class WSGIService(service.ServiceBase):
    ...
    def start(self):
        ...
        if self.manager:
            self.manager.init_host()
        """这里的server就是上面初始化时创建的wsgi Server对象
        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)
        """
        self.server.start()
        self.port = self.server.port
        

#接上文的start方法
#oslo.service/wsgi.py
class Server(service.ServiceBase):
    def start(self):
        ...

        self.dup_socket = self.socket.dup()

        if self._use_ssl:
            self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

        wsgi_kwargs = {
            'func': eventlet.wsgi.server,
            'sock': self.dup_socket,
            'site': self.app,
            'protocol': self._protocol,
            'custom_pool': self._pool,
            'log': self._logger,
            'log_format': self.conf.wsgi_log_format,
            'debug': False,
            'keepalive': self.conf.wsgi_keep_alive,
            'socket_timeout': self.client_socket_timeout
            }

        if self._max_url_len:
            wsgi_kwargs['url_length_limit'] = self._max_url_len

        self._server = eventlet.spawn(**wsgi_kwargs)


这样,cinder-api中的socket监听就准备就绪了:基于配置cinder-api会启动多个监听线程,每个客户端连接用来一个线程来处理.

二、cinder scheduler服务

同理,找到启动入口:

#cinder/cmd/scheduler.py
def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    server = service.Service.create(binary='cinder-scheduler')
    service.serve(server)
    service.wait()

service.Service.create部分代码如下:

class Service(service.Service):
	@classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False, cluster=None):
        ...
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('cinder-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        #实例化Service类,会执行Service类的__init__方法
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination,
                          cluster=cluster)

        return service_obj

接着是service.serve(server)的部分代码:

#cinder/cmd/scheduler.py
service.serve(server)


#cinder/service.py
def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers)
    
#最终会调用Service类的start方法,部分代码如下
class Service(service.Service):

    def start(self):
    
        ctxt = context.get_admin_context()
        endpoints = [self.manager]
        endpoints.extend(self.manager.additional_endpoints)
        obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
        

        target = messaging.Target(topic=self.topic, server=self.host)
        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        #创建RPC连接,启动消费者线程,等待队列消息
        self.rpcserver.start()