软件开发架构师

快速实现数据导入及简单DCS的实现-InfoQ

大数据 65 2019-11-09 01:10

对于多数用户而言,在利用云计算的大数据服务时首先要面临的一个问题就是如何将已有存量数据快捷的导入到大数据仓库中。本文将演示如何基于京东云数据计算服务平台,简单、快速地将数据导入数据计算服务。

我们通常说的大数据平台主要包括三部分:数据相关的产品和技术、数据资产、数据管理。京东云数据计算服务(Data Computing Service,简称:DCS)是一个全托管、低使用成本的云上数据仓库服务。通过数据工厂,可轻松实现云上各数据源 (包括对象存储、云数据库、数据仓库等) 间、本地数据源与云数据源间的多种不同数据源的数据同步,实现多源数据分析与管理。

在数据工厂服务中可以创建同步任务来搬运数据,并按照指定的调度策略(每天、每周、每月)运行。该模块提供任务监控和告警功能,用户可以通过任务执行的明细日志和告警历史,轻松查明问题所在;同时,提供全面托管的工作流服务,支持图形化设计数据分析。以工作流任务的方式实现对数据的处理和相互依赖,帮助用户快速构建数据处理分析作业并周期性地执行。

下面会以 MySQL 数据库为例说明如何利用京东云数据工厂进行数据采集和 DTS 数据库之间的数据同步。数据工厂支持常见 RDS 数据库,如 MySQL、SQLServer、Oracle、DB2 和 NoSQL 数据库,也支持从 OSS、FTP 站点及 Elastic Search 等。

下图是数据工厂支持的数据源种类:
快速实现数据导入及简单DCS的实现-InfoQ-1

下面会演示以 MySQL 数据库为例,如何利用数据工厂进行数据采集,以及如何利用数据工厂作为 DTS 在两个数据源之间进行数据迁移。

为了方便测试,首先我们要创建数据库表单并灌入测试数据。为了测试方便,提前创建了一台 CentOS 7.4 云主机作为模拟客户端对数据库进行访问。

一、核心概念

数据集

在数据工厂服务中的数据集,是指由同步任务时需指定的数据源端或数据目标端的不同数据存储实例。因此,在创建同步任务之前,你必须先连接数据集。同一个数据集,可以是多个同步任务的数据源端或者数据目标端。

数据集的连通性

在用户创建数据集连接时,为检验数据集成服务能否连接成功,需要用户更具根据不同的数据集类型,填入相应的值用于连通性校验。数据集连接成功,是数据同步任务成功的前提。

同步任务

同步任务,是用户使用数据集成服务的最小单元。每一个同步任务需要用户配置数据源端、数据目标端以及相应的同步策略(如脏数据的处理等)。

工作流

工作流以图形化设计任务的方式实现对数据的处理和相互依赖。

开始实战

一、准备测试数据源

首先创建模拟数据环境。本例采用京东云的 RDS MySQL 8.0 数据库服务作为数据源,创建时可指定数据库名字为 Testdb。数据库创建完成后要开启外网访问,数据工厂可以通过公网 IP 或域名对数据库进行访问,详细域名可以在数据库的详情页中找到。
快速实现数据导入及简单DCS的实现-InfoQ-2

创建 MySQL 数据库可以通过图形界面按提示填写必要信息开通,这里不赘述。需要提醒的是数据库开通后默认不允许外网访问,要点击开启外网访问,并记住默认端口 3306。
快速实现数据导入及简单DCS的实现-InfoQ-3

对 MySQL 访问可以通过图形界面访问或通过客户端访问。当然也可以通过其他支持 MySQL 的图形化客户端进行访问。
快速实现数据导入及简单DCS的实现-InfoQ-4

本例使用 CentOS 系统作为客户端访问,如未安装客户端可以使用 Yum 命令安装 MySQL。顺利安装可以看到如下提示。

