openstack各组件的RBAC(Role Based Access Control)

Posted by xue on June 25, 2017

以kilo版本的openstack为例

Keystone

Role 和 Policy 策略

验证用户username和password以后,用户有没有权限来执行某个操作则取决于 Role 和 基于 Roles 的鉴权。

Keystone API V3 与 V2 相比,对 policy 的支持有很多的增强。V2 的支持和 OpenStack 其它组件比如cinder,nova 等差不多,只支持基于 policy.json 文件的 policy 控制;而 V3 中,支持对 policy 的 CRUD 操作,以及对 endpoint,service 等的 policy 操作等。

这里主要介绍一下V2版本的RBAC

Keystone 在每个子模块的 controller.py 文件中的各个控制类的方法上实施 RBAC。有三种方法

1.在函数体中使用 self.assert_admin(context)
2.使用装饰器@controller.protected()
3.使用装饰器@controller.filterprotected('type', 'name')

来看一下他们各自的实现

assert_admin

keystone.common.wsgi.Application.assert_admin:
    # 如果context中is_admin是True, 直接跳过下面的验证
    if not context['is_admin']:
        try:
            user_token_ref = token_model.KeystoneToken(
                token_id=context['token_id'],
                token_data=self.token_provider_api.validate_token(
                    context['token_id']))
        except exception.TokenNotFound as e:
            raise exception.Unauthorized(e)

        validate_token_bind(context, user_token_ref)
        creds = copy.deepcopy(user_token_ref.metadata)

        try:
            creds['user_id'] = user_token_ref.user_id
        except exception.UnexpectedError:
            LOG.debug('Invalid user')
            raise exception.Unauthorized()

        if user_token_ref.project_scoped:
            creds['tenant_id'] = user_token_ref.project_id
        else:
            LOG.debug('Invalid tenant')
            raise exception.Unauthorized()

        creds['roles'] = user_token_ref.role_names
        # Accept either is_admin or the admin role
        self.policy_api.enforce(creds, 'admin_required', {})
    ↓
keystone.policy.backends.rules.Policy.enforce:
    enforce(credentials, action, target)
    ↓
keystone.policy.backends.rules.enforce:
    return _ENFORCER.enforce(action, target, credentials, **extra)
    ↓
oslo_policy.policy.Enforcer.enforce(self, rule, target, creds, do_raise=False,
                exc=None, *args, **kwargs):
    # 这里的rule为admin_required,target为{},creds为包含user_id,tenant_id,tenant_id的字典
    self.load_rules()
    
    # Allow the rule to be a Check tree
    # 这库的
    if isinstance(rule, _checks.BaseCheck):
        result = rule(target, creds, self)
    elif not self.rules:
        # No rules to reference means we're going to fail closed
        result = False
    else:
        try:
            # Evaluate the rule
            result = self.rules[rule](target, creds, self)
        except KeyError:
            LOG.debug('Rule [%s] does not exist' % rule)
            # If the rule doesn't exist, fail closed
            result = False

    # If it is False, raise the exception if requested
    if do_raise and not result:
        if exc:
            raise exc(*args, **kwargs)

        raise PolicyNotAuthorized(rule, target, creds)

    return results   

下面这行代码比较关键,rule的值为'admin_required',根据/etc/keystone/policy.json文件读出来的值为(role:admin or is_admin:1),所以self.rules[rule]的type是 <class 'oslo_policy._checks.OrCheck'>

result = self.rules[rule](target, creds, self)

再来看看OrCheck的代码:

class OrCheck(BaseCheck):
    """Composite check that passes when any one of its sub-rules passes."""

    def __init__(self, rules):
        # Sub-checks that are evaluated, in order, when this check is called.
        self.rules = rules

    def __call__(self, target, cred, enforcer):
        """Check the policy.

        Returns True as soon as one sub-rule accepts the request, and False
        when none of them does (including when there are no sub-rules).
        """
        return any(check(target, cred, enforcer) for check in self.rules)

@controller.protected()

