《Web接口开发与自动化测试基于Python语言》--第11章

第11章 接口的安全机制

本章将介绍接口的几种常用的安全机制。

11.1 用户认证

接口测试工具的User Auth/Authorization选项,是包含在request请求中的。

11.1.1 开发带Auth接口

为了练习与安全有关的接口开发,下面重新在sign应用下创建views_if_sec.py视图文件:

#! /usr/bin/python
# -*- coding:utf-8 -*-

from django.contrib import auth as django_auth
import base64

# 用户认证
def user_auth(request):
    get_http_auth = request.META.get('HTTP_AUTHORIZATION', b'')
    auth = get_http_auth.split()
    try:
        auth_parts = base64.b64decode(auth[1]).decode('utf-8').partition(':')
    except IndexError:
        return "null"
    username, password = auth_parts[0], auth_parts[2]
    user = django_auth.authenticate(username=username, password=password)
    if user is not None:
        django_auth.login(request, user)
        return "success"
    else:
        return "fail"

对上述代码进行分析:

  • request.META是一个字典,包含了本次HTTP请求的Header信息,eg:用户认证、IP地址、用户Agent(通常是浏览器的名称和版本号)等;HTTP_AUTHORIZATION用于获取HTTP认证数据,如果为空,将得到一个空的bytes对象

  • 当客户端传输的认证数据为:admin/admin123456,这里得到的数据为:Basic YWRtaW46YWRtaW4xMjM0NTY=

  • 通过split()方法将其拆分成list,拆分后的数据为:[‘Basic’, ‘YWRtaW46YWRtaW4xMjM0NTY=’]

  • 接下来,取出list中的加密串,通过base64对加密串进行解码,通过decode()方法以UTF-8编码对字符串进行解码,partition()方法以冒号“:”为分隔符对字符串进行分隔,得到的数据为:(‘admin’, ‘:’, ‘admin123456’)

  • 然后通过try…except…进行异常处理,如果获取不到Auth数据,则抛IndexError类型的异常,函数返回“null”字符串

  • 最后,取出auth_parts元组中对应认证的username和password,最终的数据是:admin、admin123456

  • 最后的最后,调用Django的认证模块,对得到的Auth信息进行验证,若成功则返回“success”,失败则返回“fail”

在发布会查询接口调用上面的user_auth函数:

from django.http import JsonResponse

# 查询发布会接口--增加用户认证
def get_event_list(request):
    auth_result = user_auth(request)    #调用用户认证函数
    if auth_result == "null":
        return JsonResponse({'status':10011, 'message':'user auth null'})

    if auth_result == "fail":
        return JsonResponse({'status':10012, 'message':'user auth fail'})

    eid = request.GET.get("eid", "")    # 发布会id
    name = request.GET.get("name", "")  # 发布会名称

    if eid == '' and name == '':
        return JsonResponse({'status':10021, 'message':'parameter error'})

    if eid != '':
        event = {}
        try:
            result = Event.objects.get(id=eid)
        except ObjectDoesNotExist:
            return JsonResponse({'status':10022, 'message':'query result is empty'})
        else:
            event['name'] = result.name
            event['limit'] = result.limit
            event['status'] = result.status
            event['address'] = result.address
            event['start_time'] = result.start_time
            return JsonResponse({'status':200, 'message':'success', 'data':event})
    if name != '':
        datas = []
        results = Event.objects.filter(name__contains=name)
        if results:
            for r in results:
                event = {}
                event['name'] = r.name
                event['limit'] = r.limit
                event['status'] = r.status
                event['address'] = r.address
                event['start_time'] = r.start_time
                datas.append(event)
            return JsonResponse({'status':200, 'message':'success', 'data':datas})
        else:
            return JsonResponse({'status':10022, 'message':'query result is empty'})

这样就完成了在接口中增加认证机制的功能,只有通过认证才能继续测试接口全部内容。

11.1.2 接口文档

这里写图片描述

注意: 在urls.py中增加该接口,代码如下:

#!/usr/bin/python
# -*- coding:utf-8 -*-

from django.conf.urls import url
from sign import views_if, views_if_sec

