api
2025年9月5日大约 1 分钟
# Example: post a message with one attachment to the #data_warning channel
# through the chat.postMessage REST endpoint.
# Replace the placeholders before running: "域名" (the server's domain name),
# "token" (X-Auth-Token) and "id" (X-User-Id).
# NOTE: -k tells curl to skip TLS certificate verification.
curl -k --request POST \
--url https://域名/api/v1/chat.postMessage \
--header 'X-Auth-Token: token' \
--header 'X-User-Id: id' \
--header 'accept: application/json' \
--header 'content-type: application/json' \
--data '{
"channel": "#data_warning",
"text": "Sample message",
"attachments": [
{
"color": "#ff0000",
"title": "Test Attachment",
"text": "Attachment content",
"fields": [
{
"short": true,
"title": "Test",
"value": "Testing out something"
}
]
}
]
}'
import requests
import json
# Prometheus HTTP API address (the /api/v1/query endpoint) used for all metric lookups
PROM_URL = "http://103.118.40.237:30090/api/v1/query"
def get_prometheus_metric(query):
    """Query Prometheus and return the first sample of the result in GiB.

    Sends ``query`` to ``PROM_URL`` as the ``query`` parameter, takes the
    first series of the result, and converts its sample value by dividing it
    by 1024**3 (i.e. bytes -> GiB, assuming the metric is a byte count) and
    rounding to two decimals.

    Args:
        query: PromQL expression to evaluate.

    Returns:
        The converted value as a float, or ``None`` when the HTTP request
        fails, the response reports no data, or the payload cannot be parsed.
    """
    try:
        response = requests.get(PROM_URL, params={'query': query}, timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        print(f"请求 Prometheus 失败: {e}")
        return None
    except ValueError as e:
        # Older requests versions raise a plain ValueError for a non-JSON body.
        print(f"请求 Prometheus 失败: {e}")
        return None

    try:
        if data['status'] != 'success' or not data['data']['result']:
            print("没有获取到数据")
            return None
        # Each sample is a [timestamp, "<value as string>"] pair.
        timestamp, raw_value = data['data']['result'][0]['value']
        # Parse as float: the value string is not guaranteed to be an integer
        # literal (e.g. "1234.5" or "NaN"), which int() would reject.
        metric_value = round(float(raw_value) / 1024 ** 3, 2)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # Unexpected payload shape or unparsable value: report and give up
        # instead of crashing the caller.
        print(f"解析 Prometheus 响应失败: {e}")
        return None

    print(f"时间戳: {timestamp}, 指标值: {metric_value}")
    return metric_value
def send2rocket(msg):
    """Post ``msg`` to the ``#data_warning`` channel via ``chat.postMessage``.

    The outcome is reported on stdout. Request failures (connection errors,
    timeouts, non-2xx responses) are caught and printed, not raised.

    Args:
        msg: Text of the message to post.

    Returns:
        None.
    """
    url = "https://www.rechat.top/api/v1/chat.postMessage"
    # NOTE(review): these credentials are committed in source. Consider
    # rotating them and loading them from the environment or a secret store.
    headers = {
        "X-Auth-Token": "YxBY-zNJX9Nu0-czH7-8Vj7ajJpoJjtzIhGZ0Uol2Wf",
        "X-User-Id": "2Q3p8LhCRWPkZXGxE",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    # An optional "attachments" list (color / title / text / fields) can be
    # added to the payload, as in the curl example for this endpoint.
    payload = {
        "channel": "#data_warning",
        "text": msg,
    }
    try:
        # NOTE(review): verify=False disables TLS certificate verification, so
        # the auth token is sent to an unauthenticated peer. Kept as-is in
        # case the server uses a self-signed certificate; confirm and remove.
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            verify=False,
            # Without a timeout requests waits indefinitely on a stalled
            # server; 10 s matches the Prometheus query in this file.
            timeout=10,
        )
        response.raise_for_status()
        print("消息发送成功:", response.json())
    except requests.RequestException as e:
        print("发送失败:", e)
def build_traffic_message(readings):
    """Format per-server outbound traffic totals into one chat message.

    Args:
        readings: Iterable of ``(instance_name, total)`` pairs, where
            ``total`` is the value to display (in G) or ``None`` when the
            metric could not be fetched.

    Returns:
        One line per server, joined by ``" \\n"``. A missing reading is shown
        as ``N/A`` instead of the misleading ``None G``. An empty input
        yields an empty string.
    """
    lines = []
    for instance, total in readings:
        shown = "N/A" if total is None else f"{total} G"
        lines.append(f"\U0001F4BB服务器: {instance} \U0001F6DC当前出网流量总计: {shown}")
    return " \n".join(lines)


if __name__ == "__main__":
    # Servers whose transmit counter on interface ens3 is reported.
    instances = ("cdnone", "cdntwo")
    readings = []
    for instance in instances:
        # Query expression for this instance.
        query = f'node_network_transmit_bytes_total{{device="ens3",instance="{instance}"}}'
        readings.append((instance, get_prometheus_metric(query)))
    send2rocket(build_traffic_message(readings))