keystone.common.controller.protected:
    def wrapper(f):
        # Wraps controller method ``f`` so that an authorization check runs
        # before it. ``callback`` comes from the enclosing scope, which this
        # excerpt does not show (presumably the decorator's argument).
        @functools.wraps(f)
        def inner(self, context, *args, **kwargs):
            if 'is_admin' in context and context['is_admin']:
                # An is_admin context skips every policy check below.
                LOG.warning(_LW('RBAC: Bypassing authorization'))
            elif callback is not None:
                # A supplied callback replaces the default check; it is
                # handed the wrapped method's name and keyword arguments.
                prep_info = {'f_name': f.__name__,
                             'input_attr': kwargs}
                callback(self, context, prep_info, *args, **kwargs)
            else:
                # Default path: the policy action is derived from the
                # wrapped method's name.
                action = 'identity:%s' % f.__name__
                creds = _build_policy_check_credentials(self, action,
                                                        context, kwargs)

                policy_dict = {}
                # If the controller can load the entity named by
                # '<member_name>_id' in kwargs, expose it as the target.
                if (hasattr(self, 'get_member_from_driver') and
                        self.get_member_from_driver is not None):
                    key = '%s_id' % self.member_name
                    if key in kwargs:
                        ref = self.get_member_from_driver(kwargs[key])
                        policy_dict['target'] = {self.member_name: ref}

                # When a subject token is present, add its user id (and the
                # user's domain id, when one is available) to the target.
                if context.get('subject_token_id') is not None:
                    token_ref = token_model.KeystoneToken(
                        token_id=context['subject_token_id'],
                        token_data=self.token_provider_api.validate_token(
                            context['subject_token_id']))
                    policy_dict.setdefault('target', {})
                    policy_dict['target'].setdefault(self.member_name, {})
                    policy_dict['target'][self.member_name]['user_id'] = (
                        token_ref.user_id)
                    try:
                        user_domain_id = token_ref.user_domain_id
                    except exception.UnexpectedError:
                        user_domain_id = None
                    if user_domain_id:
                        policy_dict['target'][self.member_name].setdefault(
                            'user', {})
                        policy_dict['target'][self.member_name][
                            'user'].setdefault('domain', {})
                        policy_dict['target'][self.member_name]['user'][
                            'domain']['id'] = (
                                user_domain_id)

                # kwargs are merged in last, and the dict is flattened
                # before being passed to the policy engine.
                policy_dict.update(kwargs)
                self.policy_api.enforce(creds,
                                        action,
                                        utils.flatten_dict(policy_dict))
                LOG.debug('RBAC: Authorization granted')
            # Reached on all three paths, provided nothing above raised.
            return f(self, context, *args, **kwargs)
        return inner
    return wrapper

通过这种方法最终也是调用了self.policy_api.enforce来鉴权,不过注意到其中的这行代码:

action = 'identity:%s' % f.__name__

所以,在控制类的方法上加上此装饰器,如:

@controller.protected()
    def create_domain(self, context, domain):

对应的action即为identity:create_domain

@controller.filterprotected('type', 'name')

@controller.filterprotected('enabled', 'name')
    def list_domains(self, context, filters):

具体的实现代码和protected类似。

nova

以创建虚拟机为例

nova.compute.api.API.create:
    self._check_create_policies(context, availability_zone,
                                requested_networks, block_device_mapping)
    ↓
nova.compute.api.API._check_create_policies:
        """Check policies for create()."""
        target = {'project_id': context.project_id,
                  'user_id': context.user_id,
                  'availability_zone': availability_zone}
        if not self.skip_policy_check:
            # 检查创建虚拟机的权限
            check_policy(context, 'create', target)

            if requested_networks and len(requested_networks):
                check_policy(context, 'create:attach_network', target)

            if block_device_mapping:
                check_policy(context, 'create:attach_volume', target)
    ↓
def check_policy(context, action, target, scope='compute'):
    """Enforce the policy rule named '<scope>:<action>' against target."""
    nova.policy.enforce(context, '%s:%s' % (scope, action), target)
    ↓
nova.policy.enforce:
    init()
    credentials = context.to_dict()
    if not exc:
        exc = exception.PolicyNotAuthorized
    try:
        result = _ENFORCER.enforce(action, target, credentials,
                                   do_raise=do_raise, exc=exc, action=action)
    except Exception:
        credentials.pop('auth_token', None)
        with excutils.save_and_reraise_exception():
            LOG.debug('Policy check for %(action)s failed with credentials '
                      '%(credentials)s',
                      {'action': action, 'credentials': credentials})
    return result
    ↓
nova.openstack.common.policy.Enforcer.enforce:
    self.load_rules() #重新读取 policy.json 文件
    ...
    result = self.rules[rule](target, creds, self) #使用特定的 rule 来检查 user 的权限
    ...
    return True or PolicyNotAuthorized(rule)

cinder

以创建cinder volume为例

cinder.volume.flows.api.create_volume.ExtractVolumeRequestTask.execute:
    # ACTION = 'volume:create'
    policy.enforce_action(context, ACTION)
    ↓
cinder.policy.enforce_action:
    return enforce(context, action, {'project_id': context.project_id,
                                     'user_id': context.user_id})
    ↓
cinder.policy.enforce:
    init()

    return _ENFORCER.enforce(action, target, context.to_dict(),
                             do_raise=True,
                             exc=exception.PolicyNotAuthorized,
                             action=action)
    ↓
cinder.openstack.common.policy.Enforcer.enforce:
    #rule:volume:create
    #target: {'project_id': u'xx', 'user_id': u'xx'},  
    #creds 中包括 user role 等信息
    self.load_rules() #重新读取 policy.json 文件
    ...
    result = self.rules[rule](target, creds, self) #使用特定的 rule 来检查 user 的权限
    ...
    return True or PolicyNotAuthorized(rule)

参考

世民谈云计算