1224 条记录
52 私有链接
52 私有链接
这段文字主要讨论了如何检测AI接口中转商的转接次数及方法。文中测试了多个AI接口(钱多多、柏拉图等),并提供了一个自部署的检测工具源码,该工具通过生成随机图片并模拟请求来探测中转过程,以识别并规避中转商的屏蔽策略。 最终指出,一些中转商会伪造用户代理和IP地址,而真正官方接口不会进行此类操作,这成为检测的关键。
import logging
from fastapi import FastAPI, Request
from faker import Faker
from io import BytesIO
from PIL import Image
from fastapi.responses import StreamingResponse
from datetime import datetime, timedelta
from fastapi_utils.tasks import repeat_every
import time
import asyncio
import httpx
from fastapi.middleware.cors import CORSMiddleware
logging.basicConfig(level=logging.WARNING)
app = FastAPI()
fake = Faker()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有头
)
# 创建一个map映射 名字是recorded_ips
recorded_ips = {}
@app.get("/trace/openai")
async def openai_request(url: str, key: str):
global recorded_ips
traceId = int(time.time())
current_time = datetime.now()
if traceId not in recorded_ips:
recorded_ips[traceId] = (current_time, [], [])
asyncio.create_task(send_post_request(url, key, traceId))
return traceId
async def send_post_request(url: str, key: str, traceId: str):
global recorded_ips
headers = {
'Accept': '',
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Content-Type': 'application/json',
'Authorization': f'Bearer {key}'
}
image_url = f"https://api3.aicnn.cn/trace/fake-image?traceId={traceId}"
data = {
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": "What is this?"}
]
}
],
"max_tokens": 3,
"stream": False
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=headers, json=data)
if response.status_code != 200:
recorded_ips[traceId][2].append(f"Error: {response.text}")
else:
recorded_ips[traceId][2].append(f"完成探测")
except Exception as e:
if "error" in str(e):
recorded_ips[traceId][2].append(f"Exception: {str(e)}")
return traceId
@app.on_event("startup")
@repeat_every(seconds=60) # Run every 60 seconds
def cleanup_old_ips():
global recorded_ips
current_time = datetime.now()
for traceId in list(recorded_ips.keys()):
timestamp, _, _ = recorded_ips[traceId]
if current_time - timestamp > timedelta(minutes=3):
del recorded_ips[traceId]
@app.get("/trace/get-agent")
async def fake_image(request: Request, traceId: str):
global recorded_ips
traceId = int(traceId)
if traceId in recorded_ips:
res = recorded_ips[traceId][2]
current_time = datetime.now()
timestamp, _, _ = recorded_ips[traceId]
if current_time - timestamp > timedelta(seconds=60):
current_time = datetime.now()
time_str = current_time.strftime("%H:%M:%S")
recorded_ips[traceId][2].append(str(time_str) + " 超过60秒未收到响应,完成探测" )
return recorded_ips[traceId][2]
else:
return recorded_ips[traceId][2]
return set()
@app.get("/trace/fake-image")
async def fake_image(request: Request, traceId: str):
global recorded_ips
current_time = datetime.now()
traceId = int(traceId)
# 判断recorded_ips是否有traceId,如果没有,则新建一个set
if traceId not in recorded_ips:
recorded_ips[traceId] = (current_time, [], [])
# 生成一个假的 WebP 图片
image = Image.new('RGB', (100, 100),
color=(fake.random_int(0, 255), fake.random_int(0, 255), fake.random_int(0, 255)))
buffer = BytesIO()
image.save(buffer, format="WEBP")
buffer.seek(0)
# 获取请求的 host, 源 IP, user agent 和其他详细信息
user_agent = request.headers.get('user-agent')
if user_agent and "IPS" in user_agent:
user_agent = "Azure " + user_agent
if user_agent and "OpenAI" in user_agent:
user_agent = "OpenAI" + user_agent
if user_agent is None:
user_agent = "未知,可能来自逆向,完成探测"
x_forwarded_for = request.headers.get('x-forwarded-for')
cf_connecting_ip = request.headers.get('cf-connecting-ip')
client_host = request.client.host
headers = request.headers
# 检查并记录IP地址
new_ips = True
# if cf_connecting_ip and cf_connecting_ip in recorded_ips[traceId][1]:
# new_ips = False
# else:
# recorded_ips[traceId][1].append(cf_connecting_ip)
# if x_forwarded_for:
# for ip in x_forwarded_for.split(','):
# ip = ip.strip()
# if ip in recorded_ips[traceId][1]:
# new_ips = False
# else:
# recorded_ips[traceId][1].append(ip)
# break
if new_ips:
# 脱敏
new_x_forwarded_for = ""
if x_forwarded_for:
for ip in x_forwarded_for.split(','):
ip_parts = ip.split('.')
if len(ip_parts) == 4:
new_x_forwarded_for = new_x_forwarded_for + f"{ip_parts[0]}.***.***.{ip_parts[3]}, "
time_str = current_time.strftime("%H:%M:%S")
recorded_ips[traceId][2].append(str(time_str) + " " + user_agent + " " + str(new_x_forwarded_for))
print(
f"Time: {current_time}, TraceId: {traceId}, x_forwarded_for: {x_forwarded_for}, cf_connecting_ip: {cf_connecting_ip}, Client Host: {client_host}, User Agent: {user_agent}")
return StreamingResponse(buffer, media_type="image/webp")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8921, log_level="warning")