urlpatterns = [
    ......
    # ex: /api/get_event_list/
    url(r'^get_event_list', views_if.get_event_list, name='get_event_list'),
    url(r'sec_get_event_list', views_if_sec.get_event_list, name='sec_get_event_list'),
    ......

11.1.3 接口测试用例

编写对应的测试用例:sec_test_case.py

Ps:Requests库的get()和post()方法均提供auth参数用于设置用户签名。

#!/usr/bin/python
# -*- coding:utf-8 -*-

import unittest
import requests

class GetEventListTest(unittest.TestCase):
    """查询发布会信息带用户认证"""

    def setUp(self):
        self.base_url = "http://10.18.214.88:8000/api/sec_get_event_list/"

    def test_get_event_list_auth_null(self):
        """auth为空"""
        r = requests.get(self.base_url, params={'eid':1})
        result = r.json()
        self.assertEqual(result['status'], 10011)
        self.assertEqual(result['message'], 'user auth null')

if __name__ == '__main__':
    unittest.main()

没有把全部的case写完,就先试一下效果,结果运行测试,返回的结果却是错误:

E
======================================================================
ERROR: test_get_event_list_auth_null (__main__.GetEventListTest)
auth为空
----------------------------------------------------------------------
Traceback (most recent call last):
  File "sec_test_case.py", line 27, in test_get_event_list_auth_null
    result = r.json()
  File "/usr/local/lib/python2.7/dist-packages/requests/models.py", line 892, in json
    return complexjson.loads(self.text, **kwargs)
  File "/usr/lib/python2.7/dist-packages/simplejson/__init__.py", line 516, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/dist-packages/simplejson/decoder.py", line 370, in decode
    obj, end = self.raw_decode(s)
  File "/usr/lib/python2.7/dist-packages/simplejson/decoder.py", line 400, in raw_decode
    return self.scan_once(s, idx=_w(s, idx).end())
JSONDecodeError: Expecting value: line 2 column 1 (char 1)

----------------------------------------------------------------------
Ran 1 test in 0.112s

FAILED (errors=1)

观察django那边返回的错误,请求的API返回了500错误,奇怪哪里出错了,但是在测试用例的返回信息里,实在是没看出哪里有误啊,不过好在django那的提示信息比较有用:

Internal Server Error: /api/sec_get_event_list/
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/exception.py", line 39, in inner
    response = get_response(request)
  File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/base.py", line 187, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/base.py", line 185, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/csg/guest/sign/views_if_sec.py", line 27, in get_event_list
    return JsonResponse({'status':10011, 'message':'user auth null'})
NameError: global name 'JsonResponse' is not defined
[27/Sep/2017 14:05:03] "GET /api/sec_get_event_list/?eid=1 HTTP/1.1" 500 61568

从这里就很明显了,原来我们在views_if_sec.py里没有定义JsonResponse,好吧我的错,忘记导入了,补充到该文件,重新运行测试用例,成功:

.
----------------------------------------------------------------------
Ran 1 test in 0.016s

OK

将测试用例补充完整即可:

#!/usr/bin/python
# -*- coding:utf-8 -*-

#########################################################
# (C) 2000-2017 NSFOCUS Corporation. All rights Reserved#
#########################################################

"""
Function:
Create Time: 2017年09月26日 星期二 22时50分33秒
Author: zhaoxinzhen@intra.nsfocus.com
"""


import unittest
import requests

class GetEventListTest(unittest.TestCase):
    """查询发布会信息带用户认证"""

    def setUp(self):
        self.base_url = "http://10.18.214.88:8000/api/sec_get_event_list/"

    def test_get_event_list_auth_null(self):
        """auth为空"""
        r = requests.get(self.base_url, params={'eid':1})
        result = r.json()
        self.assertEqual(result['status'], 10011)
        self.assertEqual(result['message'], 'user auth null')

    def test_get_event_list_auth_error(self):
        """auth错误"""
        auth_user = ('abc', '123')
        r = requests.get(self.base_url, auth=auth_user, params={'eid':1})
        result = r.json()
        self.assertEqual(result['status'], 10012)
        self.assertEqual(result['message'], 'user auth fail')

    def test_get_event_list_eid_null(self):
        """eid参数为空"""
        auth_user = ('admin', 'admin123456')
        r = requests.get(self.base_url, auth=auth_user, params={'eid':''})
        result = r.json()
        self.assertEqual(result['status'], 10021)
        self.assertEqual(result['message'], 'parameter error')

    def test_get_event_list_eid_success(self):
        """根据eid查询结果成功"""
        auth_user = ('admin', 'admin123456')
        r = requests.get(self.base_url, auth=auth_user, params={'eid':1})
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'success')
        self.assertEqual(result['data']['name'], u'小米5发布会')
        self.assertEqual(result['data']['address'], u'北京国家会议中心')

