您当前的位置:首页 > IT编程 > python
| C语言 | Java | VB | VC | python | Android | TensorFlow | C++ | oracle | 学术与代码 | cnn卷积神经网络 | gnn | 图像修复 | Keras | 数据集 | Neo4j | 自然语言处理 | 深度学习 | 医学CAD | 医学影像 | 超参数 | pointnet | pytorch | 异常检测 | Transformers | 情感分类 | 知识图谱 |

自学教程:python 阿里云oss实现直传签名与回调验证的示例方法

51自学网 2021-10-30 22:47:00
  python
这篇教程python 阿里云oss实现直传签名与回调验证的示例方法写得很实用,希望能帮到您。

签名

import base64import jsonimport timefrom datetime import datetimeimport hmacfrom hashlib import sha1access_key_id = ''# 请填写您的AccessKeySecret。access_key_secret = ''# host的格式为 bucketname.endpoint ,请替换为您的真实信息。host = ''# callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。callback_url = ""# 用户上传文件时指定的前缀。upload_dir = 'user-dir-prefix/'expire_time = 1200expire_syncpoint = int(time.time() + expire_time)policy_dict = {  'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'Z',  'conditions': [    {"bucket": "test-paige"},    ['starts-with', '$key', 'user/test/']  ]}policy = json.dumps(policy_dict).strip()policy_encode = base64.b64encode(policy.encode())signature = base64.encodebytes(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest())callback_dict = {  'callbackUrl': callback_url,  'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}&height=${imageInfo.height}&width=${'          'imageInfo.width}',  'callbackBodyType': 'application/json'}callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()var = {  'accessid': access_key_id,  'host': host,  'policy': policy_encode.decode(),  'signature': signature.decode().strip(),  'expire': expire_syncpoint,  'callback': callback}

回调验签

import asyncioimport base64import timeimport aiomysqlimport rsafrom aiohttp import web, ClientSessionfrom urllib import parseimport uuiddef success(msg='', data=None):  if data is None:    data = {}  dict_data = {    'code': 1,    'msg': msg,    'data': data  }  return web.json_response(dict_data)def failed(msg='', data=None):  if data is None:    data = {}  dict_data = {    'code': 0,    'msg': msg,    'data': data  }  return web.json_response(dict_data)async def handle(request):  """  获取连接池  :param web.BaseRequest request:  :return:  """  authorization_base64 = request.headers['authorization']  x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url']  pub_key_url = base64.b64decode(x_oss_pub_key_url_base64.encode())  authorization = base64.b64decode(authorization_base64.encode())  path = request.path  async with ClientSession() as session:    async with session.get(pub_key_url.decode()) as resp:      pub_key_body = await resp.text()      pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())      body = await request.content.read()      auth_str = parse.unquote(path) + '/n' + body.decode()      parse_url = parse.parse_qs(body.decode())      print(parse_url)      try:        rsa.verify(auth_str.encode(), authorization, pubkey)        pool = request.app['mysql_pool']        async with pool.acquire() as conn:          async with conn.cursor() as cur:            id = str(uuid.uuid4())            url = parse_url['filename'][0]            mime = parse_url['mimeType'][0]            disk = 'oss'            time_at = time.strftime("%Y-%m-%d %H:%I:%S", time.localtime())            sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"            await cur.execute(sql, (id, url, mime, disk, time_at, time_at))            await conn.commit()        dict_data = {          'id': id,          'url': url,          'cdn_url': 'https://cdn.***.net' + '/' + url,          'mime': mime,          'disk': disk,          'created_at': time_at,          'updated_at': time_at,        }        return success(data=dict_data)      except rsa.pkcs1.VerificationError:        return failed(msg='验证错误')async def init(loop):  # 创建连接池  mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,                      user='', password='',                      db='', loop=loop)  async def on_shutdown(application):    """    接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功.    :param web.Application application:    :return:    """    application['mysql_pool'].close()    # 没有下面这句话会报错 RuntimeError: Event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错    await application['mysql_pool'].wait_closed()  application = web.Application()  application.on_shutdown.append(on_shutdown)  # 把连接池放到 application 实例中  application['mysql_pool'] = mysql_pool  application.add_routes([web.get('/', handle), web.post('/oss', handle)])  return applicationif __name__ == '__main__':  loop = asyncio.get_event_loop()  application = loop.run_until_complete(init(loop))  web.run_app(application, host='127.0.0.1')  loop.close()

到此这篇关于python 阿里云oss实现直传签名与回调验证的文章就介绍到这了,更多相关python 直传签名与回调验证内容请搜索51zixue.net以前的文章或继续浏览下面的相关文章希望大家以后多多支持51zixue.net!


Python实现"验证回文串"的几种方法
Python中pycharm编辑器界面风格修改方法
万事OK自学网:51自学网_软件自学网_CAD自学网自学excel、自学PS、自学CAD、自学C语言、自学css3实例,是一个通过网络自主学习工作技能的自学平台,网友喜欢的软件自学网站。