这个用Python编写的大数据测试工具,我给100分(用python做大数据)
数据对于任何一个企业来说都是非常重要的,为了保证数据 ETL 流程的质量及效率,很多公司都会引入 ETL 工具。目前 ETL 工具有很多,但是针对 ETL 测试的测试工具在业界却比较少见。这是为什么呢?
主要是因为在日常 ETL 测试过程中会遇到很多问题,特别是 Hive SQL 类测试的问题:
(1)测试以手动测试为主,缺少自动化工具;
(2)缺少与数据质量相关的分析工具;
(3)测试中需要重复编写SQL语句,效率较低;
(4)运行SQL语句耗时太长,严重拖慢测试进度;
(5)shell窗口中的查询结果不易保存,HUE的查询结果易过期且需要手动操作保存;
(6)数据同步场景及ETL场景下,需要对比源表和目标表一致性,缺少对比工具;
(7)实时数据处理场景对数据时效性要求高,测试时场景难以模拟,问题难以复现;
(8)常用测试场景下的用例重复,例如,对拉链表测试、MapReduce脚本的测试缺少通用的测试覆盖用例;
(9)缺少Hive与HBase一致性对比工具。
总的来说,大数据测试存在门槛高、测试效率较低、测试覆盖不全、测试场景不易复现、 测试问题难以定位等问题,今天异步君就给大家介绍一款可以解决上述问题的超好用大数据测试工具——easy_data_test。
easy_data_test
easy_data_test 是用Python编写的,目前它的主要功能有:
(1)支持单表数据量、列空值数据量、列非空值数据量、列最大值、列最小值、列不同值、不同值数据量查询,支持对表结构、任意 select 语句的查询,支持表基本信息查询、值域分析、异常值分析、手机号合规性分析、ID 合规性分析。
(2)支持双表数据量对比、列空值数据量对比、列非空值数据量对比、表结构对比、Hive 双表一致性对比、Hive 与 HBase 一致性对比。
(3)支持查看主备集群及库切换、库表集群信息。
(4)支持实时查看历史执行命令及结果,以 HTML 页面展示全表分析,以 HTML 页面展示值域,以 HTML 页面展示 Hive 双表一致性分析结果。
(5)支持拉链表通用测试(判断拉链表是否断链,判断拉链表日期正确性,对比拉链表与临时表数据量、数值)
easy_data_test功能如此强大,是如何实现的呢?异步君拿到了独家资料,从这个工具的模块设计到技术选型、再到代码实现通通都有,干货满满!下面就让我们来详细看看吧。
模块设计
话不多说,直接上图:
easy_data_test 模块设计
如图所示,用户运行 easy_data_test 工具后,可以通过 ./easy_data_test –help 命令查看所有非交互式命令,使用 stdin.readline() 来获取用户输入的语句。
如果没有指定 -f 或者 -e 就会进入交互式命令行模式。进入交互式模式后,程序通过 raw_input 函数获取用户输入的命令,并根据命令的首个关键字执行对应的函数。函数中封装了一条或多条 SQL 语句,通过 Presto 读取 Hive 元数据,或通过 pyHive 的 Hive 模块连接 Hive。
部分执行结果展示在终端页面,并存储在查询历史命令及结果文件中。部分命令执行完毕后会生成 url,通过浏览器可以查看相应命令的执行结果。
不同的首个关键字对应不同的功能模块,通常每个功能模块包含多个执行函数。
技术选型
业内常用的 Python 连接 Hive 的工具有 Presto、pyHive、impala 及 pyhs2 等。设计人员在经过执行效率及公司现有环境综合比较后,最终选择了 Presto 作为查询主要工具。
Presto 是由 Facebook 公司开发的、一个运行在多台服务器上的分布式查询引擎。本身虽然并不存储数据,但是可以接入多种数据源(Hive、HBase、Oracle、MySQL、Kafka、Redis 等),并且支持跨数据源的级联查询。
Presto 所使用的执行模式与 Hive 有根本的不同,大部分场景下 Presto 比 Hive 快一个数量级。Presto 接受请求后,立即执行,全内存并行计算;Hive 需要用 Yarn 做资源调度,为了接受查询,需要先申请资源,启动进程,并且采用 MapReduce 计算模型,中间结果会保存在磁盘上,所以速度就相对较慢。
使用 easy_data_test 过程中,有时会发现 Presto 存在部分 HiveQL 不兼容问题,例如,show tables like a* 命令无法执行,表结构查询与预期不符,执行切换库操作报错时不抛出异常等。
考虑到 Presto 部分功能缺失带来的问题,于是设计人员选择 pyHive 作为功能弥补工具,在执行特定 SQL 语句时会切换到 pyHive 去连接 Hive 执行。
区别于 Hive,需要格外注意的是,Presto 不支持隐式转换。例如,Hive 会成功执行以下语句:
select count(1) from sample_label where label <> ";
但是使用 Presto 执行就会报告以下错误;
PrestoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:83: '<>' cannot be applied to integer, varchar(0)", query_id=20191106_024551_ 01370_8ukjc)
报错原因是,label 列定义的类型为 integer,在使用 Presto 时直接将该列与空字符做比较,Presto 不支持隐式转换。对于该类问题,使用时只需将 label 显式转换为 string 或者 varchar 类型即可解决。
select count(1) from sample_label where cast(label as string) <> ";
从以上内容已经不难看出研发人员的匠心,最后我们直接来看一看 easy_data_test 的模块代码。
模块代码
入口函数如下:
1 def main(options, hostname, port): 2 setup_cqlruleset(options.cqlmodule) 3 setup_cqldocs(options.cqlmodule) 4 # 初始化历史执行命令及结果文件 5 init_history() 6 if options.file is None: 7 stdin = None 8 else: 9 try:10 encoding, bom_size = get_file_encoding_bomsize(options.file) 11 stdin = codecs.open(options.file, 'r', encoding) 12 stdin.seek(bom_size) 13 except IOError, e: 14 sys.exit("Can't open %r: %s" % (options.file, e)) 15 16 try: 17 # 初始化Shell,该类继承自cmd.Cmd 18 shell = Shell(hostname,19 port, 20 database=options.database, 21 username=options.username, 22 password=options.password, 23 stdin=stdin, 24 tty=options.tty, 25 completekey=options.completekey, 26 single_statement=options.execute, 27 connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS) 28 except KeyboardInterrupt: 29 sys.exit('Connection aborted.') 30 except Exception, e: 31 sys.exit('Connection error: %s' % (e,)) 32 if options.debug: 33 shell.debug = True 34 35 # 通过交互式命令循环处理 36 shell.cmdloop() 37 batch_mode = options.file or options.execute 38 if batch_mode and shell.statement_error: 39 sys.exit(2) 40 41 42 if __name__ == '__main__': 43 main(*read_options(sys.argv[1:], os.environ))
通过 Presto 连接 Hive 的代码如下:
1 import prestodb 2 conn=prestodb.dbapi.connect( 3 host= ip, 4 port=8443, 5 user='username', 6 catalog='hive', 7 schema='default', 8 http_scheme='https', 9 auth=prestodb.auth.BasicAuthentication("username", "username的密码"), 10 ) 11 conn._http_session.verify = './presto.pem' #身份认证相关文件 12 cur = conn.cursor() 13 cur.execute('SELECT * FROM system.runtime.nodes') 14 rows = cur.fetchall() 15 print rows
为了使用 Hive 查询全表数据量,需要执行 SQL 语句 select count(*) from tablename。使用工具代码封装后,查询表数据只需要使用 count tablename 即可实现,且查询效率比使用原生 Hive 快一个数量级。查询结果保存在历史文件中,可以使用相关命令查看。
关于单表模块的命令有多个,count 命令的代码如下:
1 class SigleTableAnalysis(cmd.Cmd): 2 # count table,查询表数据量,支持传入where条件 3 @classmethod 4 def do_count(self, parsed, print_command=True, print_res=True): 5 try: 6 table_name = parsed.split(' ')[1].strip(';') 7 statement = 'select count(1) from %s' % table_name 8 if len(parsed.split(' ')) >=3 and parsed.split(' ')[2].strip() == 'where': 9 wherecondition = ' '.join(parsed.split(' ')[3:]) 10 statement = statement ' where ' wherecondition 11 status, res = perform_simple_statement(statement, detail=False, print_ command=print_command, print_res=print_res) 12 if not print_res: 13 return status, res 14 except IndexError as e: 15 print('please check whether your command is right') 16 except Exception as e: 17 import traceback18 print('%s detail: %s' % (str(e), traceback.format_exc()))
其他模块的代码与 count 命令的代码相似,双表查询模块、拉链表测试模块、数据质量分析模块会在单表模块的基础上进行封装,所以设计会更复杂一些,由于篇幅有限,异步君没法在这里为大家更多地展示了。想要深入了解的小伙伴,推荐阅读《机器学习测试入门与实践》。
机器学习测试入门与实践
作者:艾辉
内容简介:
本书全面且系统地介绍了机器学习测试技术与质量体系建设,能够帮助读者了解机器学习是如何工作的,了解机器学习的质量保障是如何进行的。
工程开发人员和测试工程师通过阅读本书,可以系统化地了解大数据测试、特征测试及模型评估等知识;算法工程师通过阅读本书,可以学习模型评测的方法和拓宽模型工程实践的思路;技术专家和技术管理者通过阅读本书,可以了解机器学习质量保障与工程效能的建设方案。