if __name__ == '__main__':
    unittest.main()

11.2 数字签名

在使用HTTP/SOAP协议传输数据时,签名作为其中一个参数,有着重要的作用:

  • 鉴权: 通过客户端的密钥和服务端的密钥匹配。

如何理解鉴权呢,举个例子:

向接口传参:http://127.0.0.1:8000/api/?a=1&b=2

假设签名的密钥为:@admin123

加上签名之后的接口传参为:http://127.0.0.1:8000/api/?a=1&b=2&sign=@admin123

显然这样明文传输sign参数是不安全的,一般会通过加密算法进行加密,eg:MD5

>>> import hashlib
>>> md5 = hashlib.md5()
>>> sign_str = "@admin123"
>>> sign_bytes_utf8 = sign_str.encode(encoding="utf-8")
>>> md5.update(sign_bytes_utf8)
>>> md5.hexdigest()
'4b9db269c5f978e1264480b0a7619eea'
>>> 

似曾相识啊,目前正在测试的第三方应用接口就是使用的这种数字签名方式。

将“@admin123”通过MD5加密之后得到:4b9db269c5f978e1264480b0a7619eea

单独作为鉴权,带签名的接口为:http://127.0.0.1:8000/api/?a=1&b=2&sign=4b9db269c5f978e1264480b0a7619eea

使用MD5加密算法,好处是不可逆,当服务器接收到参数后,同样需要对“@admin123”进行MD5加密,然后与调用者传来的sign加密字符串对比是否一致,从而来鉴别调用者是否有权访问接口。

MD5 Message-Digest Algorithm 5 消息摘要加密算法第五版,用于确保信息传输的完整一致,是计算机广泛使用的杂凑算法之一,主流编程语言已经普遍支持MD5实现。

  • 数据防篡改: 参数是明文传输,将接口参数及密钥生成加密字符串,将加密字符串作为签名。

eg:http://127.0.0.1:8000/api/?a=1&b=2

假设签名的密钥为:@admin123

签名的明文为:a=1&b=2&api_key=@admin123

通过MD5算法将整个接口参数(a=1&b=2&api_key=@admin123)生成加密字符串:786bfe32ae1d3764f208e03ca0bfaf13

所以,作为数据防篡改,带签名的接口为:

http://127.0.0.1:8000/api/?a=1&b=2&sign=786bfe32ae1d3764f208e03ca0bfaf13

对整个接口的参数做了加密,所以,只要任意一个参数发生变化,签名的验证就会失败。

好处是:加强了鉴权和对数据完整性的保护;

坏处是:MD5加密不可逆,服务器端必须知道客户端的接口参数和值,否则签名的验证就会失败。但实际上接口在设计时,服务器端是不知道客户端的请求参数值的。eg:嘉宾手机号的查询,服务器不知道调用者传的手机号具体是什么,只是通过数据库来查询该号码是否存在,那么就不能使用全参数加密的方式。

11.2.1 开发接口

编辑:…/guest/sign/views_if_sec.py视图文件

import time, hashlib

# 用户签名+时间戳
def user_sign(request):
    if request.method == "POST":
        client_time = request.POST.get('time', '')  # 客户端时间戳
        client_sign = request.POST.get('sign', '')  # 客户端签名
    else:
        return "error"

    if client_time == '' or client_sign == '':
        return "sign null"

    # 服务器时间
    now_time = time.time()  # 当前时间戳
    server_time = str(now_time).split('.')[0]

    # 获取时间差
    time_difference = int(server_time) - int(client_time)
    if time_difference >= 60:
        return "timeout"

    # 签名检查
    md5 = hashlib.md5()
    sign_str = client_time + "&Guest-Bugmaster"
    sign_bytes_utf8 = sign_str.encode(encoding="utf-8")
    md5.update(sign_bytes_utf8)
    server_sign = md5.hexdigest()

    if server_sign != client_sign:
        return "sign fail"
    else:
        return "sign success"

对上述代码进行分析:

  • 创建user_sign()函数,处理签名参数;

  • 首先,通过POST方法获取两个参数client_time和client_sign,如果客户端请求方法不是POST,那么函数返回error错误;

  • 然后,判断两个参数均不能为空,如果为空,则返回sign null错误;

  • 然后,对时间戳进行判断,对客户端的时间戳和服务器端的时间戳进行判断,如果时间戳大于60,则返回超时timeout错误;

  • 最后,对签名进行检查,如果签名检查通过,则返回成功,否则返回签名验证失败。

