爬取WordPress网站:以爬虫的方式迁移WordPress至Hugo

写博客很多年了,以前用的都是WordPress,一个传统的PHP+MySQL的CMS。在2022年底开了一个新博客,用Hugo配合Obsidian来写,经过我的配置,在本地写好文章后只需git推送到github,github actions自动部署生成静态页面发送到服务器。很优雅的写作方式,所以从文章数量就能看出来,换用Hugo之后我是相当的高产。

所以我就想将这些年用WordPress写的文章都统一整理出来,集中用Hugo进行管理。很自然地我就想到了,使用python爬虫将所有文章和文章配图都爬取下来,将内容转换为Hugo能识别的Markdown格式。

分析

  • 要爬取的网站:xiake.me(写本文时还是WordPress,未来绑定到Hugo博客)
  • 使用的WP主题:Twenty Fifteen
  • 文章列表URL:https://xiake.me/page/2/,其中2是指第2页。
  • 文章URL:https://xiake.me/2020/08/06/一个傅里叶级数展开式的图象/,日期+文章标题或略缩名slug
  • 文章配图URL:https://xiake.me/wp-content/uploads/2020/08/fuliye.png,日期+文件名

要获取的数据:

  1. 文章标题
  2. 文章slug(文章略缩名,用于固定链接)
  3. 文章URL
  4. 文章文字内容
  5. 文章配图
  6. 文章发布日期

思路:访问文章列表URL,获取文章标题、文章slug、文章URL,访问文章URL获取文章内容、文章发布日期,下载文章配图,并将图片链接替换为本地相对链接。

获取文章标题、URL这些数据使用XPATH,替换图片链接使用正则表达式。

用IPython踩坑

最终我们是要将程序用面向对象的方式来写,也就是把爬虫封装成一个类。但我们可以先用IPython把爬虫的运行过程走一遍,等各个环节都理顺了,把坑都存踩完了,再改造成想要的样子。

在这个环节,我先用Jupyter Lab用面向过程的思路把代码敲一遍。每写好一个代码块(cell)都可以按shift+enter(或ctrl+enter)运行一下,检查错误很方便。

先导入一些必定用到的模块,其他模块用到的时候再导入:

1
2
3
import requests
from fake_useragent import UserAgent
from lxml import etree

定义URL、请求头、获取响应对象:

1
2
3
4
5
6
url = 'https://xiake.me'
ua = UserAgent()
headers = {
    'UserAgent': ua.random,
}
res = requests.get(url, headers=headers)

将获取到的html内容打印出来看一看是不是成功获取到了:

1
2
html = res.content.decode('utf-8')
print(html)

用XPATH解析页面,得到xpath对象:

1
2
3
4
5
# 解析html,得到文章元素
xpath_dbs = '/html/body/div/div[2]/div/main/article/header/h2/a'
parse_html = etree.HTML(html)

post_list = parse_html.xpath(xpath_dbs)

post_list是一个列表,列表里的各个元素是xpath对象,接下来从第一个元素中取数据。

取文章标题:

1
2
post_title = post_list[0].xpath('./text()')[0]
print(post_title)

取文章slug:

1
2
3
post_slug = post_list[0].xpath('./@href')[0].split('/')[-2]

print(post_slug)

取文章链接并根据链接访问文章页面(二级页面):

1
2
url1 = post_list[0].xpath('./@href')[0]
html2 = requests.get(url1,headers=headers).content.decode('utf-8')

写取文章正文内容和文章发表日期的xpath:

1
2
3
4
# 文章内容xpath
xpath_dbs2 = '/html/body/div/div[2]/div/main/article/div'
# 文章日期、作者xpath
xpath_dbs3 = '/html/body/div/div[2]/div/main/article/footer'

xpath解析二级页面,取文章正文内容:

1
2
3
4
parse_html2 = etree.HTML(html2)

post_content = parse_html2.xpath(xpath_dbs2)
print(post_content)
1
2
import html as htmlP
post_con_html = htmlP.unescape(etree.tostring(post_content[0]).decode())

