Uber推出数据湖集成神器DBEvents,支持MySQL、Cassandra等(21)

发布于2019-04-21 20:14:20

在全球市场保持Uber平台的可靠性和实时性是一项7*24小时不能间断的任务。当旧金山的人们进入梦乡时,巴黎的上班族们正发送着Uber车辆订单准备出门工作。而同一时刻在地球的另一端,孟买的居民可能正在用Uber Eats订购晚餐。

我们在Uber的大数据平台上促成各种互动,使用我们的Marketplace来匹配乘客和司机;食客、餐馆和配送伙伴;货车司机和运货人。从数据的角度来洞察这些交互有助于我们为全球用户提供优质且有意义的产品体验。

食客们希望食物能及时送达,乘客也希望在最短的时间内被接到,我们的数据必须尽可能快的反映出现场发生的事件。但随着四面八方的数据汇入我们的数据湖,在这种规模下保持数据的新鲜度成为了一项重大的挑战。

虽然现在已经有一些为公司提供24小时数据新鲜度的方案,但对于Uber的实时性需求来说还是过时了。此外,对于Uber的数据规模和运营规模,这种方案无法保证可靠运行。

为了满足我们的特殊需求,我们开发了DBEvents——一种专为高数据质量和新鲜度而设计的变更数据获取系统。变更数据获取系统(CDC, Change Data Capture System)可以用来确定哪些数据发生了变更,以便采取一些操作,比如获取或是复制。DBEvents有助于引导、获取已有表格的快照,以及增量的流式更新。

作为Uber其他软件(例如MarmarayHudi)的补充,DBEvents从MySQL、Apache Cassandra和Schemaless中获取数据,以更新我们的Hadoop数据湖。这个解决方案可管理PB级的数据并可在全球范围内运营,帮助我们为内部数据客户提供最好的服务。

快照数据获取

从历史上看,Uber的数据获取一般会先确认要获取的数据集,然后使用MapReduce或是Apache Spark运行一个大型处理作业,从源数据库或表中高并发地读取数据。接下来,我们会将这个作业的输出发送到离线的数据湖,如HDFS或Apache Hive。我们把这个过程称为快照,根据数据集大小的不同,一般会花费几分钟到几小时的时间,这对于我们内部客户的需求来说还不够快。

当一个作业开始获取数据时,它会分散成多个并行任务,与上游的表格(如MySQL)建立并行的连接并拉取数据。从MySQL读取大量数据会对其实时应用流量施加很大的压力。我们可以使用专用的服务器执行ETL操作来减轻压力,但这会影响到数据的完整性,也会因为这个备份的数据库服务器增加额外的硬件成本。

获取数据库或表的时间会随着数据量的增加而延长,并在某些时刻无法再满足业务的需求。由于大多数的数据库每天仅更新部分数据,只有极少量的新纪录会被添加,而整个快照过程会一遍又一遍地读取和写入整张表的数据,包括未修改的行,这会导致计算和存储资源无法被有效利用。

DBEvents的要求

为了Uber对更新鲜更快速的数据洞察需求,我们需要设计一种更好的方式来将数据提取到数据湖中。当我们开始设计DBEvents时,为最终的解决方案定义了三个业务要求:新鲜度、质量和效率。

新鲜度

数据的新鲜度指它更新得有多频繁。假设在时间t1更新MySQL表中的一行。数据提取作业在时间t1+1时刻开始运行,并消耗N个单位时间完成作业。则用户可以在t1+1+N时刻获得数据。这里,数据的新鲜度延迟是N+1,即数据实际更新到数据湖中并可被获取的时间延迟。

Uber有很多用例都要求N+1尽可能的小,最好是几分钟。这些用例包括欺诈检测,因为即使是最轻微的延迟也会影响到客户的体验。出于这些原因,我们在DBEvents中将数据新鲜度的要求排在了最高优先级。

质量