注意: 之所以将时间戳用“.”split,是因为python3的时间戳精度较高,但我们只需要小数点的前10位。

将用户签名功能应用到添加发布会接口中,编辑:…/guest/sign/views_if_sec.py视图文件:

def add_event(request):
    sign_result = user_sign(request)
    if sign_result == "error":
        return JsonResponse({'status':10011, 'message':'request error'})
    elif sign_result == "sign null":
        return JsonResponse({'status':10022, 'message':'user sign null'})
    elif sign_result == "timeout":
        return JsonResponse({'status':10013, 'message':'user sign timeout'})
    elif sign_result == "sign fail":
        return JsonResponse({'status':10014, 'message':'user sign error'})

    eid = request.POST.get('eid', '')
    name = request.POST.get('name', '')
    limit = request.POST.get('limit', '')
    status = request.POST.get('status', '')
    address = request.POST.get('address', '')
    start_time = request.POST.get('start_time', '')

    if eid == '' or name == '' or limit == '' or address == '' or start_time == '':
        return JsonResponse({'status':10021, 'message':'parameter error'})

    result = Event.objects.filter(id=eid)

    if result:
        return JsonResponse({'status':10022, 'message':'event id already exists'})

    result = Event.objects.filter(name=name)

    if result:
        return JsonResponse({'status':10023, 'message':'event name already exists'})

    if status == '':
        status = 1
    try:
        Event.objects.create(id=eid, name=name, limit=limit, address=address, status=int(status), start_time=start_time)
    except ValidationError as e:
        error = 'start_time format error. It must be in YYYY-MM-DD HH:MM:SS format.'
        return JsonResponse({'status':10024, 'message':error})

    return JsonResponse({'status':200, 'message':'add event success'})

调用user_sign()函数处理用户签名,根据函数返回字符串,将相应的处理结果返回给客户端,当用户签名验证通过后,接下来的处理过程和之前是一样的。

11.2.2 接口文档

这里写图片描述

11.2.3 接口用例

由于接口中加入了时间戳和md5加密算法,所以一般的接口测试工具无法模拟,此时就凸显出通过代码方式测试接口的优越性了。

添加接口测试用例:add_event_test.py

#!/usr/bin/python
# -*- coding:utf-8 -*-


import unittest, requests, hashlib
from time import time


class AddEventTest(unittest.TestCase):

    def setUp(self):
        self.base_url = "http://127.0.0.1:8000/api/sec_add_event/"
        # app_key
        self.api_key = "&Guest-Bugmaster"
        # 当前时间
        now_time = time()
        self.client_time = str(now_time).split('.')[0]
        # sign
        md5 = hashlib.md5()
        sign_str = self.client_time + self.api_key
        sign_bytes_utf8 = sign_str.encode(encoding="utf-8")
        md5.update(sign_bytes_utf8)
        self.sign_md5 = md5.hexdigest()

    def test_add_event_request_error(self):
        '''请求方法错误'''
        r = requests.get(self.base_url)
        result = r.json()
        self.assertEqual(result['status']. 10011)
        self.assertEqual(result['message'], 'request error')

    def test_add_event_sign_null(self):
        '''签名参数为空'''
        payload = {'eid':1, 'limit':'', 'address':'', 'start_time':'',
        'time':'', 'sign':''}
        r = requests.post(self.base_url, data=payload)
        result = r.json()
        self.assertEqual(result['status'], 10012)
        self.assertEqual(result['message'], 'user sign null')

    def test_add_event_time_out(self):
        '''请求超时'''
        now_time = str(int(self.client_time) - 61)
        payload = {'eid':1, 'limit':'', 'address':'', 'start_time':'',
        'time':now_time, 'sign':'abc'}
        r = requests.post(self.base_url, data=payload)
        result = r.json()
        self.assertEqual(result['status'], 10013)
        self.assertEqual(result['message'], 'user sign timeout')

    def test_add_event_sign_error(self):
        '''签名错误'''
        payload = {'eid':1, 'limit':'', 'address':'', 'start_time':'',
        'time':self.client_time, 'sign':'abc'}
        r = requests.post(self.base_url, data=payload)
        self.assertEqual(result['status'], 10014)
        self.assertEqual(result['message'], 'user sign error')

    def test_add_event_success(self):
        '''添加成功'''
        payload = {'eid':21, 'name':'一加5手机发布会', 'limit':2000,
        'address':'深圳宝体', 'start_time':'2017-05-10 12:00:00',
        'time':self.client_time, 'sign':self.sign_md5}
        r = requests.post(self.base_url, data=payload)
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'add event success')