注意这里我又导入了一个html模块。因为之前没想到用它,我用html来命名了一个变量,因此我只好把这里导入的这个模块取别名为htmlP

取文章发表日期:

1
2
3
post_footer = parse_html2.xpath(xpath_dbs3)
post_pubdate=post_footer[0].xpath('./span[1]/a/time[1]/@datetime')[0]
print(post_pubdate)

这段代码的运行结果(文章发表日期),格式和Hugo文章里的日期格式一样,因此就不用转换日期格式了。

将文章中的图片下载下来,并替换文章中的图片链接:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 将文章里的图片下载下来,并替换html中的图片链接
import re
pattern = re.compile(r'<img .*?src="(.*?)" alt=.*?>', re.S)
img_list = pattern.findall(post_con_html)

import os

dir = 'xiake/'

for img_link in img_list:
    html_bytes = requests.get(url=img_link,headers=headers).content
    filename_list = img_link.split('uploads/')[-1].split('/')
    dir_img = dir + '/'.join(filename_list[:2])+'/'
    if not os.path.exists(dir_img):
        os.makedirs(dir_img)
    filename = dir_img + filename_list[-1]
    with open(filename, 'wb') as f:
        f.write(html_bytes)
        print('%s 下载成功' % filename)
    # 图片下载成功后把html代码里的图片链接替换成本地链接
    post_con_html = post_con_html.replace(img_link,filename)

这里是用正则来取图片链接,下载图片依然是用的requests。

图片下载下来后我是打算放到代码同级目录下的一个叫xiake的文件夹里,并按日期来嵌套文件夹(即xiake/年/月/日/图片文件.jpg),因此如果这个文件夹不存在就需要提前新建它。

图片下载成功后,用replace()方法把html里的链接替换掉。

看看图片链接是不是都替换成功了:

1
print(post_con_html)

接下来要将html转换为markdown,这里我用到了一个第三方包:markdownify,所以使用之前需要安装它:

1
pip install markdownify

在Jupyter Lab里可以直接运行上面这句命令。

1
2
3
4
from markdownify import markdownify as md
post_con_md = md(post_con_html,heading_style='ATX')
post_con_md.strip().replace('\n\n\n','\n')
print(post_con_md)

参数heading_style='ATX'的作用是,在转换后,文章小标题用#来标识。转换成markdown格式之后,文章里的空行有些多,所以去掉。

将转换后的文章内容整理成符合hugo格式要求的文本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 将文章保存为hugo需要的markdown格式,
text = """---
title: "{}"
date: {}
categories:
- 
tags:
- 
---
{}
""".format(post_title,post_pubdate,post_con_md)
print(text)

最后保存成文件:

1
2
3
4
5
6
7
8
dir_post = 'content/post/'
if not os.path.exists(dir_post):
    os.makedirs(dir_post)

post_filename = dir_post+'wp_'+post_slug+'.md'
with open(post_filename, 'w') as f:
    f.write(text)
    print('%s 文章保存成功' % post_filename)

文件名用获取的文章slug来命名,前面再加上一个wp_前缀。文章保存在了代码同级目录下的content/post/文件夹里。

踩坑过程结束了。看上面的过程,似乎挺顺利的样子。真实情况是,总体很顺利,但也确实踩了不少坑。我已经很努力在还原真实的写代码过程了,如果把每个坑都仔细描述一下,这篇文章的长度我觉得还能再增加一倍。

封装

在改写代码之前,通过上面写代码的过程,我又想到几个可以改进的点:

  1. 程序写成交互式的,这样不仅可以爬这一个wp站,其他的也可以;
  2. 图片和markdown文件可以放到同一个文件夹里。
  3. 上面的代码都是理想情况下的样子,增加错误处理代码,提高容错性。
  4. 从同一篇文章里爬取的数据放到一个字典里,更整齐。

下面开始写代码了。

主函数:

1
2
3
4
5
if __name__ == '__main__':  
    spider_url = input("你想爬取哪一个WordPress博客?\n")  
    spider_dir = input("你想将爬取到的内容放在哪一个文件夹里面?\n")  
    spider = WordPressSpider(spider_url, spider_dir)  
    spider.run()