如果我们无法描述或理解数据湖中的数据,它们是无法发挥出价值的。设想不同的上游服务对不同的表有不同的schema。尽管这些表在创建时都有一个schema,但这些schema会随着用例的变化而变化。如果对于摄入的数据没有一个一致的方法来定义和更新schema,数据湖很快就会变成一个数据沼泽——一大堆难以被理解使用的数据。

另外,随着表schema的演进,搞明白为什么要增加字段或是弃用原有字段很重要。如果不了解每列数据代表的含义,就很难理解数据的意义。因此,确保数据的高质量是DBEvents另一个优先考虑的要求。

效率

在Uber,我们有数以千计的微服务来负责业务逻辑的不同部分以及不同的业务线。在大多数情况下,每个微服务都有一个或多个备用数据库来存储长期的数据。可以想象,这会导致成百上千的表格都需要被读取,这需要大量的计算和存储资源。

因此,DBEvents的第三个设计目标是使系统更高效。通过对存储和计算资源的使用优化,我们最终降低了数据中心使用和工程时间的成本,并能在未来加入更多的数据源。

设计DBEvents

结合这三个需求,我们构建了Uber的变更数据获取系统DBEvents,用于增量地感知并获取数据的变更,从而改善我们平台的使用体验。

数据集的获取可以分为两个步骤:

  1. 引导:一个表格在某个时间点快照的表现

  2. 增量获取:对表格进行增量的获取并实施变更(上游发生的)

引导(Bootstrap)

我们开发了一个可插拔数据源的库,来引导例如Cassandra,Schemaless和MySQL等外部数据源通过我们的摄取平台Marmaray将数据导入数据湖。这个库提供了有效引导数据集所需的语意,同时提供了能够添加任何数据源的可插拔架构。每个外部数据源都会将其原始数据的快照备份至HDFS。

快照备份完成后,Marmaray会调用库,依次读取备份数据,并将其作为Marmaray可用的Spark RDD来提供。在执行可选的去重、部分行合并及其他操作后,Marmaray会将RDD持久化到Apache Hive。

为了提高获取超大表格时的效率和可靠性,引导的过程是增量的。你可以为数据集设计批次的大小,也可以增量式(可能是并行的)地进行引导,从而避免过大的作业。

image

图1:我们的可插拔源引导库从HDFS备份中读取,为取数平台Marmaray准备数据集

MySQL 引导案例

为MySQL数据库创建备份一般会先在文件系统中为数据创建一个副本,然后使用本地文件格式将其存储在另一个存储引擎中。这种逐位复制文件的方式被称为物理备份。由于存在索引,被复制的物理文件通常包含重复数据,这会使磁盘上数据集的大小明显增加。

作为DBEvents体系的一部分,我们开发并开源了一个名为StorageTapper的服务,它从MySQL数据库读取数据,将其转换为模式化的版本,并将事件发布到不同的目的地,例如HDFS或者Apache Kafka。这种在目标存储系统上生成事件的方法使我们能够创建逻辑备份。逻辑备份依赖StorageTapper基于原始数据库创建的事件从而在目标系统上重新创建数据集,而不是使用数据集的直接备份。

除了比物理备份具有更高的效率外,逻辑备份还有以下优点:

image

图2:StorageTapper从MySQL读取二进制变更日志,对Apache Avro中的时间进行编码,并将它们发送至Apache Kafka或是在HDFS中备份。可以用这些事件在其他系统(例如Apache Hive)中重构数据集。

实现新鲜度

为了使我们的数据足够新鲜,我们需要以小批量的方式来增量地消费和修改数据集。我们的数据湖使用的是HDFS(一种append-only系统)来存储PB级的数据。而大部分的分析数据都是用Apache Parquet文件格式编写的,这种方式适用于大规模的列扫描,但是无法更新。遗憾的是,由于HDFS是append-only模式而Apache Parquet是不可修改的,用户如果想更新数据集就必须要批量重写整个数据集(在使用Hive时,是重写数据集的大量分区)。

为了快速获取数据,我们使用了Apache Hudi——一个由Uber设计的用于管理HDFS中所有原始数据集的开源库,它能减少对不可变的数据湖执行upsert操作所花费的时间。Apache Hudi能在数据集上提供原子的upsert操作和增量数据流。

