写博客很多年了,以前用的都是WordPress,一个传统的PHP+MySQL的CMS。在2022年底开了一个新博客,用Hugo配合Obsidian来写,经过我的配置,在本地写好文章后只需git推送到github,github actions自动部署生成静态页面发送到服务器。很优雅的写作方式,所以从文章数量就能看出来,换用Hugo之后我是相当的高产。
所以我就想将这些年用WordPress写的文章都统一整理出来,集中用Hugo进行管理。很自然地我就想到了,使用python爬虫将所有文章和文章配图都爬取下来,将内容转换为Hugo能识别的Markdown格式。
分析
- 要爬取的网站:xiake.me(写本文时还是WordPress,未来绑定到Hugo博客)
- 使用的WP主题:Twenty Fifteen
- 文章列表URL:
https://xiake.me/page/2/
,其中2
是指第2页。 - 文章URL:
https://xiake.me/2020/08/06/一个傅里叶级数展开式的图象/
,日期+文章标题或略缩名slug - 文章配图URL:
https://xiake.me/wp-content/uploads/2020/08/fuliye.png
,日期+文件名
要获取的数据:
- 文章标题
- 文章slug(文章略缩名,用于固定链接)
- 文章URL
- 文章文字内容
- 文章配图
- 文章发布日期
思路:访问文章列表URL,获取文章标题、文章slug、文章URL,访问文章URL获取文章内容、文章发布日期,下载文章配图,并将图片链接替换为本地相对链接。
获取文章标题、URL这些数据使用XPATH,替换图片链接使用正则表达式。
用IPython踩坑
最终我们是要将程序用面向对象的方式来写,也就是把爬虫封装成一个类。但我们可以先用IPython把爬虫的运行过程走一遍,等各个环节都理顺了,把坑都存踩完了,再改造成想要的样子。
在这个环节,我先用Jupyter Lab用面向过程的思路把代码敲一遍。每写好一个代码块(cell)都可以按shift+enter
(或ctrl+enter
)运行一下,检查错误很方便。
先导入一些必定用到的模块,其他模块用到的时候再导入:
1
2
3
| import requests
from fake_useragent import UserAgent
from lxml import etree
|
定义URL、请求头、获取响应对象:
1
2
3
4
5
6
| url = 'https://xiake.me'
ua = UserAgent()
headers = {
'UserAgent': ua.random,
}
res = requests.get(url, headers=headers)
|
将获取到的html内容打印出来看一看是不是成功获取到了:
1
2
| html = res.content.decode('utf-8')
print(html)
|
用XPATH解析页面,得到xpath对象:
1
2
3
4
5
| # 解析html,得到文章元素
xpath_dbs = '/html/body/div/div[2]/div/main/article/header/h2/a'
parse_html = etree.HTML(html)
post_list = parse_html.xpath(xpath_dbs)
|
post_list
是一个列表,列表里的各个元素是xpath对象,接下来从第一个元素中取数据。
取文章标题:
1
2
| post_title = post_list[0].xpath('./text()')[0]
print(post_title)
|
取文章slug:
1
2
3
| post_slug = post_list[0].xpath('./@href')[0].split('/')[-2]
print(post_slug)
|
取文章链接并根据链接访问文章页面(二级页面):
1
2
| url1 = post_list[0].xpath('./@href')[0]
html2 = requests.get(url1,headers=headers).content.decode('utf-8')
|
写取文章正文内容和文章发表日期的xpath:
1
2
3
4
| # 文章内容xpath
xpath_dbs2 = '/html/body/div/div[2]/div/main/article/div'
# 文章日期、作者xpath
xpath_dbs3 = '/html/body/div/div[2]/div/main/article/footer'
|
xpath解析二级页面,取文章正文内容:
1
2
3
4
| parse_html2 = etree.HTML(html2)
post_content = parse_html2.xpath(xpath_dbs2)
print(post_content)
|
1
2
| import html as htmlP
post_con_html = htmlP.unescape(etree.tostring(post_content[0]).decode())
|
注意这里我又导入了一个html
模块。因为之前没想到用它,我用html
来命名了一个变量,因此我只好把这里导入的这个模块取别名为htmlP
取文章发表日期:
1
2
3
| post_footer = parse_html2.xpath(xpath_dbs3)
post_pubdate=post_footer[0].xpath('./span[1]/a/time[1]/@datetime')[0]
print(post_pubdate)
|
这段代码的运行结果(文章发表日期),格式和Hugo文章里的日期格式一样,因此就不用转换日期格式了。
将文章中的图片下载下来,并替换文章中的图片链接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| # 将文章里的图片下载下来,并替换html中的图片链接
import re
pattern = re.compile(r'<img .*?src="(.*?)" alt=.*?>', re.S)
img_list = pattern.findall(post_con_html)
import os
dir = 'xiake/'
for img_link in img_list:
html_bytes = requests.get(url=img_link,headers=headers).content
filename_list = img_link.split('uploads/')[-1].split('/')
dir_img = dir + '/'.join(filename_list[:2])+'/'
if not os.path.exists(dir_img):
os.makedirs(dir_img)
filename = dir_img + filename_list[-1]
with open(filename, 'wb') as f:
f.write(html_bytes)
print('%s 下载成功' % filename)
# 图片下载成功后把html代码里的图片链接替换成本地链接
post_con_html = post_con_html.replace(img_link,filename)
|
这里是用正则来取图片链接,下载图片依然是用的requests。
图片下载下来后我是打算放到代码同级目录下的一个叫xiake
的文件夹里,并按日期来嵌套文件夹(即xiake/年/月/日/图片文件.jpg
),因此如果这个文件夹不存在就需要提前新建它。
图片下载成功后,用replace()
方法把html里的链接替换掉。
看看图片链接是不是都替换成功了:
接下来要将html转换为markdown,这里我用到了一个第三方包:markdownify,所以使用之前需要安装它:
1
| pip install markdownify
|
在Jupyter Lab里可以直接运行上面这句命令。
1
2
3
4
| from markdownify import markdownify as md
post_con_md = md(post_con_html,heading_style='ATX')
post_con_md.strip().replace('\n\n\n','\n')
print(post_con_md)
|
参数heading_style='ATX'
的作用是,在转换后,文章小标题用#
来标识。转换成markdown格式之后,文章里的空行有些多,所以去掉。
将转换后的文章内容整理成符合hugo格式要求的文本:
1
2
3
4
5
6
7
8
9
10
11
12
| # 将文章保存为hugo需要的markdown格式,
text = """---
title: "{}"
date: {}
categories:
-
tags:
-
---
{}
""".format(post_title,post_pubdate,post_con_md)
print(text)
|
最后保存成文件:
1
2
3
4
5
6
7
8
| dir_post = 'content/post/'
if not os.path.exists(dir_post):
os.makedirs(dir_post)
post_filename = dir_post+'wp_'+post_slug+'.md'
with open(post_filename, 'w') as f:
f.write(text)
print('%s 文章保存成功' % post_filename)
|
文件名用获取的文章slug来命名,前面再加上一个wp_
前缀。文章保存在了代码同级目录下的content/post/
文件夹里。
踩坑过程结束了。看上面的过程,似乎挺顺利的样子。真实情况是,总体很顺利,但也确实踩了不少坑。我已经很努力在还原真实的写代码过程了,如果把每个坑都仔细描述一下,这篇文章的长度我觉得还能再增加一倍。
封装
在改写代码之前,通过上面写代码的过程,我又想到几个可以改进的点:
- 程序写成交互式的,这样不仅可以爬这一个wp站,其他的也可以;
- 图片和markdown文件可以放到同一个文件夹里。
- 上面的代码都是理想情况下的样子,增加错误处理代码,提高容错性。
- 从同一篇文章里爬取的数据放到一个字典里,更整齐。
下面开始写代码了。
主函数:
1
2
3
4
5
| if __name__ == '__main__':
spider_url = input("你想爬取哪一个WordPress博客?\n")
spider_dir = input("你想将爬取到的内容放在哪一个文件夹里面?\n")
spider = WordPressSpider(spider_url, spider_dir)
spider.run()
|
接下来在代码最上方调用一堆模块、定义WordPressSpider
类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import html
import os
import re
import requests
from fake_useragent import UserAgent
from lxml import etree
from markdownify import markdownify as md
from urllib import parse
class WordPressSpider:
"""
将WordPress博客“xiake.me”上的文章爬取下来,并下载文章图片到本地,替换文章中的图片链接为
本地链接,最后需将文章转为Hugo需要的Markdown格式。
"""
|
先说明,下面的代码都是在上面这个WordPressSpider
类里面的方法定义。不要当成普通函数定义了。
初始化:
1
2
3
4
| def __init__(self,spider_url,spider_dir):
self.url = (spider_url if spider_url[-1] != '/' else spider_url[:-1])
self.baseurl = (spider_url if spider_url[-1] != '/' else spider_url[:-1]) + '/page/{}/'
self.spider_dir = spider_dir
|
我用了两个属性self.url
、self.baseurl
,这是为了让程序可以爬取多个网站所做的妥协。属性声明里用了条件解析式,这样的话无论你输入的网址是不是带最后的那个/
都能正确处理。
get_proxies()
方法:
1
2
3
4
5
6
| def get_proxies(self):
proxies = {
'http': 'http://127.0.0.1:7890',
'https': 'http://127.0.0.1:7890',
}
return proxies
|
调用requests时可以指定一个参数(那两个字我不敢说,因为我的网站备案了),这样就可以顺畅访问“加载慢”的网站了。
请求头:
1
2
3
4
5
6
| def get_headers(self):
ua = UserAgent()
headers = {
'UserAgent': ua.random,
}
return headers
|
如果只是爬我自己的网站的话,没有反爬手段,其实可以不用使用fake_useragent
,为了增加程序的普适性,用上还是最好的。
parse_html()
:
1
2
3
4
5
| def parse_html(self, xpath_dbs):
res = requests.get(url=self.url, headers=self.get_headers(), proxies=self.get_proxies())
html_source = res.content.decode('utf-8')
parse_html = etree.HTML(html_source)
return parse_html.xpath(xpath_dbs)
|
这里就用上了上面的get_proxies()
方法。接受一个xpath表达式,解析页面,返回xpath对象(列表)。
获得文章列表:
1
2
3
4
| def get_post_list(self):
xpath_dbs = '/html/body/div/div[2]/div/main/article/header/h2/a'
post_list = self.parse_html(xpath_dbs)
return post_list
|
获得总页码(文章一共有几页):
1
2
3
4
| def get_page_nums(self):
xpath_dbs = '/html/body/div/div[2]/div/main/nav/div/a[2]/text()'
num = int(self.parse_html(xpath_dbs)[0])
return num
|
这个页码数字是直接从html页面里取的。文章列表下方有一串数字,第1页……第n页,直接用xpath取这个n
即可。
遍历文章列表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| def get_post_info(self):
post_list = self.get_post_list()
post_info = dict()
article_xpath_dbs = '/html/body/div/div[2]/div/main/article'
for post in post_list:
post_info['title'] = post.xpath('./text()')[0]
post_info['url'] = post.xpath('./@href')[0]
post_info['slug'] = parse.unquote(post_info['url'].split('/')[-2])
self.url = post_info['url']
article = self.parse_html(article_xpath_dbs)
post_info['pub_date'] = article[0].xpath('./footer/span[1]/a/time[1]/@datetime')[0]
post_content = article[0].xpath('./div')[0]
post_con_html = html.unescape(etree.tostring(post_content).decode())
post_con_html = self.get_images(post_con_html)
post_info['content_md'] = self.convert_html_to_markdown(post_con_html)
self.save_post(post_info)
|
这里又再一次调用了parse_html()
方法,用于获取文章正文内容。一篇文章的各个数据都保存到了一个字典里,显得很整齐。每遍历一次,都调用convert_html_to_markdown()
转换正文格式,调用get_images()
保存图片,调用save_post()
保存文章,这些方法是在下面定义的。
html转markdown:
1
2
3
4
| def convert_html_to_markdown(self, html_content):
md_content = md(html_content, heading_style='ATX').strip().replace('\n\n\n', '\n')
# md_content = md(html_content, heading_style='ATX')
return md_content
|
保存图片:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| def get_images(self, html_content):
pattern = re.compile(r'<img .*?src="(.*?)" alt=.*?>', re.S)
img_list = pattern.findall(html_content)
if not img_list:
return html_content
dir_name = self.spider_dir + '/'
for img_link in img_list:
if 'greenhand' in img_link: # 文章中部分图片链接失效
print('失效链接跳过')
break
try:
html_bytes = requests.get(url=img_link, headers=self.get_headers(), proxies=self.get_proxies()).content
filename_list = img_link.split('uploads/')[-1].split('/')
dir_img = dir_name + '/'.join(filename_list[:2]) + '/'
if not os.path.exists(dir_img):
os.makedirs(dir_img)
filename = dir_img + filename_list[-1]
with open(filename, 'wb') as f:
f.write(html_bytes)
print('%s 下载成功' % filename)
# 图片下载成功后把html代码里的图片链接替换成本地链接
html_content = html_content.replace(img_link, filename)
except:
print('图片下载出错,跳过')
return html_content
|
这一部分代码相比上面用jupyter lab写的代码,增加了错误处理,如果requests报错,就捕获错误并让程序继续运行,而不是直接退出。
上面代码里的:
1
2
3
| if 'greenhand' in img_link: # 文章中部分图片链接失效
print('失效链接跳过')
break
|
是用来跳过一些失效链接,这些图片链接里的共同特征是含有“greenhand”,我翻了好多份备份文件,都没找到这些图片,所有也没能补上,只好跳过它们。
还有的情况是,即便图片失效了,requests依然不会报错,而是下载下来一个空文件。这种情况,requests文档里应该有办法解决,但我是选择了手动处理。
保存文章:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| def save_post(self, post_info_dict):
text = """---
title: "{}"
date: {}
categories:
- tags:
- ---
{}
""".format(post_info_dict['title'], post_info_dict['pub_date'], post_info_dict['content_md'])
dir_post = self.spider_dir + '/post/'
if not os.path.exists(dir_post):
os.makedirs(dir_post)
post_filename = dir_post + 'wp_' + post_info_dict['slug'] + '.md'
with open(post_filename, 'w') as f:
f.write(text)
print('%s 文章保存成功' % post_filename)
|
注意一定不要为了代码整齐好看而让文档字符串跟着代码一起缩进,不然你用来缩进的那些空格都会进入到最后保存的文件当中,从而让文件内容变得杂乱。
最后是run()
方法,我把这个方法比喻作开车时“点火启动”这个步骤:
1
2
3
4
| def run(self):
for page_num in range(1,self.get_page_nums()+1):
self.url = self.baseurl.format(page_num)
self.get_post_info()
|
在这个方法里,首先调用get_page_nums()
方法获得总页码,然后一页页遍历文章列表。
最后
写代码是很有成就感的一个过程,如果每遇到一个困难都能顺畅解决,那就更有成就感啦。最后推荐一些帮我解决问题的网站:
- 菜鸟教程 - 学的不仅是技术,更是梦想!,我经常在这里直接搜python语法、函数的用法,linux命令等。
- Stack Overflow - Where Developers Learn, Share, & Build Careers,如果你的程序报错,在这个网站里绝对有人遇到和你一样的报错信息。不过这个网站的搜索有些弱,最好是在Google搜(
site:stackoverflow.com
)。 - Github,你想“造轮子”之前,先去看一看是不是有已经造好的轮子。比如上面用到的
markdownify
。 - python包的官方网站或仓库主页。一般会有用法示范。