接下来在代码最上方调用一堆模块、定义WordPressSpider类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import html  
import os  
import re  
  
import requests  
from fake_useragent import UserAgent  
from lxml import etree  
from markdownify import markdownify as md  
from urllib import parse

class WordPressSpider:  
    """  
    将WordPress博客“xiake.me”上的文章爬取下来,并下载文章图片到本地,替换文章中的图片链接为  
    本地链接,最后需将文章转为Hugo需要的Markdown格式。  
    """

先说明,下面的代码都是在上面这个WordPressSpider类里面的方法定义。不要当成普通函数定义了。

初始化:

1
2
3
4
def __init__(self,spider_url,spider_dir):  
    self.url = (spider_url if spider_url[-1] != '/' else spider_url[:-1])  
    self.baseurl = (spider_url if spider_url[-1] != '/' else spider_url[:-1]) + '/page/{}/'  
    self.spider_dir = spider_dir

我用了两个属性self.urlself.baseurl,这是为了让程序可以爬取多个网站所做的妥协。属性声明里用了条件解析式,这样的话无论你输入的网址是不是带最后的那个/都能正确处理。

get_proxies()方法:

1
2
3
4
5
6
def get_proxies(self):  
    proxies = {  
        'http': 'http://127.0.0.1:7890',  
        'https': 'http://127.0.0.1:7890',  
    }  
    return proxies

调用requests时可以指定一个参数(那两个字我不敢说,因为我的网站备案了),这样就可以顺畅访问“加载慢”的网站了。

请求头:

1
2
3
4
5
6
def get_headers(self):  
    ua = UserAgent()  
    headers = {  
        'UserAgent': ua.random,  
    }  
    return headers

如果只是爬我自己的网站的话,没有反爬手段,其实可以不用使用fake_useragent,为了增加程序的普适性,用上还是最好的。

parse_html()

1
2
3
4
5
def parse_html(self, xpath_dbs):  
    res = requests.get(url=self.url, headers=self.get_headers(), proxies=self.get_proxies())  
    html_source = res.content.decode('utf-8')  
    parse_html = etree.HTML(html_source)  
    return parse_html.xpath(xpath_dbs)

这里就用上了上面的get_proxies()方法。接受一个xpath表达式,解析页面,返回xpath对象(列表)。

获得文章列表:

1
2
3
4
def get_post_list(self):  
    xpath_dbs = '/html/body/div/div[2]/div/main/article/header/h2/a'  
    post_list = self.parse_html(xpath_dbs)  
    return post_list

获得总页码(文章一共有几页):

1
2
3
4
def get_page_nums(self):  
    xpath_dbs = '/html/body/div/div[2]/div/main/nav/div/a[2]/text()'  
    num = int(self.parse_html(xpath_dbs)[0])  
    return num

这个页码数字是直接从html页面里取的。文章列表下方有一串数字,第1页……第n页,直接用xpath取这个n即可。

遍历文章列表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def get_post_info(self):  
    post_list = self.get_post_list()  
    post_info = dict()  
    article_xpath_dbs = '/html/body/div/div[2]/div/main/article'  
    for post in post_list:  
        post_info['title'] = post.xpath('./text()')[0]  
        post_info['url'] = post.xpath('./@href')[0]  
        post_info['slug'] = parse.unquote(post_info['url'].split('/')[-2])  
  
        self.url = post_info['url']  
        article = self.parse_html(article_xpath_dbs)  
        post_info['pub_date'] = article[0].xpath('./footer/span[1]/a/time[1]/@datetime')[0]  
        post_content = article[0].xpath('./div')[0]  
        post_con_html = html.unescape(etree.tostring(post_content).decode())  
        post_con_html = self.get_images(post_con_html)  
        post_info['content_md'] = self.convert_html_to_markdown(post_con_html)  
        self.save_post(post_info)

这里又再一次调用了parse_html()方法,用于获取文章正文内容。一篇文章的各个数据都保存到了一个字典里,显得很整齐。每遍历一次,都调用convert_html_to_markdown()转换正文格式,调用get_images()保存图片,调用save_post()保存文章,这些方法是在下面定义的。