if __name__ == '__main__':
    unittest.main()

11.3 接口加密

以AES加密算法为例。

11.3.1 PyCrypto库

PyCrypto库,是一个免费的加密算法库,支持常见的DES、AES加密,以及MD5、SHA等各种HASH运算。

PyCrypto在Windows系统中安装需要依赖于“vcvarsall.bat”文件,需要安装庞大的Visual Studio才能解决,所以建议还是使用Linux系统来进行学习和使用。

通过下面的例子来演示PyCrypto库的强大:

  • eg1

SHA-256算法属于密码SHA-2系列哈希,它产生了一个消息的256位摘要。哈希值用作表示大量数据的固定大小的唯一值,数据的少量更改会在哈希值中产生不可预知的大量更改,从而验证数据的安全。

>>> from Crypto.Hash import SHA256
>>> hash = SHA256.new()
>>> hash.update(b'message')
>>> hash.digest()    #使用digest()方法加密
'\xabS\n\x13\xe4Y\x14\x98+y\xf9\xb7\xe3\xfb\xa9\x94\xcf\xd1\xf3\xfb"\xf7\x1c\xea\x1a\xfb\xf0+F\x0cm\x1d'
>>> hash.hexdigest()    #使用hexdigest()方法加密
'ab530a13e45914982b79f9b7e3fba994cfd1f3fb22f71cea1afbf02b460c6d1d'
>>> 

通过digest()方法可以对字符串“message”进行加密,当然,通过hexdigest()方法也可以将其转换为16进制的加密字符串。

  • eg2

AES是Advanced Encryption Standard的缩写,即高级加密标准,是目前非常流行的加密算法。

>>> from Crypto.Cipher import AES
>>> obj = AES.new('This is a key123', AES.MODE_CBC, 'This is an IV456')
>>> message = "The answer is no"
>>> ciphertext = obj.encrypt(message)    #加密
>>> ciphertext
'\xd6\x83\x8dd!VT\x92\xaa`A\x05\xe0\x9b\x8b\xf1'
>>> obj2 = AES.new('This is a key123', AES.MODE_CBC, 'This is an IV456')
>>> obj2.decrypt(ciphertext)    #解密
'The answer is no'
>>> 