复制代码
1[ root@CentOS ~]# yum install mysql
2Loaded plugins: fastestmirror, langpacks
3Loading mirror speeds from cached hostfile
4base | 3.6 kB 00: 00
5epel | 4.7 kB 00: 00
6extras | 3.4 kB 00: 00
7updates | 3.4 kB 00: 00
8( 1/ 2): epel/x86_64/updateinfo | 986 kB 00: 00
9( 2/ 2): epel/x86_64/primary_db | 6.7 MB 00: 00
10Resolving Dependencies
11--> Running transaction check
12---> Package mariadb.x86_64 1: 5.5 .60 -1.el7_5 will be installed
13--> Processing Dependency: mariadb-libs(x86 -64) = 1: 5.5 .60 -1.el7_5 for package: 1:mariadb -5.5 .60 -1.el7_5.x86_64
14--> Running transaction check
15---> Package mariadb-libs.x86_64 1: 5.5 .56 -2.el7 will be updated
16---> Package mariadb-libs.x86_64 1: 5.5 .60 -1.el7_5 will be an update
17--> Finished Dependency Resolution
18
19Dependencies Resolved
20
21===========================================================
22 Package Arch Version Repository
23 Size
24===========================================================
25Installing:
26 mariadb x86_64 1: 5.5 .60 -1.el7_5 base 8.9 M
27Updating for dependencies:
28 mariadb-libs x86_64 1: 5.5 .60 -1.el7_5 base 758 k
29
30Transaction Summary
31===========================================================
32Install 1 Package
33Upgrade ( 1 Dependent package)
34
35Total download size: 9.6 M
36Is this ok [y/d/N]: y
37Downloading packages:
38Delta RPMs disabled because /usr/bin/applydeltarpm not installed.
39( 1/ 2): mariadb-libs -5.5 .60 -1.el7_5.x8 | 758 kB 00: 00
40( 2/ 2): mariadb -5.5 .60 -1.el7_5.x86_64. | 8.9 MB 00: 00
41-----------------------------------------------------------
42Total 11 MB/s | 9.6 MB 00: 00
43Running transaction check
44Running transaction test
45Transaction test succeeded
46Running transaction
47 Updating : 1:mariadb-libs -5.5 .60 -1.el7_5.x86_64 1/ 3
48 Installing : 1:mariadb -5.5 .60 -1.el7_5.x86_64 2/ 3
49 Cleanup : 1:mariadb-libs -5.5 .56 -2.el7.x86_64 3/ 3
50 Verifying : 1:mariadb-libs -5.5 .60 -1.el7_5.x86_64 1/ 3
51 Verifying : 1:mariadb -5.5 .60 -1.el7_5.x86_64 2/ 3
52 Verifying : 1:mariadb-libs -5.5 .56 -2.el7.x86_64 3/ 3
53
54Installed:
55 mariadb.x86_64 1: 5.5 .60 -1.el7_5
56
57Dependency Updated:
58 mariadb-libs.x86_64 1: 5.5 .60 -1.el7_5
59
60Complete!

安装后执行 MySQL 命令,测试一下是否可以链接数据库,客户端访问命令格式是 MySQL -h 主机地址 -u 用户名 -p 用户密码主机地址使用 MySQL 数据库的外部域名。从连接数据库到创建表单的详细执行过程如下:

1、验证是否可以正常连接数据库

复制代码
1[ root@CentOS ~]# mysql -h mysql-cn-north -1-aed0e558da5e4877. public.jcloud.com -P3306 -umysqlxxx –pPasswordxxx

如可以正常连接,可以新开一个窗口创建 SQL 脚本用于数据库的创建和测试数据插入操作,也可以提前制作好并上传到客户端。

创建数据表格,创建一个测试数据库和测试表便于测试。选择合适目录创建 SQL 脚本文件,可以用 vi ctable.sql 创建,也可以用其他文本编辑工具制作,脚本内容如下:

