在使用Django開發web應用的時候,很多場景都會有需要微信相關功能的介入,最近我們公司在使用python的Django框架配合國產數據庫OceanBase數據庫進行開發互聯網應急指揮系統的時候,就用到了微信通知,在發生輿情事件的時候通過微信公眾號,通知對應人員有新的事件發生,或者提醒相關人員對應事件的進度情況
想要使用微信的信息推送,就需要提前做好一些準備,因為發送微信模板消息,您需要使用微信公眾平臺 API。
需要獲取微信公眾平臺的 API 密鑰,并且配置好相應的回調 URL。
需要根據微信公眾平臺的要求,選擇合適的模板,并填寫好模板內容。
根據微信公眾平臺的 API 文檔,構造合適的請求參數,發送請求。
對于推送消息的響應結果,需要進行適當的處理和解析,以便于判斷推送消息是否成功。
廢話直接不多說,直接貼代碼
import copy
import json
import redis
import requests
from django.conf import settings
from api.models import DvadminSystemUsers
class WeChat:
def __init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None):
self.app_id = app_id
self.secret = secret
self.template = {
"touser": "",
"template_id": "",
"url": "",
"data": {
"first": {
"value": "",
},
"keyword1": {
"value": "",
},
"keyword2": {
"value": "",
},
"remark": {
"value": "",
},
}
}
if template:
self.template = template
self.access_token = None
self.req_list = list()
self.user_list = list()
self.data_list = list()
def _get_token(self, force_update=False):
key_name = 'wechat_token_{}'.format(self.app_id)
if force_update:
self.access_token = None
else:
self.access_token = get_data(key_name)
if not self.access_token:
url = "https://api.weixin.qq.com/cgi-bin/token?"
payload = {
'grant_type': 'client_credential',
'appid': self.app_id,
'secret': self.secret,
}
response = requests.get(url, params=payload, timeout=50)
access_token = response.json().get("access_token")
if access_token:
set_data(key_name, access_token, ex=7100)
self.access_token = get_data(key_name)
def make_data_list(self):
user_openid_list = DvadminSystemUsers.objects.filter(
id__in=self.user_list
).exclude(
openId=''
).exclude(
openId__isnull=True
).values_list('openid', flat=True)
for openid in user_openid_list:
user_template = copy.deepcopy(self.template)
user_template['touser'] = openid
self.data_list.append(user_template)
def post_data(self):
# 獲取 token
self._get_token()
url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token)
# 準備數據
if self.user_list:
self.make_data_list()
# 發送請求
for data in self.data_list:
json_template = json.dumps(data)
res = requests.post(url, data=json_template)
print('post_data', res.text)
def get_redis():
"""
連接redis
"""
redis = redis.Redis(host='localhost', port=6379, db=0)
return redis
def set_data(name, value, **kwargs):
# 將數據存入redis緩存
redis = get_redis()
value = json.dumps(value)
redis.set(name, value, **kwargs)
def get_data(name):
# 取出數據
redis = get_redis()
value = redis.get(name)
if value:
value = json.loads(value)
return value調用微信推送類




