52 私有链接
这个工具使用 Gemini AI 将音视频转录为SRT字幕,解决了直接转录长音频时时间轴不准确的问题。它通过智能切片、逐片转录和精准组装,生成时间轴精确的字幕。该工具使用简单,支持多种格式,并且利用Gemini AI的免费额度和高准确率,快速高效地完成转录。使用前需要获取Gemini API Key并填写到工具中。
这段内容主要询问如何利用AI来简化Anki制卡过程,特别是针对专业书籍,因为手动制作卡片工作量太大。提问者希望了解是否有结合AI的小技巧,以提高Anki的使用效率。
这个Anki英语智能卡组将单词的不同释义、词组和习语拆分成独立的卡片,并提供随机例句帮助理解语境。它通过AI生成词义使用频率并分类,支持高度定制化学习,例如按领域和词频筛选。卡组还区分阅读理解和写作,并为不同学习活动设置不同复习参数。此外,用户可以调整每日新卡上限,并使用随机展示和按标签过滤等功能。该卡组的作者推荐使用最新版本,并计划加入AI配音功能。
这个内容描述了一个Anki牌组的数据集,包含约58005个笔记,主要提供单词的定义、美式和英式发音,以及例句。数据更新于2025年1月15日。用户反馈提到导入后界面有透明层,可能是插件问题,建议重新下载牌组导入。牌组例句主要来自牛津10高阶,并使用AI模型扩充了高频但例句少的义项。
这段内容描述了如何配置Linux系统的网络接口。首先需要找到网口名称,然后通过闪烁网口命令来确定物理网口对应的接口,接着编辑网络配置文件,设置IP地址、子网掩码、网关等参数,最后重启网络服务使配置生效。
这本书不是一本技术或职场成功指南,而是一本帮助程序员解决焦虑、倦怠等负面情绪的书籍。它旨在通过改变认知,让人更坦然地面对内心,成为一个“自洽”的程序员。书中探讨了工作哲学、方法论、人际关系、工作与家庭以及“只工作不上班”等多个方面,旨在帮助读者梳理情绪,转变心境。
lianglianglee.com的备份
极客时间电子书在线浏览
极客时间电子书
极客时间免费阅读
const QWEN_API_URL = "https://chat.qwenlm.ai/api/chat/completions";
const QWEN_MODELS_URL = "https://chat.qwenlm.ai/api/models";
const QWEN_FILES_URL = "https://chat.qwenlm.ai/api/v1/files/";
const CACHE_TTL = 60 * 60 * 1000; // 缓存1小时
const STREAM_TIMEOUT = 60000; // 60秒超时
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1秒
const encoder = new TextEncoder();
const streamDecoder = new TextDecoder();
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Helper function to convert base64 to Blob
function base64ToBlob(base64) {
const byteString = atob(base64.split(',')[1]);
const mimeString = base64.split(',')[0].split(':')[1].split(';')[0];
const ab = new ArrayBuffer(byteString.length);
const ia = new Uint8Array(ab);
for (let i = 0; i < byteString.length; i++) {
ia[i] = byteString.charCodeAt(i);
}
return new Blob([ab], { type: mimeString });
}
// Fetch with retry functionality
async function fetchWithRetry(url, options, retries = MAX_RETRIES) {
let lastError;
for (let i = 0; i < retries; i++) {
try {
const response = await fetch(url, options);
if (response.ok) return response;
lastError = new Error(`HTTP error! status: ${response.status}`);
if (i < retries - 1) await sleep(RETRY_DELAY * (i + 1));
} catch (error) {
lastError = error;
if (i < retries - 1) await sleep(RETRY_DELAY * (i + 1));
}
}
throw lastError;
}
// Function to upload image to Qwen
async function uploadImageToQwen(token, imageBlob) {
const formData = new FormData();
formData.append('file', imageBlob);
const response = await fetchWithRetry(QWEN_FILES_URL, {
method: "POST",
headers: {
"Authorization": token,
"accept": "application/json",
},
body: formData,
});
const data = await response.json();
if (!data.id) {
throw new Error("File upload failed: No valid file ID returned");
}
return data.id;
}
// Process messages to handle base64 images
async function processMessages(messages, authHeader) {
return Promise.all(messages.map(async (message) => {
if (message.content && Array.isArray(message.content)) {
message.content = await Promise.all(message.content.map(async (content) => {
if (content.type === "image_url" && content.image_url?.url?.startsWith("data:")) {
const imageBlob = base64ToBlob(content.image_url.url);
const imageId = await uploadImageToQwen(authHeader, imageBlob);
return {
type: "image",
image: imageId,
};
}
return content;
}));
}
return message;
}));
}
async function processLine(line, writer, state) {
try {
const data = JSON.parse(line.slice(6));
if (data.choices?.[0]?.delta?.content) {
const currentContent = data.choices[0].delta.content;
let newContent = currentContent;
if (currentContent.startsWith(state.previousContent) && state.previousContent.length > 0) {
newContent = currentContent.slice(state.previousContent.length);
}
if (newContent) {
const newData = {
...data,
choices: [{
...data.choices[0],
delta: {
...data.choices[0].delta,
content: newContent,
},
}],
};
await writer.write(encoder.encode(`data: ${JSON.stringify(newData)}\n\n`));
}
state.previousContent = currentContent;
} else {
await writer.write(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
}
} catch (error) {
await writer.write(encoder.encode(`${line}\n\n`));
}
}
async function handleStream(context) {
let buffer = "";
try {
while (true) {
const { done, value } = await context.reader.read();
if (done) {
context.state.isCompleted = true;
if (context.timeoutId) {
clearTimeout(context.timeoutId);
}
if (buffer) {
const lines = buffer.split("\n");
for (const line of lines) {
if (line.trim().startsWith("data: ")) {
await processLine(line, context.writer, context.state);
}
}
}
await context.writer.write(encoder.encode("data: [DONE]\n\n"));
break;
}
const valueText = streamDecoder.decode(value, { stream: true });
buffer += valueText;
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.trim().startsWith("data: ")) {
await processLine(line, context.writer, context.state);
}
}
}
} catch (error) {
if (!context.state.isCompleted) {
if (context.timeoutId) {
clearTimeout(context.timeoutId);
}
try {
await context.writer.write(
encoder.encode(`data: {"error":true,"message":"${error.message}"}\n\n`)
);
await context.writer.write(encoder.encode("data: [DONE]\n\n"));
} catch {}
}
} finally {
try {
await context.writer.close();
} catch {}
}
}
async function handleModelsRequest(request) {
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return new Response("Unauthorized", { status: 401 });
}
try {
const response = await fetchWithRetry(QWEN_MODELS_URL, {
headers: {
"Authorization": authHeader
}
});
return response;
} catch (error) {
return new Response(
JSON.stringify({ error: true, message: error.message }),
{ status: 500 }
);
}
}
async function handleChatCompletionsRequest(request) {
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return new Response("Unauthorized", { status: 401 });
}
const requestData = await request.json();
const { messages, stream = false, model, max_tokens } = requestData;
if (!model) {
return new Response(
JSON.stringify({ error: true, message: "Model parameter is required" }),
{ status: 400 }
);
}
try {
// Process messages for images
const processedMessages = await processMessages(messages, authHeader);
const qwenRequest = {
model,
messages: processedMessages,
stream,
...(max_tokens !== undefined && { max_tokens })
};
const qwenResponse = await fetch(QWEN_API_URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": authHeader
},
body: JSON.stringify(qwenRequest)
});
if (stream) {
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const reader = qwenResponse.body.getReader();
const streamContext = {
writer,
reader,
state: {
isCompleted: false,
isStreamActive: true,
previousContent: "",
},
timeoutId: setTimeout(() => {
if (!streamContext.state.isCompleted) {
streamContext.state.isStreamActive = false;
writer.write(encoder.encode('data: {"error":true,"message":"Response timeout"}\n\n'))
.then(() => writer.write(encoder.encode("data: [DONE]\n\n")))
.then(() => writer.close())
.catch(() => {});
}
}, STREAM_TIMEOUT)
};
handleStream(streamContext);
return new Response(readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
});
}
return new Response(await qwenResponse.text(), {
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
});
} catch (error) {
return new Response(
JSON.stringify({ error: true, message: error.message }),
{ status: 500 }
);
}
}
async function handleRequest(request) {
try {
const url = new URL(request.url);
const pathname = url.pathname;
if (request.method === "GET" && pathname === "/v1/models") {
return await handleModelsRequest(request);
}
if (request.method === "POST" && pathname === "/v1/chat/completions") {
return await handleChatCompletionsRequest(request);
}
return new Response("Method not allowed", { status: 405 });
} catch (error) {
return new Response(
JSON.stringify({ error: true, message: error.message }),
{ status: 500 }
);
}
}
addEventListener("fetch", event => {
event.respondWith(handleRequest(event.request));
});
本文介绍了使用 FFmpeg 进行字幕压制的两种常见方式:内嵌字幕和硬字幕。内嵌字幕可以单独存储,允许用户在观看时自由开关字幕,使用 FFmpeg 的命令示例为:ffmpeg -i input.mp4 -i subtitle.srt -c copy -c:s mov_text output.mp4
。硬字幕则将字幕直接融入视频画面,无法分离,常见的压制命令包括使用 SRT 字幕的 ffmpeg -i input.mp4 -vf subtitles=subtitle.srt -crf 16 output.mp4
和使用 ASS 字幕的 ffmpeg -i input.mp4 -vf ass=subtitle.ass -crf 16 output.mp4
。ASS 字幕支持更多样式和特效,适合制作复杂字幕效果。
本文介绍了如何制作Ubuntu启动盘并安装Ubuntu操作系统。首先,下载并安装UltraISO软件,然后从163网易开源镜像站下载Ubuntu 20.10镜像文件。接着,准备一个4G以上的U盘或空光盘,使用UltraISO将镜像文件写入U盘或光盘,制作成启动盘。最后,将启动盘插入需要安装系统的设备,启动并安装Ubuntu操作系统。安装方法可参考相关教程。
// Qwen API 配置
const QWEN_API_URL = "https://chat.qwenlm.ai/api/chat/completions";
const QWEN_MODELS_URL = "https://chat.qwenlm.ai/api/models";
const QWEN_FILES_URL = "https://chat.qwenlm.ai/api/v1/files/"; // 文件上传接口
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1秒
const STREAM_TIMEOUT = 600000; // 60秒超时
const CACHE_TTL = 60 * 60 * 1000; // 缓存 1 小时
const encoder = new TextEncoder();
const streamDecoder = new TextDecoder();
// 缓存相关
let cachedModels: string | null = null;
let cachedModelsTimestamp = 0;
// 工具函数
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 带重试的fetch函数
async function fetchWithRetry(
url: string,
options: RequestInit,
retries = MAX_RETRIES,
): Promise<Response> {
let lastError: unknown;
for (let i = 0; i < retries; i++) {
try {
const response = await fetch(url, options);
if (response.ok) {
return response;
}
const contentType = response.headers.get("content-type") || "";
if (response.status >= 500 || contentType.includes("text/html")) {
const responseClone = response.clone();
const responseText = await responseClone.text();
lastError = {
status: response.status,
contentType,
responseText: responseText.slice(0, 1000),
headers: Object.fromEntries(response.headers.entries()),
};
if (i < retries - 1) {
await sleep(RETRY_DELAY * (i + 1));
continue;
}
} else {
lastError = {
status: response.status,
headers: Object.fromEntries(response.headers.entries()),
};
break;
}
} catch (error) {
lastError = error;
if (i < retries - 1) {
await sleep(RETRY_DELAY * (i + 1));
continue;
}
}
}
throw new Error(JSON.stringify({
error: true,
message: "All retry attempts failed",
lastError,
retries,
}));
}
// 上传图片到 QwenLM
async function uploadImageToQwen(token: string, imageBlob: Blob): Promise<string> {
const formData = new FormData();
formData.append('file', imageBlob);
console.log('文件大小:', imageBlob.size); // 日志输出
console.log('文件类型:', imageBlob.type); // 日志输出
try {
const response = await fetchWithRetry(QWEN_FILES_URL, {
method: "POST",
headers: {
"Authorization": token, // 使用传入的 Token
"accept": "application/json",
},
body: formData,
});
if (!response.ok) {
const errorText = await response.text();
console.error('文件上传失败:', response.status, errorText); // 日志输出
throw new Error(`文件上传失败: ${response.statusText}`);
}
const data = await response.json();
if (!data.id) {
throw new Error("文件上传失败: 未返回有效的文件ID");
}
console.log('上传图片成功,imageId:', data.id); // 日志输出
return data.id;
} catch (error) {
console.error('上传图片时发生错误:', error); // 日志输出
throw error;
}
}
// 将 base64 转换为 Blob
function base64ToBlob(base64: string): Blob {
const byteString = atob(base64.split(',')[1]);
const mimeString = base64.split(',')[0].split(':')[1].split(';')[0];
const ab = new ArrayBuffer(byteString.length);
const ia = new Uint8Array(ab);
for (let i = 0; i < byteString.length; i++) {
ia[i] = byteString.charCodeAt(i);
}
return new Blob([ab], { type: mimeString });
}
// 流处理相关的接口和类型
interface StreamState {
isCompleted: boolean;
isStreamActive: boolean;
previousContent: string;
}
interface StreamContext {
writer: WritableStreamDefaultWriter<Uint8Array>;
reader: ReadableStreamDefaultReader<Uint8Array>;
state: StreamState;
timeoutId?: NodeJS.Timeout; // 确保使用正确的Timeout类型
}
// 处理单行数据
async function processLine(
line: string,
context: StreamContext,
): Promise<void> {
try {
const data = JSON.parse(line.slice(6));
if (
data.choices &&
data.choices[0] &&
data.choices[0].delta &&
data.choices[0].delta.content
) {
const currentContent: string = data.choices[0].delta.content;
let newContent = currentContent;
if (currentContent.startsWith(context.state.previousContent) &&
context.state.previousContent.length > 0) {
newContent = currentContent.slice(context.state.previousContent.length);
}
if (newContent) {
const newData = {
...data,
choices: [{
...data.choices[0],
delta: {
...data.choices[0].delta,
content: newContent,
},
}],
};
await context.writer.write(
encoder.encode(`data: ${JSON.stringify(newData)}\n\n`),
);
}
context.state.previousContent = currentContent;
} else {
console.log('无content数据:', JSON.stringify(data));
await context.writer.write(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
}
} catch (error) {
console.error('处理行数据时发生错误:', error);
await context.writer.write(encoder.encode(`${line}\n\n`));
}
}
// 处理流数据
async function handleStream(context: StreamContext): Promise<void> {
let buffer = "";
try {
while (true) {
const { done, value } = await context.reader.read();
if (done) {
context.state.isCompleted = true;
if (context.timeoutId) {
clearTimeout(context.timeoutId);
}
if (buffer) {
const lines = buffer.split("\n");
for (const line of lines) {
if (line.trim().startsWith("data: ")) {
await processLine(line, context);
}
}
}
console.log('流处理自然完成');
await context.writer.write(encoder.encode("data: [DONE]\n\n"));
break;
}
const valueText = streamDecoder.decode(value, { stream: true });
buffer += valueText;
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.trim().startsWith("data: ")) {
await processLine(line, context);
}
}
}
} catch (error) {
console.error('流处理发生错误:', error);
if (!context.state.isCompleted) {
if (context.timeoutId) {
clearTimeout(context.timeoutId);
}
try {
await context.writer.write(
encoder.encode(
`data: {"error":true,"message":"${error.message}"}\n\n`
)
);
await context.writer.write(encoder.encode("data: [DONE]\n\n"));
} catch (writeError) {
console.error('写入错误信息时发生错误:', writeError);
}
}
} finally {
try {
await context.writer.close();
} catch (closeError) {
console.error('关闭writer时发生错误:', closeError);
}
}
}
async function handleRequest(request: Request): Promise<Response> {
try {
const url = new URL(request.url);
const pathname = url.pathname;
console.log(`请求路径: ${pathname}, 方法: ${request.method}`); // 日志输出
if (request.method === "GET" && pathname === "/v1/models") {
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
console.log('未授权访问'); // 日志输出
return new Response("Unauthorized", { status: 401 });
}
const now = Date.now();
if (cachedModels && now - cachedModelsTimestamp < CACHE_TTL) {
console.log('返回缓存模型'); // 日志输出
return new Response(cachedModels, {
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}
try {
console.log('请求模型列表'); // 日志输出
const response = await fetchWithRetry(QWEN_MODELS_URL, {
headers: {
"Authorization": authHeader,
},
});
cachedModels = await response.text();
cachedModelsTimestamp = now;
console.log('模型列表获取成功'); // 日志输出
return new Response(cachedModels, {
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
} catch (error) {
console.error('获取模型列表失败:', error); // 日志输出
return new Response(
JSON.stringify({ error: true, message: error.message }),
{ status: 500 },
);
}
}
// 处理聊天完成请求
if (request.method !== "POST" || pathname !== "/v1/chat/completions") {
console.log('方法不允许'); // 日志输出
return new Response("Method not allowed", { status: 405 });
}
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
console.log('未授权访问'); // 日志输出
return new Response("Unauthorized", { status: 401 });
}
const requestData = await request.json();
console.log('请求数据:', JSON.stringify(requestData)); // 日志输出
const { messages, stream = false, model, max_tokens } = requestData;
if (!model) {
console.log('缺少模型参数'); // 日志输出
return new Response(
JSON.stringify({ error: true, message: "Model parameter is required" }),
{ status: 400 },
);
}
// 检查消息中是否包含 base64 图片
const updatedMessages = await Promise.all(messages.map(async (message: any) => {
if (message.content && Array.isArray(message.content)) {
message.content = await Promise.all(message.content.map(async (content: any) => {
if (content.type === "image_url" && content.image_url?.url?.startsWith("data:")) {
console.log('检测到 base64 图片'); // 日志输出
const imageBlob = base64ToBlob(content.image_url.url);
const imageId = await uploadImageToQwen(authHeader, imageBlob);
console.log('上传图片成功,imageId:', imageId); // 日志输出
return {
type: "image",
image: imageId, // 替换为上传后的 imageId
};
}
return content;
}));
}
return message;
}));
const qwenRequest = {
model,
messages: updatedMessages,
stream,
...(max_tokens !== undefined && { max_tokens }),
};
console.log('发送给 Qwen 的请求:', JSON.stringify(qwenRequest)); // 日志输出
const qwenResponse = await fetch(QWEN_API_URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": authHeader,
},
body: JSON.stringify(qwenRequest),
});
if (stream) {
const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
const writer = writable.getWriter();
const reader = qwenResponse.body!.getReader();
const streamContext: StreamContext = {
writer,
reader,
state: {
isCompleted: false,
isStreamActive: true,
previousContent: "",
},
timeoutId: setTimeout(() => {
if (streamContext.state.isStreamActive && !streamContext.state.isCompleted) {
console.log('触发流超时');
streamContext.state.isStreamActive = false;
writer.write(encoder.encode('data: {"error":true,"message":"Response timeout"}\n\n'))
.then(() => writer.write(encoder.encode("data: [DONE]\n\n")))
.then(() => writer.close())
.catch(err => console.error('超时处理错误:', err));
}
}, STREAM_TIMEOUT),
};
handleStream(streamContext).catch(async (error) => {
clearTimeout(streamContext.timeoutId!);
try {
await writer.write(encoder.encode(`data: {"error":true,"message":"${error.message}"}\n\n`));
await writer.write(encoder.encode("data: [DONE]\n\n"));
await writer.close();
} catch {
// 忽略写入错误
}
});
return new Response(readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}
const responseText = await qwenResponse.text();
console.log('Qwen 响应:', responseText); // 日志输出
return new Response(responseText, {
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
} catch (error) {
console.error('处理请求时发生错误:', error); // 日志输出
return new Response(
JSON.stringify({ error: true, message: error.message }),
{ status: 500 },
);
}
}
Deno.serve(handleRequest);
这篇文章讲述了一位开发者为了在自己习惯的 Helix 编辑器中使用公司购买的 Cursor AI 代码助手,通过逆向工程 Cursor 软件,找到了其后端 API 的调用方式和认证机制。他通过分析 Cursor 的 VSCode 分支代码,找到了 protobuf 定义和 RPC 端点,并成功提取了认证所需的 access token。他还逆向了 Cursor 的 checksum 算法,最终用 Go 语言编写了一个客户端,直接与 Cursor 后端通信。这使得他可以将 Cursor 的强大功能集成到自己的 LSP 插件中,从而在任何支持 LSP 的编辑器中使用 Cursor 的 AI 能力,无需切换到 VSCode。总而言之,他成功地将工具适配了自己的工作方式,而不是反过来。
这篇文章讲述了作者为了解决在 Obsidian 中阅读和总结笔记的痛点,自己开发了一个插件。这个插件的主要功能是在一个独立的面板中显示当前激活的笔记内容,并允许用户在该面板中进行读写操作。作者利用 AI 工具辅助开发,强调了跑通核心流程的重要性。最终,他成功开发并发布了插件,并分享了开发过程中的思考,例如为什么需要独立的编辑窗口,以及如何解决 Markdown 文件注释的痛点。
这段内容介绍了如何在 NextChat 中配置并使用硅基流动 (SiliconFlow) 的图像生成插件。首先需要注册并获取 API Key,然后在 NextChat 的插件设置中,选择 Bearer 授权方式并填入 Key。接着复制粘贴提供的 OpenAPI Schema 代码,并设置 System Prompt,使其能将用户输入的文字转换为英文并调用插件生成图片。文中还提供了插件的默认参数和输出格式,并建议使用智谱清言模型以获得更好的效果。最后,提到了 NextChat 的一些高级功能,例如支持视觉模型和上传图像,并解释了 Cloudflare 错误的原因和解决方法。
这个文档介绍了如何更新 Cursor-API 的 checksum 值。步骤包括停止并删除旧容器,获取新的 checksum 值,然后使用新值重启容器。最后,可以通过命令验证容器状态和新的 checksum 值。
这篇文章介绍了如何通过一段简单的JavaScript代码,在Google AI Studio的NextChat聊天界面中,免费使用Gemini 2.0 Flash模型并集成Google联网搜索功能。用户只需在浏览器控制台粘贴代码并回车即可激活,无需复杂配置。文章还提到了其他更便捷但有门槛的方法,如使用Cloudflare Worker中转或魔改客户端,并提供了相关资源链接。
Mistral AI 发布 Codestral 25.01 代码模型
这个模型的上下文高达 256K !
在多个代码生成测试的成绩都非常高,LMsys copilot 排第一
目前可以在 VS Code 插件 Continue 中免费使
这个脚本旨在帮助用户更方便地体验 Cursor Pro,因为它能自动重置设备ID并写入有效的token,解决了手动操作的复杂性。脚本会从云端token池随机获取token,并更新本地Cursor的认证信息。使用前需安装依赖,运行脚本会自动退出Cursor程序,请确保保存工作。脚本运行后,重新打开Cursor Pro即可。
import json
import logging
import os
import platform
import sqlite3
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import psutil
import requests
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# 常量配置
class Config:
"""配置常量类"""
API_URL = "https://cursor.ccopilot.org/api/get_next_token.php"
ACCESS_CODE = ""
PROCESS_TIMEOUT = 5
CURSOR_PROCESS_NAMES = ['cursor.exe', 'cursor']
DB_KEYS = {
'email': 'cursorAuth/cachedEmail',
'access_token': 'cursorAuth/accessToken',
'refresh_token': 'cursorAuth/refreshToken'
}
@dataclass
class TokenData:
"""Token数据类"""
mac_machine_id: str
machine_id: str
dev_device_id: str
email: str
token: str
@classmethod
def from_dict(cls, data: Dict[str, str]) -> 'TokenData':
"""从字典创建TokenData实例"""
return cls(
mac_machine_id=data['mac_machine_id'],
machine_id=data['machine_id'],
dev_device_id=data['dev_device_id'],
email=data['email'],
token=data['token']
)
class FilePathManager:
"""文件路径管理器"""
@staticmethod
def get_storage_path() -> Path:
"""获取storage.json文件路径"""
system = platform.system()
if system == "Windows":
return Path(os.getenv('APPDATA')) / 'Cursor' / 'User' / 'globalStorage' / 'storage.json'
elif system == "Darwin":
return Path.home() / 'Library' / 'Application Support' / 'Cursor' / 'User' / 'globalStorage' / 'storage.json'
elif system == "Linux":
return Path.home() / '.config' / 'Cursor' / 'User' / 'globalStorage' / 'storage.json'
raise OSError(f"不支持的操作系统: {system}")
@staticmethod
def get_db_path() -> Path:
"""获取数据库文件路径"""
if os.name == 'nt':
return Path(os.getenv('APPDATA')) / 'Cursor' / 'User' / 'globalStorage' / 'state.vscdb'
return Path.home() / 'Library' / 'Application Support' / 'Cursor' / 'User' / 'globalStorage' / 'state.vscdb'
class FilePermissionManager:
"""文件权限管理器"""
@staticmethod
def make_file_writable(file_path: Union[str, Path]) -> None:
"""修改文件权限为可写"""
file_path = Path(file_path)
if platform.system() == "Windows":
subprocess.run(['attrib', '-R', str(file_path)], check=True)
else:
os.chmod(file_path, 0o666)
@staticmethod
def make_file_readonly(file_path: Union[str, Path]) -> None:
"""修改文件权限为只读"""
file_path = Path(file_path)
if platform.system() == "Windows":
subprocess.run(['attrib', '+R', str(file_path)], check=True)
else:
os.chmod(file_path, 0o444)
class CursorAuthManager:
"""Cursor认证信息管理器"""
def __init__(self):
self.db_path = FilePathManager.get_db_path()
def update_auth(self, email: Optional[str] = None,
access_token: Optional[str] = None,
refresh_token: Optional[str] = None) -> bool:
"""更新或插入Cursor的认证信息"""
updates: List[Tuple[str, str]] = []
if email is not None:
updates.append((Config.DB_KEYS['email'], email))
if access_token is not None:
updates.append((Config.DB_KEYS['access_token'], access_token))
if refresh_token is not None:
updates.append((Config.DB_KEYS['refresh_token'], refresh_token))
if not updates:
logger.info("没有提供任何要更新的值")
return False
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
for key, value in updates:
cursor.execute("SELECT 1 FROM itemTable WHERE key = ?", (key,))
exists = cursor.fetchone() is not None
if exists:
cursor.execute("UPDATE itemTable SET value = ? WHERE key = ?", (value, key))
else:
cursor.execute("INSERT INTO itemTable (key, value) VALUES (?, ?)", (key, value))
logger.info(f"成功{'更新' if exists else '插入'} {key.split('/')[-1]}")
return True
except sqlite3.Error as e:
logger.error(f"数据库错误: {e}")
return False
except Exception as e:
logger.error(f"发生错误: {e}")
return False
class CursorManager:
"""Cursor管理器"""
@staticmethod
def reset_cursor_id(token_data: TokenData) -> bool:
"""重置Cursor ID"""
storage_path = FilePathManager.get_storage_path()
if not storage_path.exists():
logger.warning(f"未找到文件: {storage_path}")
return False
try:
FilePermissionManager.make_file_writable(storage_path)
data = json.loads(storage_path.read_text())
data.update({
"telemetry.macMachineId": token_data.mac_machine_id,
"telemetry.machineId": token_data.machine_id,
"telemetry.devDeviceId": token_data.dev_device_id
})
storage_path.write_text(json.dumps(data, indent=4))
FilePermissionManager.make_file_readonly(storage_path)
logger.info("storage.json文件已成功修改")
return True
except Exception as e:
logger.error(f"重置Cursor ID时发生错误: {e}")
return False
@staticmethod
def exit_cursor() -> bool:
"""安全退出Cursor进程"""
try:
logger.info("开始退出Cursor...")
cursor_processes = [
proc for proc in psutil.process_iter(['pid', 'name'])
if proc.info['name'].lower() in Config.CURSOR_PROCESS_NAMES
]
if not cursor_processes:
logger.info("未发现运行中的 Cursor 进程")
return True
# 温和地请求进程终止
for proc in cursor_processes:
try:
if proc.is_running():
proc.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 等待进程终止
start_time = time.time()
while time.time() - start_time < Config.PROCESS_TIMEOUT:
still_running = [p for p in cursor_processes if p.is_running()]
if not still_running:
logger.info("所有 Cursor 进程已正常关闭")
return True
time.sleep(0.5)
if still_running := [p for p in cursor_processes if p.is_running()]:
process_list = ", ".join(str(p.pid) for p in still_running)
logger.warning(f"以下进程未能在规定时间内关闭: {process_list}")
return False
return True
except Exception as e:
logger.error(f"关闭 Cursor 进程时发生错误: {e}")
return False
class TokenManager:
"""Token管理器"""
@staticmethod
def fetch_token_data() -> Optional[TokenData]:
"""获取Token数据"""
logger.info("正在获取Token数据...")
try:
response = requests.get(f"{Config.API_URL}?accessCode={Config.ACCESS_CODE}")
data = response.json()
if data.get("code") == 0 and (token_data := data.get("data")):
logger.info("成功获取Token数据")
return TokenData.from_dict(token_data)
logger.warning(f"获取Token失败: {data.get('message', '未知错误')}")
return None
except Exception as e:
logger.error(f"获取Token数据时发生错误: {e}")
return None
@staticmethod
def update_token(token_data: TokenData) -> bool:
"""更新Cursor的token信息"""
try:
# 更新机器ID
if not CursorManager.reset_cursor_id(token_data):
return False
# 更新认证信息
auth_manager = CursorAuthManager()
if not auth_manager.update_auth(email=token_data.email, access_token=token_data.token, refresh_token=token_data.token):
logger.error("更新Token时发生错误")
return False
logger.info(f"成功更新Cursor认证信息! 邮箱: {token_data.email}")
return True
except Exception as e:
logger.error(f"更新Token时发生错误: {e}")
return False
def main() -> None:
"""主函数"""
if token_data := TokenManager.fetch_token_data():
logger.info("即将退出 Cursor,请确保所有工作已保存。")
logger.info("按任意键继续...")
input("")
if CursorManager.exit_cursor():
TokenManager.update_token(token_data)
if __name__ == "__main__":
main()