复制代码
1[root@CentOS ~]# cat ctable. sql
2USE testdb;
3 DROP TABLE IF EXISTS `sqltest`;
4 CREATE TABLE `sqltest` (
5 `id` int( 10) unsigned NOT NULL AUTO_INCREMENT,
6 `user_id` varchar( 20) NOT NULL DEFAULT '',
7 `vote_num` int( 10) unsigned NOT NULL DEFAULT '0',
8 `group_id` int( 10) unsigned NOT NULL DEFAULT '0',
9 `status` tinyint( 2) unsigned NOT NULL DEFAULT '1',
10 `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',
11 PRIMARY KEY (`id`),
12 KEY `index_user_id` (`user_id`) USING HASH
13) ENGINE=InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET=utf8;

2、创建临时数据产生脚本文件,vi adddb.sql

复制代码
1[root@CentOS ~]# cat adddb. sql
2 DELIMITER // -- 修改 MySQL delimiter:'//'
3 DROP FUNCTION IF EXISTS `rand_string` //
4 SET NAMES utf8 //
5 CREATE FUNCTION `rand_string` (n INT) RETURNS VARCHAR( 255) CHARSET 'utf8'
6 BEGIN
7 DECLARE char_str varchar( 100) DEFAULT 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
8 DECLARE return_str varchar( 255) DEFAULT '';
9 DECLARE i INT DEFAULT 0;
10 WHILE i < n DO
11 SET return_str = concat(return_str, substring(char_str, FLOOR( 1 + RAND()* 62), 1));
12 SET i = i+ 1;
13 END WHILE;
14 RETURN return_str;
15 END //

– 创建插入数据的存储过程

复制代码
1 DROP PROCEDURE IF EXISTS `adddb` //
2 CREATE PROCEDURE `adddb`( IN n INT)
3 BEGIN
4 DECLARE i INT DEFAULT 1;
5 DECLARE vote_num INT DEFAULT 0;
6 DECLARE group_id INT DEFAULT 0;
7 DECLARE status TINYINT DEFAULT 1;
8 WHILE i < n DO
9 SET vote_num = FLOOR( 1 + RAND() * 10000);
10 SET group_id = FLOOR( 0 + RAND()* 3);
11 SET status = FLOOR( 1 + RAND()* 2);
12 INSERT INTO `sqltest` VALUES ( NULL, rand_string( 20), vote_num, group_id, status, NOW());
13 SET i = i + 1;
14 END WHILE;
15 END // DELIMITER ;

– 改回默认的 MySQL delimiter:’;’

3、登陆数据库执行 ctable.sql 和 adddb.sql 脚本
快速实现数据导入及简单DCS的实现-InfoQ-5

执行命令:

复制代码
1MySQL [testdb]> source /root/ctable.sql;
2MySQL [testdb]> source /root/adddb.sql;

增加 100 条数据

复制代码
1MySQL [testdb]> call adddb( 100);

通过调整 adddb(要增加数字)参数的数字,也可以增加 1000 条如 adddb(1000)。
快速实现数据导入及简单DCS的实现-InfoQ-6

至此我们有了数据源的测试环境,接下来可以开始利用数据工厂进行数据同步。

二、利用数据工厂进行源数据采集

选择大数据与分析的数据工厂菜单。在连接管理中添加连接,如下:
快速实现数据导入及简单DCS的实现-InfoQ-7

建立连接时建议点击连接测试按钮先进行测试数据库连接,不能连接时请检查域名、端口、用户名密码是否正确,数据库是否允许外网访问。

建立好数据库连接后,就可以进行数据同步工作了。数据同步工作可以在数据同步中单独建立任务设置,也可以在工作流中通过数据集成选项进行设置。数据集成设置后会自动生成数据同步任务。
快速实现数据导入及简单DCS的实现-InfoQ-8

快速实现数据导入及简单DCS的实现-InfoQ-9

调度策略可以选择手工执行、周期调度和单次运行三种模式,也可以直接选择单次运行。
快速实现数据导入及简单DCS的实现-InfoQ-10

快速实现数据导入及简单DCS的实现-InfoQ-11

执行完毕后在数据计算服务中就同步产生了数据。

上述操作可以用工作流的形式实现,衔接更为复杂的 Spark 计算脚本。
快速实现数据导入及简单DCS的实现-InfoQ-12

成功执行后可以在运维中心查看执行情况,在实例列表中的画图试布中看到执行节点变为绿色。
快速实现数据导入及简单DCS的实现-InfoQ-13

通过以上建立数据同步任务和工作流两种形式都能实现数据源的数据获取,数据获取后就可以直接使用大数据服务进行数据处理了。在大数据与分析菜单下选择数据计算服务管理。默认用自己用户名 /PIN(本例用户名是 jdc-14)为实例名,建立了 Default HIVE INSTANCE。
快速实现数据导入及简单DCS的实现-InfoQ-14

数据的库表管理下可以看到刚刚新建的库 MySQLdb 和表 SQLTest,点击进入 SQLTest 表名可以看到更详细的表信息如图。
快速实现数据导入及简单DCS的实现-InfoQ-15

快速实现数据导入及简单DCS的实现-InfoQ-16

可以基于获取的大数据信息在数据计算服务中进行任务开发,任务开发可以使用 SQL 或开发脚本对数据进行计算。
快速实现数据导入及简单DCS的实现-InfoQ-17

三、DCS 大数据导出

可以利用这个能力把数据工厂当作简单 DTS 工具,把数据传给目的数据库。本例在京东云建立一个 MySQL 目的数据库,把 SQLTest 同步给目的数据库,实现两个数据库的数据同步。

准备好或新建立目的 MySQL 数据库 destmysqldb,将数据计算服务的 MySQLdb 同步给目的数据库 destmysqldb。

在数据工厂菜单下,选择连接管理,新建到 destmysqldb 的连接。
快速实现数据导入及简单DCS的实现-InfoQ-18

新建同步任务,将数据计算服务的数据同步给 destmysqldb 数据库,任务名称 synctodest。选择数据源端要选择大数据的数据计算服务,数据库名为 mysqldb 数据表名为 sqltest,表的数据可以预览,避免出错。

在传输数据前要在数据库中事先建立空表结构,执行文章开头的 ctable.sql 建立表 SQLTest,数据插入时要选择目的表名。
快速实现数据导入及简单DCS的实现-InfoQ-19

执行完毕后可以在 destmysqldb 中确认结果,通过 select count(*) from sqltest 可以确认数据已经成功导入。
快速实现数据导入及简单DCS的实现-InfoQ-20

可以确认利用数据工厂作为简单 DTS 工具进行源数据库数据同步到目的数据库,实战成功!

文章评论