MySQL 增量提取案例

除了引导,我们还能使用StorageTapper从MySQL源执行增量提取。在我们的用例中,StorageTapper从MySQL的二进制日志中读取事件,其中记录了数据库所做的变更。二进制日志中包含了所有INSERT、UPDATE、DELETE和DDL这些我们称之为二进制日志事件的操作。这些事件会按照数据库变更发生的顺序依次写入日志中。

StorageTapper读取这些事件,用Apache Avro的格式对其进行编码,并将它们发送至Apache Kafka。每条二进制日志事件都是Kafka中的一条消息,每条消息对应一整行表格的数据。由于发送到Apache Kafka的事件能反映出对原始数据库做变更的顺序,当我们将Kafka中的消息应用到另一个数据库时,就会获得与原始数据完全一致的副本。这个方法会比直接从MySQL转发数据到另一个数据库使用更少的计算资源。

保证质量

为了确保数据高质量,我们需要先使用schema来为数据湖中的数据集定义结构。Uber使用了一种内部的schema管理服务Schema-Service,它能保证数据湖中的每个数据集都有关联的schema,并且使schema的任何变更都遵从变更规则。这些变更规则保证了schema后向的兼容性,以避免影响到这类数据集的消费者。

Schema-Service使用Apache Avro的格式来存储schema并执行schema的变更。此schema通常是上游表schema的1:1展现。只要变更被接受为向后兼容,自助服务工具就能允许内部用户修改schema。一旦schema以Apache Avro的格式变更,一个DDL语句就会作用到表来改变实际的schema。

我们通过Schema编码过程将数据模式化(schematized)。schema执行库(heatpipe)会将数据模式化或编码,就像能对数据进行schema检查的瘦客户端。schema执行库还会向每个变更日志中添加元数据,使其全局标准化,不用考虑数据从哪儿来或是要写到哪里去。确保数据都有schema且schema都是最新的,意味着我们可以找到并使用数据湖中所有的数据。

image

图3:DBEvents的heatpipe库对数据进行编码,Schema-Service是所有schema的网关。这是实现将所有数据模式化的方法。

MySQL schema 执行案例

如上所述,用户可以通过Schema-Service请求变更MySQL的schema,这能使变更生效并保证它们是向后兼容的。如果请求成功,就可以使用新版本的schema。每当StorageTapper在MySQL二进制日志中读取ALERT TABLE语句时,它都会检测到这些schema的变更。这会触发StorageTapper开始用新的schema去处理未来的事件。

有效的资源利用

我们发现在较早的pipeline中有一些低效率的问题:

Hudi仅消费和应用上游表中更新的行和变更日志来提升我们用DBEvents使能的pipeline的效率。Hudi的设计使用增量更新来替代快照,会用更少的计算资源,从而可以改善许多低效率的问题。同时通过读取变更日志,Hudi不再需要加载整张表,因此可以减轻对上游数据源的压力。

图4清晰地描述了这些解决方案是如何在DBEvents的增量架构中协同工作的。在Uber,我们从不同的数据源中拉取数据。每个源都有一个自定义的实现去读取变更日志时间并提供增量变更。举个例子,MySQL的变更日志通过StorageTapper拉取并推送至Apache Kafka,而如前面提到的,Cassandra的变更日志则是通过Cassandra的变更数据捕获(CDC)功能结合Uber特有的集成能力来实现的。

image

图4:在DBEvents中,每种源类型都以统一的消息格式向Kafka中发送变更日志事件。

Marmaray 是Uber开源的、通用的数据获取和分发库。在较高的层面上,Marmaray为我们的DBEvents pipeline提供了下列功能,以提高整个架构的效率:

无论数据源是什么,单个摄取pipeline都会执行相同的有向无环图作业(DAG)。这个过程会依据特定的源来确定运行时的摄取行为,类似于策略设计模式

标准化变更日志事件

