Deduplicate
在使用 Scrapy 或其它工具开发爬虫时,数据去重一直是必要步骤,选择去重的方式也是各种各样:比如基于内存,基于文件,基于 redis,基于入库前 item 唯一字段查询,基于 url 请求指纹等。这里提供一些简单常用的方法,具体使用哪种方式请根据应用场景按需选择。
唯一索引 & 项目配置
一种数据去重的方式是在 yield item 后 pipeline 入库时根据库中数据来决定是否新增或更新等,比如给数据库字段添加唯一索引,忽略重复插入的错误:
现在可以很方便地通过 AyuItem 中的内置参数结合 mysql,mongodb,postgresql 和 oracledb 的唯一索引即可达到入库时自动判断数据是否已存在,不存在就新增插入,存在时也可自定更新哪些字段或忽略它。而且是通过对应数据库的一条语句实现,减少像之前那样的先 select 再判断 insert, update 或忽略跳过它而造成的 io 延迟。比如:mysql 通过 INSERT INTO … ON DUPLICATE KEY UPDATE 实现,mongodb 通过 $set 和 $setOnInsert 实现,postgresql 是通过 INSERT INTO … ON CONFLICT 实现,oracledb 是通过 MERGE INTO 实现。
Note
在 AyugeSpiderTools 3.13.0 版本中内置了更简洁的去重及更新功能,可以通过设置 AyuItem 即可轻松达到之前先 select 然后再决定 insert 还是 update 或什么也不做的复杂操作,且此方式实现方式更简洁且高效。具体的使用场景和 mongodb,postgresql 和 oracle 数据的示例请在 DemoSpider 中查看。
Mysql
对 mysql 的存储场景进行介绍:
Note
使用 AyuItem 内置的 mysql 更新规则时,需要在 .conf 配置中设置其 odku_enable 为 True 才能激活更新功能;
再结合 .conf 中设置 insert_ignore 为 True,且不设置 _update_keys 参数时即可做到忽略更新内容,达到只新增不更新已存在内容。这个请按需使用。
当然,你也可以选择保持默认关闭 odku_enable 和关闭 insert_ignore 的配置,不使用 AyuItem 中的内置更新去重功能,使用自己的去重方式来实现。
from ayugespidertools.items import AyuItem
octree_item = AyuItem(
octree_text=octree_text,
octree_href=octree_href,
_table=_save_table,
# 更新逻辑,如果 octree_text 已存在则更新或忽略,不存在会执行插入(需配合唯一索引使用)。
_update_rule={"octree_text": octree_text},
# 以 _update_rule 查询条件判断已存在时候,要更新哪些字段。_update_keys 不设置则还是
# 走新增(注意:在设置了唯一索引时,推荐设置 _update_keys 参数或 insert_ignore 配置
# 为 true,具体使用方式请按照自己喜欢的来。)
# 比如此示例,需要在 mysql _table 表设置 octree_text 为唯一索引,插入相同唯一索引
# 对应的数据会自动触发更新 _update_keys 中的字段,否则就正常新增数据。若你不设置唯一
# 索引,则会永远执行新增插入。
# 当然,如果设置了唯一索引且遇到了相同数据,但是并不想走更新逻辑,而是忽略它,那么不设
# 置 _update_keys 并结合 insert_ignore 即可。
_update_keys={"octree_href"},
)
yield octree_item
MongoDB
对 mongodb 的存储场景进行介绍:
Warning
在 ayugespidertools 3.13.0 之前,mongodb 存储场景是通过 _mongo_update_rule 来确定更新规则,如果匹配就会更新 AyuItem 中所有字段。规则有点过于粗暴简陋了。
目前规则中,更新规则字段改为统一的 _update_rule,如果数据已存在,则会更新 _update_keys 中的字段;如果已匹配数据但是没有设置 _update_keys 则并不会更新任何字段;虽然新版本依然支持 _mongo_update_rule 和 _mongo_update_keys 字段,但是推荐使用统一内置参数 _update_rule 和 _update_keys,3.14.0已删除_mongo_x 的字段。
from ayugespidertools.items import AyuItem
octree_item = AyuItem(
octree_text=octree_text,
octree_href=octree_href,
# 更新规则,如果匹配 _update_rule 则会尝试更新已存在数据的 _update_keys 中的字段。
_update_rule={"octree_text": octree_text},
# _update_rule 匹配时自定义更新的字段,支持设置多个,比如 {"octree_href", "new_field"}
# 如果没有设置则不会更新任何字段。
_update_keys={"octree_href"},
_table="demo_two",
)
self.slog.info(f"octree_item: {octree_item}")
yield octree_item
PostgreSQL
对 PostgreSQL 的存储场景进行介绍:
Note
postgresql 的使用和上面 mysql 和 mongodb 的一样,主要是多了一个 _conflict_cols 参数用于指定数据表中的唯一索引约束字段(在设置了唯一索引时,当然也非常推荐结合唯一索引使用);为什么不使用 merge into 接口就可以不用多一个自定义 _conflict_cols 字段了,不是更方便简约吗?是因为它只在 postgresql 15 版本及以上才支持,为了兼容性考虑。
from ayugespidertools.items import AyuItem
octree_item = AyuItem(
octree_text=octree_text,
octree_href=octree_href,
start_index=index,
_table=_save_table,
_update_rule={"octree_text": octree_text},
_update_keys={"octree_href"},
_conflict_cols={"octree_text"},
)
self.slog.info(f"octree_item: {octree_item}")
yield octree_item
Oracle
对 Oracle 的存储场景进行介绍:
Note
oracle 的使用和上面 postgresql 的一致,这里就不再多余介绍了。
from ayugespidertools.items import AyuItem
octree_item = AyuItem(
octree_text=octree_text,
octree_href=octree_href,
start_index=index,
_table=_save_table,
_update_rule={"octree_text": octree_text},
_update_keys={"octree_href"},
)
self.slog.info(f"octree_item: {octree_item}")
yield octree_item
Database
一种数据去重的方式是在 yield item 的 spider 中根据数据库查询来查看是否需要入库。这里不是使用内置的更新功能(AyuItem 中自定义 _update_rule 和 _update_keys 的方式),这里是提供一个数据库链接接口,会更加的灵活,适合更复杂、更自定义的查询场景。
这里提供的接口是 ayugespidertools.utils.database,里面提供了最常见的数据库链接功能。分别介绍他们:
Mysql
Mysql 场景下除了使用 insert_ignore 或 odku_enable 的配置外,可以使用自定义的方式,推荐使用 asyncio 的方式来查询:
from ayugespidertools.common.typevars import MysqlConf
from ayugespidertools.utils.database import MysqlAsyncPortal
async def test_example():
conn = MysqlAsyncPortal(db_conf=MysqlConf(...))
pool = await conn.connect()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT 42;")
await conn.close()
当然,也可以使用普通的方式来查询:
from ayugespidertools.common.typevars import MysqlConf
from ayugespidertools.utils.database import MysqlPortal
def test_example():
conn = MysqlPortal(db_conf=MysqlConf(...)).connect()
cursor = conn.cursor()
cursor.execute("SELECT 42;")
conn.close()
MongoDB
MongoDB 场景下除了使用 AyuItem 的方式,也可以使用自定义的方式,推荐 asyncio 的方式。
from ayugespidertools.common.typevars import MongoDBConf
from ayugespidertools.utils.database import MongoDBAsyncPortal
async def test_example():
db = MongoDBAsyncPortal(db_conf=MongoDBConf(...)).connect()
res = await db["test_collection"].find_one({"key": "value"}, {"_id": 1})
db.client.close()
当然,也可以使用普通的方式来查询:
from ayugespidertools.common.typevars import MongoDBConf
from ayugespidertools.utils.database import MongoDBPortal
def test_example():
db = MongoDBPortal(db_conf=MongoDBConf(...)).connect()
res = db["test_collection"].find_one({"key": "value"}, {"_id": 1})
db.client.close()
PostgreSQL
PostgreSQL 场景下的 asyncio 的数据库链接操作示例:
from ayugespidertools.common.typevars import PostgreSQLConf
from ayugespidertools.utils.database import PostgreSQLAsyncPortal
async def test_example():
pool = await PostgreSQLAsyncPortal(db_conf=PostgreSQLConf(...)).connect()
async with pool.acquire() as conn:
await conn.fetchrow("SELECT 42;")
await pool.close()
Warning
在 ayugespidertool 3.12.x 旧版本中的 PostgreSQL 入库查询使用方式如下,已经删除此方式,使用较复杂。
from ayugespidertools.utils.database import PostgreSQLAsyncPortal
async def test_example():
conn = PostgreSQLAsyncPortal(db_conf=postgres_conf)
pool = conn.connect()
await pool.open()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT 42;")
await pool.close()
当然,也可以使用普通的方式来查询:
from ayugespidertools.common.typevars import PostgreSQLConf
from ayugespidertools.utils.database import PostgreSQLPortal
def test_example():
conn = PostgreSQLPortal(db_conf=PostgreSQLConf(...)).connect()
cursor = conn.cursor()
cursor.execute("SELECT 42;")
conn.close()
Oracle
Oracle 场景下的 asyncio 的数据库链接操作示例:
from ayugespidertools.common.typevars import OracleConf
from ayugespidertools.utils.database import OracleAsyncPortal
async def test_example():
_sql = 'SELECT * from "_article_info_list"'
pool = OracleAsyncPortal(db_conf=OracleConf(...)).connect()
async with pool.acquire() as conn:
with conn.cursor() as cursor:
await cursor.execute(_sql)
exists = await cursor.fetchone()
await conn.close()
# 但是更推荐直接使用 conn 操作即可,更简洁,可自行挑选喜欢的方式
async with pool.acquire() as conn:
exists = await connection.fetchone(_sql)
await conn.close()
当然,也可以使用普通的方式来查询:
from ayugespidertools.common.typevars import OracleConf
from ayugespidertools.utils.database import OraclePortal
def test_example():
conn = OraclePortal(db_conf=OracleConf(...)).connect()
cursor = conn.cursor()
cursor.execute("SELECT 42;")
conn.close()
Redis
本库给了一个非常简约的根据 redis 查询数据是否已存在的方法,可用于简单场景的判断:
from ayugespidertools.extras.deduplicate import Deduplicate
dp = Deduplicate(name="test", redis_url="redis://:password@localhost:6379/0")
# 查看 key1 是否已存在,如果不存在就自动添加到 redis 中
res: int = dp.exists("key1")
# 查看 key2 是否已存在,如果不存在不会自动添加到 redis 中
res2: int = dp.get("key2")