fix: improve yunpan1 search script robustness

- Cookie 兼容两种格式(key=value 和 Netscape)
- 搜索 30s 超时熔断,不傻等 1-2 分钟
- 超时/失败后自动降级到动漫板块最新帖子
- 帖子提取正则兼容多种 URL 格式

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 20:54:59 +08:00
parent ecc1b9d6dc
commit 01dbdc8455
+102 -40
View File
@@ -8,8 +8,8 @@ yunpan1 搜索工具 — 搜索云盘资源分享社区并提取夸克链接
py -X utf8 yunpan1_search.py 遮天
说明:
- Discuz! 首次搜索需建索引(可能等 60-120 秒),同一关键词后续秒回
- Cookie 文件: tmp/yunpan1_cookies.txt需先通过 Playwright 登录获取
- 优先搜索,30 秒超时后自动降级到浏览动漫板块最新帖子
- Cookie 文件: tmp/yunpan1_cookies.txt格式:key=value 一行一个 或 Netscape 格式均可
- 依赖: Python 标准库(无需额外安装)
"""
@@ -25,57 +25,91 @@ BASE_URL = 'https://yunpan1.cc'
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
COOKIE_FILE = os.path.abspath(os.path.join(SCRIPT_DIR, '..', '..', '..', 'tmp', 'yunpan1_cookies.txt'))
OUTPUT_FILE = os.path.abspath(os.path.join(SCRIPT_DIR, '..', '..', '..', 'tmp', 'quark_links.txt'))
SEARCH_URL = BASE_URL + '/search.php?mod=forum&srchtxt={keyword}&searchsubmit=yes'
FORUM_URL = BASE_URL + '/forum.php?mod=forumdisplay&fid=3&orderby=dateline'
SEARCH_TIMEOUT = 30 # 搜索超时(秒),超时后降级到浏览板块
FORUM_TIMEOUT = 15 # 板块页面超时
# 全局 openertimeout=300 避免分块传输时断连)
_opener = urllib.request.build_opener()
_opener.timeout = 300
_opener.timeout = SEARCH_TIMEOUT
# ── 工具函数 ──────────────────────────────────────────────────────
# ── Cookie 加载(兼容两种格式) ──────────────────────────────────
def load_cookie():
"""加载 Cookie,兼容 key=value 一行一个 和 Netscape 格式"""
path = COOKIE_FILE
if not os.path.exists(path):
print(f'❌ Cookie 文件不存在: {path}')
print(f'请通过 Playwright 登录 yunpan1.cc 后,将 Cookie 保存到该文件(一行一个 key=value')
print(f'请通过 Playwright 登录 yunpan1.cc 后,将 Cookie 保存到该文件')
sys.exit(1)
with open(path, 'r', encoding='utf-8') as f:
return f.read().strip().replace('\n', '; ')
raw = f.read()
# 如果是 Netscape 格式(含 tab、domain、TRUE/FALSE 等字段)
if '\t' in raw and ('TRUE' in raw or 'FALSE' in raw):
parts = []
for line in raw.strip().splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
cols = line.split('\t')
if len(cols) >= 7:
parts.append(f'{cols[5]}={cols[6]}')
if parts:
return '; '.join(parts)
# 否则按 key=value 一行一个处理
return raw.strip().replace('\n', '; ').replace('\r', '')
def request(url, cookie):
# ── HTTP 请求 ─────────────────────────────────────────────────────
def request(url, cookie, timeout=SEARCH_TIMEOUT):
req = urllib.request.Request(url, headers={
'Cookie': cookie,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
})
_opener.timeout = timeout
t0 = time.time()
resp = _opener.open(req)
html = resp.read().decode('utf-8', errors='replace')
return resp.status, html, resp.url, time.time() - t0
try:
resp = _opener.open(req)
html = resp.read().decode('utf-8', errors='replace')
return resp.status, html, resp.url, time.time() - t0, None
except Exception as e:
return None, '', '', time.time() - t0, str(e)
# ── 提取函数 ──────────────────────────────────────────────────────
def extract_quark_links(html):
"""提取完整夸克链接(12 位字母数字 ID),排除末尾拼接 https 的误匹配"""
raw = re.findall(r'https?://pan\.quark\.cn/s/[a-zA-Z0-9]{12,}', html)
# 过滤掉后面紧跟着 https 的(如 .../s/xxxhttps
return sorted(set(l for l in raw if not l.endswith('https')))
def extract_truncated_links(html):
"""提取被截断的链接(1-11 位 ID,需要点进帖子)"""
raw = re.findall(r'https?://pan\.quark\.cn/s/[a-zA-Z0-9]{1,11}', html)
return sorted(set(raw))
def extract_threads(html):
"""提取帖子列表"""
titles = re.findall(r'<a[^>]+href="forum\.php\?mod=viewthread[^>]+>([^<]+)</a>', html)
"""提取帖子标题,兼容 viewthread 和 thread 格式"""
patterns = [
r'<a[^>]+href="forum\.php\?mod=viewthread[^>]+>([^<]+)</a>',
r'<a[^>]+href="thread\.php\?tid=\d+[^>]*>([^<]+)</a>',
r'class="s xst"[^>]*>([^<]+)</a>',
r'<a[^>]+class="xst"[^>]*>([^<]+)</a>',
]
all_titles = []
for p in patterns:
all_titles.extend(re.findall(p, html))
seen = set()
result = []
for t in titles:
for t in all_titles:
t = t.strip()
if t and len(t) > 5 and t not in seen:
seen.add(t)
@@ -85,30 +119,23 @@ def extract_threads(html):
# ── 主流程 ────────────────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print('用法: py -X utf8 yunpan1_search.py <关键词>')
print('示例: py -X utf8 yunpan1_search.py 遮天')
sys.exit(1)
keyword = sys.argv[1]
cookie = load_cookie()
print(f'🔍 搜索 "{keyword}" ...')
print(f'⏱ 首次搜索需建索引,可能等待 1-2 分钟,请耐心等待...')
def search(keyword, cookie):
"""搜索,超时则返回 None"""
url = SEARCH_URL.format(keyword=urllib.parse.quote(keyword))
print(f'🔍 搜索 "{keyword}"{SEARCH_TIMEOUT}s 超时)...')
sys.stdout.flush()
return request(url, cookie, timeout=SEARCH_TIMEOUT)
status, html, final_url, elapsed = request(
SEARCH_URL.format(keyword=urllib.parse.quote(keyword)),
cookie
)
links = extract_quark_links(html)
truncated = extract_truncated_links(html)
threads = extract_threads(html)
def browse_forum(cookie):
"""浏览动漫板块最新帖子作为降级方案"""
print(f'⏩ 降级到浏览动漫板块最新帖子...')
sys.stdout.flush()
return request(FORUM_URL, cookie, timeout=FORUM_TIMEOUT)
print(f'\n✅ 完成(耗时 {elapsed:.0f}s,状态码 {status}')
print(f' HTML: {len(html)} 字符 / {len(threads)} 条帖子')
def print_results(links, truncated, threads, source, elapsed):
print(f'\n✅ 完成(来源: {source},耗时 {elapsed:.0f}s')
if links:
print(f'\n{"=" * 50}')
@@ -129,9 +156,44 @@ def main():
for l in truncated[:5]:
print(f' {l}')
print(f'\n📌 帖子预览(前 10 条):')
for t in threads[:10]:
print(f'{t[:60]}')
if threads:
print(f'\n📌 帖子预览(前 10 条):')
for t in threads[:10]:
print(f'{t[:60]}')
def main():
if len(sys.argv) < 2:
print('用法: py -X utf8 yunpan1_search.py <关键词>')
print('示例: py -X utf8 yunpan1_search.py 遮天')
sys.exit(1)
keyword = sys.argv[1]
cookie = load_cookie()
# 尝试搜索
status, html, _, elapsed, err = search(keyword, cookie)
if err and 'timeout' in err.lower():
print(f'⏱ 搜索超时({elapsed:.0f}s),机器配置较低时首次建索引可能卡死')
# 降级到浏览板块
status, html, _, elapsed, err2 = browse_forum(cookie)
if err2:
print(f'❌ 降级也失败了: {err2}')
sys.exit(1)
source = '板块浏览(搜索超时降级)'
elif err:
print(f'❌ 搜索失败: {err}')
# 尝试降级
status, html, _, elapsed, _ = browse_forum(cookie)
source = '板块浏览(搜索失败降级)'
else:
source = '搜索'
links = extract_quark_links(html)
truncated = extract_truncated_links(html)
threads = extract_threads(html)
print_results(links, truncated, threads, source, elapsed)
if __name__ == '__main__':