加密的过程:

  • “This is a key123”为key,长度有着严格的要求,必须为16、24或32位,否则会抛出异常:“ValueError: AES key must be either 16, 24 or 32 bytes long”。

  • “This is an IV456”为VI,长度要求更加严格,只能为16位,否则将抛出异常:“ValueError: IV must be 16 bytes long”。

  • 通过encrypt()方法对message字符串进行加密,得到:’\xd6\x83\x8dd!VT\x92\xaa`A\x05\xe0\x9b\x8b\xf1’

解密的过程:

  • 要想对加密字符串进行解密,则必须知道加密时所使用的key和VI,通过decrypt()方法对加密字符串解密得到:“The answer is no”。

  • eg3

此外,Crypto还提供了一个强大的随机算法。

>>> from Crypto.Random import random
>>> random.choice(['dogs', 'cats', 'bears'])
'bears'
>>> 

11.3.2 AES加密接口开发

将AES加密算法应用到接口开发中,先从编写测试用例开始,因为加密的过程是在客户端进行的,也就是在测试用例当中进行。

编写接口测试用例文件:Interface_AES_test.py

#! /usr/bin/python
# -*- coding:utf-8 -*-

from Crypto.Cipher import AES
import base64
import requests
import unittest
import json

class AESTest(unittest.TestCase):
    """AES加密后的接口测试用例"""

    def setUp(self):
        """初始化测试参数"""
        BS = 16
        self.pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)    # 使用lambda定义匿名函数来对字符串进行补足,使其长度变为16、24、32位
        self.base_url = "http://127.0.0.1:8000/api/sec_get_guest_list/"
        self.app_key = "W7v4D60fds2Cmk2U"    # 定义好app_key,app_key是密钥,只能告诉给合法的接口调用者

    def encryptBase64(self, src):
        return base64.urlsafe_b64encode(src)

    def encryptAES(self, src, key):
        """生成AES密文"""
        iv = b"1172311105789011"    # 定义好iv,iv也是保密的,必须为16位
        cryptor = AES.new(key, AES.MODE_CBC, iv)
        ciphertext = cryptor.encrypt(self.pad(src))    # 通过encrypt()方法对src(JSON格式的接口参数)生成加密字符串
        return self.encryptBase64(ciphertext)    # 通过encrypt()方法生成的加密字符串太长不适合传输,于是,通过base64模块的urlsafe_b64encode()方法对AES加密字符串进行二次加密

    def test_case_interface(self):
        """测试AES加密的接口"""
        payload = {'eid':1, 'phone': '18011001100'}    # 使用字典格式来存放接口参数
        # 加密过程
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()    # 通过json.dumps()方法将payload字典转化为JSON格式,和app_key一起做为encryptAES()方法的参数,用于生成AES加密字符串

        r = requests.post(self.base_url, data = {"data": encoded})    # 将加密后的字符串作为data参数发送到接口请求
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'success')

if __name__ == '__main__':
    unittest.main()

对上述代码进行分析,见备注。

注意:

  1. encrypt()方法要求被加密的字符串长度必须是16、24、32位,如果直接生成可能会引发异常:“ValueError: Input strings must be a multiple of 16 in length”,可是被加密字符串的长度是可控的,因为接口参数的个数和长度是不固定的,所以,为了解决这个问题,还需要对字符串的长度进行处理,使它的长度符合encrypt()的需求;

  2. 通过encrypt()方法加密后的字符串是这样的:

b'>_\x80\xlfi\x97\x8f\x94~\xeaE\……'
  1. 通过urlsafe_b64encode()方法加密后的字符串是这样的:
b'gouBbuKWEeY5w……'

当服务器接收到加密的接口参数后,需要再警告一系列的过程解密:

编辑接口文件:views_if_sec.py

#! /usr/bin/python
# -*- coding:utf-8 -*-

import json
from Crypto.Cipher import AES


# AES加密算法
BS = 16
unpad = lambda s: s[0: - ord(s[-1])]

def decryptBase64(src):
    return base64.urlsafe_b64decode(src)    # 通过urlsafe_b64decode()方法对base64加密字符串进行解密

def decryptAES(src, key):
    """
    解析AES密文
    """
    src = decryptBase64(src)    # 调用decryptBase64()方法,将base64加密字符串解密为AES加密字符串
    iv = b'1172311105789011'
    cryptor = AES.new(key, AES.MODE_CBC, iv)
    text = cryptor.decrypt(src).decode()    # 通过decrypt()对AES加密字符串进行解密
    return unpad(text)    # 通过unpad匿名函数对字符串的长度进行还原

def aes_encryption(request):

    app_key = 'W7v4D60fds2Cmk2U'    # 服务器端与合法客户端约定的密钥app_key

    if request.method == "POST":    # 判断客户端请求方法是否为POST,通过POST.get()方法接收data参数
        data = request.POST.get("data", "")
    else:
        return "error"    # 如果请求方法不为POST,则函数返回“error”字符串

    # 解密
    decode = decryptAES(data, app_key)    # 调用decryptAES()函数解密,传参数字符串和app_key
    # 转化为字典
    dict_data = json.loads(decode)    # 将解密后的字符串通过json.loads()方法转化成字典,并作为函数的返回值
    return dict_data

在查询嘉宾列表的接口中调用aes_encryption()函数进行AES加密字符串的解密,继续编辑views_if_sec.py文件:

# 嘉宾查询接口---AES算法
def get_guest_list(request):
    dict_data = aes_encryption(request)

    if dict_data == "error":
        return JsonResponse({'status':10011, 'message':'request error'})

    # 取出对应的发布会id和嘉宾手机号
    eid = dict_data['eid']
    phone = dict_data['phone']

    if eid == '':
        return JsonResponse({'status':10021, 'message':'eid cannot be empty'})

    if eid != '' and phone == '':
        datas = []
        results = Guest.objects.filter(event_id=eid)
        if results:
            for r in results:
                guest = {}
                guest['realname'] = r.realname
                guest['phone'] = r.phone
                guest['email'] = r.email
                guest['sign'] = r.sign
                datas.append(guest)
            return JsonResponse({'status':200, 'message':'success', 'data':datas})
        else:
            return JsonResponse({'status':10022, 'message':'query result is empty'})

    if eid != '' and phone != '':
        guest = {}
        try:
            result = Guest.objects.get(phone=phone, event_id=eid)
        except ObjectDoesNotExist:
            return JsonResponse({'status':10022, 'message':'query result is empty'})
        else:
            guest['realname'] = result.realname
            guest['phone'] = result.phone
            guest['email'] = result.email
            guest['sign'] = result.sign
            return JsonResponse({'status':200, 'message':'success', 'data':guest})

如果aes_encryption()函数返回“error”,则说明该接口的方法调用错误,返回客户端“request error”,否则,取出解密字符串(字典)中的eid 和phone的参数进行查询嘉宾列表的处理。

11.3.3 接口文档

增加了加密后的查询嘉宾接口文档:

这里写图片描述

11.3.4 补充接口测试用例

最后,补充查询嘉宾接口的测试用例:

Interface_AES_test.py

#! /usr/bin/python
# -*- coding:utf-8 -*-

from Crypto.Cipher import AES
import base64
import requests
import unittest
import json

class AESTest(unittest.TestCase):
    """AES加密后的接口测试用例"""

    def setUp(self):
        """初始化测试参数"""
        BS = 16
        self.pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
        self.base_url = "http://10.18.214.88:8000/api/sec_get_guest_list/"
        self.app_key = "W7v4D60fds2Cmk2U"

    def encryptBase64(self, src):
        return base64.urlsafe_b64encode(src)

    def encryptAES(self, src, key):
        """生成AES密文"""
        iv = b"1172311105789011"
        cryptor = AES.new(key, AES.MODE_CBC, iv)
        ciphertext = cryptor.encrypt(self.pad(src))
        return self.encryptBase64(ciphertext)

    def test_case_interface(self):
        """测试AES加密的接口"""
        payload = {'eid':1, 'phone': '18011001100'}
        # 加密
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()

        r = requests.post(self.base_url, data = {"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'success')

    def test_get_guest_list_request_error(self):
        """测试嘉宾查询接口:eid为空"""
        payload = {'eid': '', 'phone': ''}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 10011)
        self.assertEqual(result['message'], 'request error')

    def test_get_guest_list_eid_null(self):
        """测试嘉宾查询接口:phone为空"""
        payload = {'eid': '', 'phone': '18011001100'}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 10021)
        self.assertEqual(result['message'], 'eid cannot be empty')

    def test_get_guest_list_eid_error(self):
        """根据eid查询结果为空"""
        payload = {'eid': '901', 'phone': ''}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 10022)
        self.assertEqual(result['message'], 'query result is empty')

    def test_get_guest_list_eid_success(self):
        """根据eid查询结果成功"""
        payload = {'eid': '1', 'phone': '18011001100'}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'success')
        self.assertEqual(result['data'][0]['realname'], 'alen')
        self.assertEqual(result['data'][0]['phone'], '18011001100')

    def test_get_event_list_eid_phone_null(self):
        """根据eid和phone查询结果为空"""
        payload = {'eid': '200', 'phone': '10000000000'}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 10022)
        self.assertEqual(result['message'], 'query result is empty')

    def test_get_event_list_eid_phone_success(self):
        """根据eid和phone查询结果成功"""
        payload = {'eid': '1', 'phone': '18011001100'}
        encoded = self.encryptAES(json.dumps(payload), self.app_key).decode()
        r = requests.post(self.base_url, data={"data": encoded})
        result = r.json()
        self.assertEqual(result['status'], 200)
        self.assertEqual(result['message'], 'success')
        self.assertEqual(result['data']['realname'], 'alen')
        self.assertEqual(result['data']['phone'], '18011001100')    

if __name__ == '__main__':
    unittest.main()

封装了AES算法的加密算法后,在接口测试用例中调用即可,过程不复杂。

总结

使用MD5方式的相对来说还简单点,AES这种的确相对复杂,书中也只是做了基础的介绍和应用展示,在实际产品的开发过程中,加密环节相对复杂,测试人员,如果要测试带有加密的接口,当然了,如果了解加密过程最好,如果不了解,也不影响测试,只需要通过研发人员获取到关键信息,加入到接口测试用例中即可。

  • 0
    点赞
  • 3
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值