MySQL亿级数据快速导出

嘉美伯爵

Python MySQL 168

mysql千万级数据如何快速导出

今天给大家讲解如何快速的导出千万级MySQL中的数据,大家平时在进行MySQL数据导出的时候,如何数据量不大(万级记录)可能不会遇到这样那样的问题,下面就我前段事件导出MySQL千万级(目前量级8千万,已快到一亿)数据遇到问题的一个回放和代码优化。

查询优化

当你接到需求,可能第一时间想到,直接全量查询不就好了,如果数据记录在几万条还好,当MySQL一个表的数据大于200W的时候,这个时候去查询已经非常吃力了,即使在添加索引的情况下。

查询需求

收到的需求是,为满足算法团队模型训练用,需要满足根据网元名称和时间范围动态快速的查询出数据,由于后台每天产生的数据是惊人的,该数据由kafka生产,MySQL存储。MySQL表中的下列字段nenamestarttimeendtime均加有索引,查询时我们需要将产生逻辑运算的字段,放在后面,不然会引起索引失效的情况。

select * from workflow_hugealarm where nename = {nename} and eventtime > {starttime} and eventtime < {endtime};

多线程+分页查

由于上面的方案查询速度和效率都非常慢,因此对程序进行了改进,使用多线程+分页的方式进行查询。具体思路是:批量开启多个线程,然后用数量行数除以每页查询的条数,得到总页数,最后交给线程池进行批量执行。

  • 获取数据行数
select max(id) as counts from workflow_hugealarm;
  • 数据等分
def thread_pool(self, **kwargs):
    pages = self.get_count() / self.step
    with ThreadPoolExecutor(100) as execute:
        for step in range(1, math.ceil(pages) + 1):
            try:
                execute.submit(self._query, step, **kwargs)
            except Exception as e:
                logging.error(e)
                continue
    return pd.DataFrame(self.data)

多线程+MySQL连接池查

上面的查询方案运行一段时间后会发现程序报错,经过多方定位发现是MySQL连接数超限,很快就到了MySQL的最大连接数,即1024个连接数。最后想到的解决方案是维护一个MySQL的连接池,这里我们使用Python字典类型进行存储维护。

  • 报错原因
AttributeError: 'NoneType' object has no attribute 'settimeout'
  • 连接池
def get_instance(self):
    """
    创建数据库连接池
    :return:
    """
    name = threading.current_thread().name
    if name not in self.pool:
        conn = pymysql.connect(**self.db_info)
        self.pool[name] = conn
    return self.pool[name]

多进程+多线程查

上面的方案其实已经满足大批量查询了,为了使导出速度变的更快,我们使用多进程+多线程组合的方式进行查询。这也是处理IO密集型业务的最佳实践方案,使用该方案,可以极大的规避GIL所带来的弊端。

  • 多进程+多线程
# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...

# Initiaze the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()

总结

由于公司每天产生的数据是惊人的,因此经过我们商讨已经不适合用MySQL来进行存储了,我们使用了PostgreSql数据库集群来进行了存储,每天分别建表来存储。

  • 后台回复mysql可以获取源码
  • 大家如果有不明白的可以留言交流讨论