我们的目标之一是以一种能被其他内部数据消费者使用的方式(如流式作业和自定义的pipeline)来标准化变更日志事件。

在标准化DBEvents中的变更日志之前,我们需要先解决一些问题:

为了回答DBEvents用例的这些问题,我们定义了一组Apache Hadoop的元数据标题并添加到每个Kafka的消息中。通过这种设计,元数据和数据都能使用heatpipe(使用Apache Avro)进行编码并通过Kafka进行传输。这使我们能标准化一个能被这类事件所有消费者使用的全局的元数据集。这个元数据单独描述每个更新,以及这些更新与之前的更新有怎样的联系。元数据也会遵循schema规则被写入Apache Hive中一个名为MetadataStruct的特殊列。之后用户就可以轻松地查询MetadataStruct来获取有关行状态的更详细的信息。

下面,我们会重点介绍在事件中标准化的一些关键元数据字段:

Hadoop元数据字段
元数据字段 描述
Row Key Row Key字段是每个源表的唯一键,用于标识行,并根据结果合并部分变更日志。
Reference Key Reference Key是收到的变更日志的版本,这个版本必须是单调递增的。该键可以用来判断此数据是否是某个特定行最近的更新。
Changelog Columns Changelog Columns字段是一个数组<record{“name”:string, “ref_key”:long, “Hadoop_Changelog_Fields”:array}> ,它包含了column names、ref_key和all_changed_fieldnames,会在当前的消息事件中被更新。
Source Source字段反映了生成变更日志的源表的类型。比如Apache Kafka、Schemaless、Apache Cassandra和MySQL。
Timestamp Timestamp字段以毫秒为单位标记事件的创建时间。Timestamp有多种用途,最主要的是用于监控时延和完整性。(我们所说的事件的创建是指,当StorageTapper这类数据模式化服务在把事件推向Kafka前,实际模式化该事件的时间点。)
isDeleted [True/False]。这是一个布尔值,用于支持Hive表中row_key的删除。
Error Exception Error Exception是一个字符串,用于捕获在发送当前的变更日志时遇到的异常或问题(无错时是null)。如果源数据有任何schema的问题,Error Exception会反映出收到的异常,之后就能用来追踪源的问题或修复/重发消息。
Error Source Data Error Source Data是一个包含了实际数据源错误的字符串(无错时是null)。如果出现任何有问题的消息,我们就不能将这个字段获取到主表中,应将它移至相应的错误表。可以使用这个数据来和生产者一起进行修复。
ForceUpdate [True/False]。ForceUpdate是一个布尔值,它可以确保变更日志作用在已有数据之上。在许多情况下,比最新看到的ref_key更早的ref_key会被视为重复的并跳过。设置这个标志后,会忽略hadoop_ref_key字段直接应用变更日志。
Data Center Data Center字段指的是产生事件的原始数据中心。这个字段非常有利于追踪消息以及调试任何潜在的问题,尤其是在active-active或all-active架构下。Heatpipe会根据发布消息的数据中心自动填充这个值。

如上表所述,标准化的元数据使我们的架构具备鲁棒性和普适性。元数据提供了充足的信息使我们能完整地了解每个事件的状态。举个例子,如果事件的模式化或编码有任何问题,定义错误的字段就会被填充,如图5所示,我们就可以决定下一步采取什么操作。在Uber,我们会将错误以及造成问题的实际负载都一起写入错误表中。

image

图5:所有不符合schema标准的数据都会被写入DBEvents的错误表

错误表有很多用途:

下一步工作

有了DBEvents提供的增量变更流,我们就能为数据湖提供更快、更新、更高质量的数据。使用这些数据,才能确保Uber的服务更有效的运作,如车辆共享和Uber Eats。

在未来,我们还打算通过以下功能来强化这个项目:

致谢

特别感谢Reza Shiftehfar,Evan Richards,Yevgeniy Firsov,Shriniket Kale,Basanth Roy,Jintao Guan以及其他团队成员的贡献!

更多内容,请关注AI前线

image