共计 1830 个字符,预计需要花费 5 分钟才能阅读完成。
前言
前几天,由于个人有需求,所以就要对站酷网一些类别下的作品的图片进行批量抓取,首先是采用的是 NodeJs 来写的,但是在运行的途中遇到很多的问题,所以后来就换成了 Python,同时使用了多线程,使得图片下载时达到了宽带的峰值,同样也保证了其质量(首先声明:此代码仅供于学习交流使用,不能用于其他的任何用途)。
正文
我本次采用的是 request
和BeautifulSoup
组合来进行抓取的,首先我们需要分析站酷的 DOM 结构:
由上我们可以看出,其主要内容包含在 class 为 work-list-box
的这个 div 内,并且每个作品又是单独包含在 class 为 card-box
这个 div 内的,所以我们需要获取到页面所有的作品元素集合,我封装成了一个函数:
def startRequest(url):
print("【提示】正在抓取 - %s" % url)
res = requests.get(url)
if res.status_code == 200:
res_html = res.content.decode()
doc = BeautifulSoup(res_html)
work_box = doc.find('div', class_={'work-list-box'})
card_box_list = work_box.find_all('div', class_={'card-box'}) # 此处为所有的作品元素集合
else:
print("【文档获取失败】【状态为 %s】- %s," % (url, res.status_code))
接下来分析每个模块元素的结构,通过查看 DOM 元素得知每个 card-box 下的 class 为 title-content 的这个 a 标签包含了其作品的链接地址以及作品标题,所以我们就可以由此来获取:
title_content = item.find("a", class_={'title-content'})
avatar = item.find('span', class_={'user-avatar'})
但是在作品集合元素内,嵌套着推广或广告,但是推广和广告是没有头像 avatar 元素的,所以我们只需要这样检测头像是否为空就可以过滤掉广告元素,其后获取到作者,URL 链接以及标题:
def getContent(item):
title_content = item.find("a", class_={'title-content'})
avatar = item.find('span', class_={'user-avatar'})
if title_content is not None and avatar is not None:
title = title_content.text
author = avatar.find("a")["title"]
href = title_content['href']
else:
return
接下来,我们需要访问对应的 URL,并获取到其中的 URL 地址:
def getDocImgLinks(html):
doc = BeautifulSoup(html)
work_box = doc.find("div", class_={'work-show-box'})
revs = work_box.find_all("div", class_={'reveal-work-wrap'})
img_list = []
for item in revs:
img = item.find("img")
if img is not None:
img_url = img["src"]
img_list.append(img_url)
else:
print("【提示】:没有图片")
continue
return img_list
http://img.zcool.cn/community/0170165a372344a80121db8001781d.jpg
最后,我们需要将获取的 URL 图片进行存储,存储的文件夹就以【作者名】-【作品标题】为标准,然后图片就以 UUID 来随机命名,同时我们需要对一些特殊字符进行过滤转换,以防创建文件夹或文件失败:
def nameEncode(file_name):
file_stop_str = ['\\', '/', '*', '?', ':', '"','<','>','|']
for item2 in file_stop_str:
file_name = file_name.replace(item2, '-')
return file_name
import sys from bs4 import BeautifulSoup import requests import os import uuid import threading def startRequest(url): print("【提示】正在抓取 - %s" % url) res = requests.get(url) if res.status_code == 200: res_html = res.content.decode() doc = BeautifulSoup(res_html) work_box = doc.find('div', class_={'work-list-box'}) card_box_list = work_box.find_all('div', class_={'card-box'}) for item in card_box_list: getContent(item) else: print("【文档获取失败】【状态为 %s】- %s," % (url, res.status_code)) def getContent(item): title_content = item.find("a", class_={'title-content'}) avatar = item.find('span', class_={'user-avatar'}) if title_content is not None and avatar is not None: title = title_content.text author = avatar.find("a")["title"] href = title_content['href'] # print("%s -【%s】- %s" % (title, author, href)) res = requests.get(href) if res.status_code == 200: # 获取所有的图片链接 img_list = getDocImgLinks(res.content.decode()) path_str = "【%s】-【%s】" % (author, title) path_str_mk = pathBase('./data/'+nameEncode(path_str)) if path_str_mk is None: return else: for img_item in img_list: downloadImg(img_item, path_str_mk) else: print("【文档获取失败】【状态为 %s】- %s," % (href, res.status_code)) else: returndef getDocImgLinks(html): doc = BeautifulSoup(html) work_box = doc.find("div", class_={'work-show-box'}) revs = work_box.find_all("div", class_={'reveal-work-wrap'}) img_list = [] for item in revs: img = item.find("img") if img is not None: img_url = img["src"] img_list.append(img_url) else: print("【提示】:没有图片") continue return img_list
def pathBase(file_path): file_name_s = file_path.split("/") file_name = file_name_s[len(file_name_s) - 1] file_name_s[len(file_name_s) - 1] = file_name path = "/".join(file_name_s) if not os.path.exists(path): os.mkdir(path) return pathdef nameEncode(file_name): file_stop_str = ['\\', '/', '*', '?', ':', '"','<','>','|'] for item2 in file_stop_str: file_name = file_name.replace(item2, '-') return file_name
def downloadImg(url, path): z_url = url.split("@")[0] hz = url.split(".") z_hz = hz[len(hz) - 1] res = requests.get(z_url) if res.status_code == 200: img_down_path = path + "/" + str(uuid.uuid1()) + "." + z_hz f = open(img_down_path, 'wb') f.write(res.content) f.close() print("【下载成功】- %s" % img_down_path) else: print("【IMG 下载失败】【状态为 %s】- %s," % (z_url, res.status_code)) if __name__ == '__main__': threads = [] for i in range(1, 22): url = 'http://www.zcool.com.cn/search/content?type=3&field=8&other=0&sort=5&word=APP%E8%AE%BE%E8%AE%A1' \ '&recommend=0&requestId=requestId_1513228221822&p='+(str(i))+'#tab_anchor' threads.append(threading.Thread(target=startRequest, args={url})) for item in threads: item.start()
Github:https://github.com/Licoy/python-es/blob/master/old/zcool.py
后记
如果上述代码中有任何不清楚的地方,您可以在下方进行留言,我会及时回答您的疑惑。