html转markdown:

1
2
3
4
def convert_html_to_markdown(self, html_content):  
    md_content = md(html_content, heading_style='ATX').strip().replace('\n\n\n', '\n')  
    # md_content = md(html_content, heading_style='ATX')  
    return md_content

保存图片:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def get_images(self, html_content):  
    pattern = re.compile(r'<img .*?src="(.*?)" alt=.*?>', re.S)  
    img_list = pattern.findall(html_content)  
    if not img_list:  
        return html_content  
  
    dir_name = self.spider_dir + '/'  
    for img_link in img_list:  
        if 'greenhand' in img_link: # 文章中部分图片链接失效  
            print('失效链接跳过')  
            break  
        try:  
            html_bytes = requests.get(url=img_link, headers=self.get_headers(), proxies=self.get_proxies()).content  
            filename_list = img_link.split('uploads/')[-1].split('/')  
            dir_img = dir_name + '/'.join(filename_list[:2]) + '/'  
            if not os.path.exists(dir_img):  
                os.makedirs(dir_img)  
            filename = dir_img + filename_list[-1]  
  
            with open(filename, 'wb') as f:  
                f.write(html_bytes)  
                print('%s 下载成功' % filename)  
            # 图片下载成功后把html代码里的图片链接替换成本地链接  
            html_content = html_content.replace(img_link, filename)  
        except:  
            print('图片下载出错,跳过')  
  
    return html_content

这一部分代码相比上面用jupyter lab写的代码,增加了错误处理,如果requests报错,就捕获错误并让程序继续运行,而不是直接退出。

上面代码里的:

1
2
3
if 'greenhand' in img_link: # 文章中部分图片链接失效  
	print('失效链接跳过')  
	break  

是用来跳过一些失效链接,这些图片链接里的共同特征是含有“greenhand”,我翻了好多份备份文件,都没找到这些图片,所有也没能补上,只好跳过它们。

还有的情况是,即便图片失效了,requests依然不会报错,而是下载下来一个空文件。这种情况,requests文档里应该有办法解决,但我是选择了手动处理。

保存文章:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    def save_post(self, post_info_dict):  
        text = """---  
title: "{}"  
date: {}  
categories:  
- tags:  
- ---  
{}  
""".format(post_info_dict['title'], post_info_dict['pub_date'], post_info_dict['content_md'])  
  
        dir_post = self.spider_dir + '/post/'  
        if not os.path.exists(dir_post):  
            os.makedirs(dir_post)  
  
        post_filename = dir_post + 'wp_' + post_info_dict['slug'] + '.md'  
        with open(post_filename, 'w') as f:  
            f.write(text)  
            print('%s 文章保存成功' % post_filename)

注意一定不要为了代码整齐好看而让文档字符串跟着代码一起缩进,不然你用来缩进的那些空格都会进入到最后保存的文件当中,从而让文件内容变得杂乱。

最后是run()方法,我把这个方法比喻作开车时“点火启动”这个步骤:

1
2
3
4
def run(self):  
    for page_num in range(1,self.get_page_nums()+1):  
        self.url = self.baseurl.format(page_num)  
        self.get_post_info()

在这个方法里,首先调用get_page_nums()方法获得总页码,然后一页页遍历文章列表。

最后

写代码是很有成就感的一个过程,如果每遇到一个困难都能顺畅解决,那就更有成就感啦。最后推荐一些帮我解决问题的网站:

  1. 菜鸟教程 - 学的不仅是技术,更是梦想!,我经常在这里直接搜python语法、函数的用法,linux命令等。
  2. Stack Overflow - Where Developers Learn, Share, & Build Careers,如果你的程序报错,在这个网站里绝对有人遇到和你一样的报错信息。不过这个网站的搜索有些弱,最好是在Google搜(site:stackoverflow.com)。
  3. Github,你想“造轮子”之前,先去看一看是不是有已经造好的轮子。比如上面用到的markdownify
  4. python包的官方网站或仓库主页。一般会有用法示范。