软件开发架构师

如何利用Pravega的状态同步器解决分布式一致性问题

架构 118 2019-04-22 23:05

Pravega 是一个开源的分布式流存储平台。其中,StateSynchronizer 组件以 stream 为基础,对外提供一致性状态共享服务。StateSynchronizer 允许一组进程同时读写同一共享状态而不必担心一致性问题。本文以实现一个简单的共享字典应用为示例,演示 StateSynchronizer 相关 API 的使用。

API 示例

示例实现 1:SharedConfig(共享配置)

在深入 StateSynchronizer 的使用细节之前,先让我们看一个使用 StateSynchronizer 的示例。本章节示例全部来自 Pravega 官方文档 [1]。该示例的所有源代码都可以从 Pravega-Samples 的 GitHub 仓库 [2] 下载获取。这个示例程序使用 StateSynchronizer 实现了 Java 的Map(字典)数据结构,我们不妨将其称作“SharedMap(共享字典)”。我们以这个 SharedMap 为基础,实现了 SharedConfig。SharedConfig 允许一组进程一致性地读写一个共享的,由一组键 / 值对属性所构成的配置对象。除了示例代码之外,我们还提供了一个命令行小程序 SharedConfigCLI,让读者可以方便地体验一下这个 SharedConfig 应用。该示例的整体架构如图 1 所示。

如何利用Pravega的状态同步器解决分布式一致性问题-1

命令行小程序 SharedConfigCLI 所支持的所有命令如下:

  • GET_ALL: 打印 SharedConfig 中的所有属性。

  • GET {key}: 打印给定键的属性值。

  • PUT {key}, {value}: 用给定的键 / 值对更新 SharedConfig。如果存在旧值,则同时将其打印。

  • PUT_IF_ABCENT {key}, {value}: 用给定的键 / 值对更新 SharedConfig,当且仅当该属性不存在。

  • REMOVE {key} [, {currentValue}]: 将给定属性从 SharedConfig 中移除。如果给出了{currentValue}值,则仅当该属性当前值匹配时才进行移除操作。

  • REPLACE {key}, {newValue}, [, {currentValue}]: 更新属性值。如果给出了{ currentValue }值,则仅当该属性当前值匹配时才进行更新操作。

  • CLEAR: 从 SharedConfig 中移除所有的键。

  • REFRESH: 强制从共享状态进行同步。

  • HELP: 打印可用命令列表。

  • QUIT: 退出程序。

在安装好上述名为“Pravega-Samples”的项目之后,请使用相同的 scope 名字和 stream 名字启动两个 SharedConfigCLI 的实例,这让我们可以模拟两个不同的进程通过协调它们本地的 SharedConfig 副本来操作同一共享状态。读者可以按如下步骤进行操作,以便初步体会一下这个 SharedConfig 是如何被不同进程协调的。

步骤 进程 1 进程 2 讨论
1 GET_ALL GET_ALL 两个进程都看见一个空的 SharedConfig。
2 PUT p1, v1 进程 1 添加名为 p1 的属性
3 GET p1 GET p1 进程 1 看见属性 p1 的值为 v1
进程 2 此时并不能看见属性 p1。为什么?因为进程 2 还没有从共享状态刷新它的本地副本。
4 REFRESH 将进程 2 的本地副本与共享状态同步。
5 GET p1 进程 2 现在可以看见进程 1 在步骤 2 中所做的改动了。
6 REPLACE p1, newVal, v1 进程 2 尝试更改 p1 的值,但使用的是一个条件更改,这意味着只有当 p1 的旧值为 v1 时该更改才能生效(此时确实如此)。
7 GET p1 p1 的值现在变成 newVal 了。
8 REPLACE p1, anotherVal, v1 进程 1 尝试使用和进程 2 在步骤 6 中所用相同的条件更改。这次的更改会失败,因为 p1 此时的值已不再是 v1 了。
9 GET p1 步骤 8 中的失败更改会导致进程 1 的共享状态本地副本被更新。p1 此时的值已是 newVal 了。

表 1 两个进程同时操作同一个 SharedConfig [1]

读者可以重复类似的操作序列来探索 PUT_IF_ABCENT 和其它修改共享状态操作的语义。该示例背后蕴含的基本思想是:所有对 SharedConfig 的更改操作只有当它们作用于最新的状态值时才能成功。此处,我们使用乐观的并发控制在 SharedConfig 对象的不同消费者之间达到了高效的一致性。读者还可以创建多个 SharedConfig 状态对象同时运行,每个 SharedConfig 对象个体都使用基于不同 stream 的独立的 StateSynchronizer。

示例实现 2:SharedMap(共享字典)的基本结构

在上一个示例中,我们其实已经使用到了 SharedMap。StateSynchronizer 可以用于实现几乎所有数据结构的共享版本。或许你的应用只需要共享一个简单的整数计数器,那么我们可以用 StateSynchronizer 实现一个共享计数器。又或许你需要共享的数据是一个包含当前集群里所有服务器的一个集合,那么我们可以用 StateSynchronizer 实现一个共享集合。还有许许多多类似这样的可能性。现在,让我们详细讨论一下如何基于 StateSynchronizer 实现 SharedMap 以便进行对象共享。

创建 StateSynchronizer

StateSynchronizer也是 Pravega 客户端的类型之一,类似EventStreamReader或者EventStreamWriterStateSynchronizer通过ClientFactory接口创建。每个StateSynchronizer在其 scope 内必须具有唯一的名字。SynchronizerConfig可用于定制StateSynchronizer的行为(尽管在当前版本中,StateSynchronizer 尚不支持自定义行为)。StateSynchronizer利用 Java 的泛型机制,允许开发者指定基于特定类型的StateSynchronizer

共享 / 同步状态StateT

在设计一个使用 StateSynchronizer 的应用时,开发者必须确定需要被同步(共享)的状态类型。我们需要共享一个Map?一个Set?还是一个普通的 POJO 类?也就是说,我们需要共享什么样的数据结构。这将定义出 StateSynchronizer 的核心类型,即StateSynchronizer接口上的泛型类型StateTStateT对象可以是任何 Java 对象,只要它实现了 Pravega 的Revisioned接口。Revisioned是很简单的一个接口,它允许 Pravega 能够比较任意两个不同的StateT对象。

在我们的例子中,SharedMap 才是 StateSynchronizer 的真正使用者。它定义了一个简单的Map接口,提供get(key)set(key, value)等操作,就像一个常见的Map对象那样。它同时也按 StateSynchronizer 的要求实现了Revisioned接口,并且使用一个简单的ConcurrentHashMap作为内部Map的实现。所以,在我们的这个例子中,StateT就是SharedStateMap<K, V>

状态变更操作 Update 与 InitialUpdate

在一个 StateSynchronizer 应用中,除了类型StateT之外,还需要定义另外两个非常重要的类型:Update类型和InitialUpdate类型。Update代表了持久化在 stream 中的“增量”或者说是更新对象。InitialUpdate则是一个特殊的起始更新对象,用于启动 StateSynchronizer。UpdateInitialUpdate都是基于泛型类型StateT定义的。

StateSynchronizer 使用单 segment 的 stream 来存储对共享对象的更新操作。以UpdateInitialUpdate形式存在的更新对象,根据当前本地的共享状态副本是否相对于 stream 处于最新状态,被写入 stream 之中。如果检测到该更新操作将会作用于在共享状态的一个早期版本,则更新操作不会被应用。StateSynchronizer 自身在本地内存中维护了一份共享状态的副本,并且保存了该副本相关的版本元信息。通过getState()方法可以获取该本地状态副本。本地的状态副本有可能已经过期,应用程序可以通过调用fetchUpdates()方法来刷新状态副本,该操作会从 stream 获取所有在该版本之后产生的更新操作。绝大多数来自应用程序的更新操作都通过updateState()方法进行。updateState()方法接受一个函数对象作为参数。该函数对象会被调用并传入最新的共享状态,而函数本身则负责确定有哪些更新将作用于该状态。

在我们的例子中,InitialUpdate实现如下:

复制代码
/**
* Create a Map. This is used by StateSynchronizer to initialize shared state.
*/
private static class CreateState<K, V> implements InitialUpdate<SharedStateMap<K,V>>, Serializable {
private static final long serialVersionUID = 1L;
private final ConcurrentHashMap<K, V> impl;
public CreateState(ConcurrentHashMap<K, V> impl) {
this.impl = impl;
}
@Override
public SharedStateMap<K, V> create(String scopedStreamName, Revision revision) {
return new SharedStateMap<K, V>(scopedStreamName, impl, revision);
}
}

源代码 1 用于产生起始状态的特殊更新操作 InitialUpdate [1]

如源码所示,CreateState对象用于初始化 stream 中的共享对象,它将会创建一个新的,空SharedStateMap对象。读者可以把其他例子中的InitialUpdate想象成把计数器置 1,或者把一个集合初始化成仅包含数个固定元素。将诸如“initialize”和“update”的函数方法用类的形式来实现或许看起来有一些奇怪,但当你仔细思考其中的缘由时,你就会发现这其实非常合理。所有这些更新操作,例如 initialize 和 update,都需要被保存到 stream 中,因此它们必须被实现为可序列化的对象。我们必须允许一个客户端在任何时间点启动,计算出当前共享状态,并且随着更新操作(可能来自其它客户端)不断被写入到 stream 中,还能够维持当前最新状态。如果我们仅仅在 stream 中保存“最新的状态值”,那么就无法使用并发控制来提供并发读写的一致性了。

StateUpdate 抽象

Update类型看起来就更加怪异了。对于Map数据结构来说,并非只有一种更新操作,而是有各种各样的更新操作:更新一个键 / 值对,更新一组键 / 值对,删除一个键 / 值对,删除所有键 / 值对等等。每一类这样的更新操作都必须由一个对应的类实现。为此,我们定义了一个名为 StateUpdate 的抽象类,而所有其它更新操作的实现类都将继承自这个抽象类:

复制代码
/**
* A base class for all updates to the shared state. This allows for several different types of updates.
*/
private static abstract class StateUpdate<K,V> implements Update<SharedStateMap<K,V>>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public SharedStateMap<K,V> applyTo(SharedStateMap<K,V> oldState, Revision newRevision) {
ConcurrentHashMap<K, V> newState = new ConcurrentHashMap<K, V>(oldState.impl);
process(newState);
return new SharedStateMap<K,V>(oldState.getScopedStreamName(), newState, newRevision);
}
public abstract void process(ConcurrentHashMap<K, V> updatableList);
}

源代码 2 所有更新操作的抽象基类 [1]

通过定义一个抽象类StateUpdate,我们可以基于该抽象类来定义其它的Update实现。抽象类实现了“applyTo”方法。StateSynchronizer 调用该方法将更新应用于当前的状态对象,并返回对应的更新后的状态对象。实际的更新工作是在旧状态的底层Map实现对象的一个拷贝上完成的:process()方法作用于实现对象上,并返回SharedStateMap的一个新版本,而这个新版本则使用process()方法处理后的实现对象作为内部状态。抽象类所定义的process()方法正是更新操作真正进行的位置。该方法由具体的更新操作实现类负责实现,例如 SharedMap 上的Put更新操作和PutAll更新操作。

基于 StateUpdate 抽象的 Put 操作

以下是 SharedMap 上的 Put(key, value) 操作的实现:

复制代码
/**
* Add a key/value pair to the State.
*/
private static class Put<K,V> extends StateUpdate<K,V> {
private static final long serialVersionUID = 1L;
private final K key;
private final V value;
public Put(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public void process(ConcurrentHashMap<K, V> impl) {
impl.put(key, value);
}
}

源代码 3 Put 更新操作的实现 [1]

此处,process()方法用于将一个键 / 值对加入字典数据结构,如果对应的键已经存在,则更改对应的值。SharedMap 上的所有操作都是通过创建StateUpdate子类实例的方式进行的。

示例实现 3:在 SharedMap 上执行操作

SharedMap 的例子展示了StateSynchronizer上的典型操作。SharedMap 对外提供的 API 非常类似 Java 的Map<K, V>接口。通过使用StateUpdate的各个子类进行状态变更操作,SharedMap 以操作StateSynchronizer的形式实现了 Map 上的各种操作。

创建与初始化

以下源码将展示如何创建 SharedMap:

复制代码
/**
* Creates the shared state using a synchronizer based on the given stream name.
*
* @param clientFactory - the Pravega ClientFactory to use to create the StateSynchronizer.
* @param streamManager - the Pravega StreamManager to use to create the Scope and the Stream used by the StateSynchronizer
* @param scope - the Scope to use to create the Stream used by the StateSynchronizer.
* @param name - the name of the Stream to be used by the StateSynchronizer.
*/
public SharedMap(ClientFactory clientFactory, StreamManager streamManager, String scope, String name){
streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder().scope(scope).streamName(name)
.scalingPolicy(ScalingPolicy.fixed( 1))
.build();
streamManager.createStream(scope, name, streamConfig);
this.stateSynchronizer = clientFactory.createStateSynchronizer(name,
new JavaSerializer<StateUpdate<K,V>>(),
new JavaSerializer<CreateState<K,V>>(),
SynchronizerConfig.builder().build());
stateSynchronizer.initialize( new CreateState<K,V>( new ConcurrentHashMap<K,V>()));
}

源代码 4 SharedMap 的创建 [1]

一个SharedMap实例是以定义 scope 和 stream 的形式创建出来的(在大多数情况下,对应的 scope 和 stream 可能已经存在,那么第 10-16 行通常为一个空操作)。StateSynchronizer对象本身是在第 18-21 行中由ClientFactory创建的,非常类似 reader 和 writer 的创建。注意,可以为Update对象和InitialUpdate对象分别制定各自的序列化器。目前,SynchronizerConfig还仅仅只是一个空实现,因为 StateSynchronizer 在当前版本中尚无可用的配置项。

StateSynchronizer提供一个以InitialUpdate对象作为参数的initialize()方法。该方法在SharedMap的构造函数中被调用,以保证共享状态被正确地初始化。注意,在大多数情况下,SharedMap很可能是基于一个已经包含SharedMap共享状态的 stream 被创建。在这种情况下,调用initialize()方法也没有任何问题,因为initialize()方法不会修改 stream 中的共享状态。

读操作

所有的读操作,即不会改变共享状态的操作,例如:get(key)containsValue(value)等,都是基于StateSynchronizer的本地状态副本的。所有的这些操作都使用getState()方法获取本地共享状态副本,然后在该副本上的进行读取操作。StateSynchronizer的本地状态副本可能过期。在这种情况下,SharedMap 的客户端可能需要使用refresh()方法来强制StateSynchronizer从 stream 上的最新共享状态刷新本地状态,而该刷新操作是通过StateSynchronizer对象的fetchUpdates()方法完成的。

注意,用存在过期状态的可能性来换取更快的响应,这是设计决策层面的一种权衡。当然,我们也可以很容易地将读操作实现为在读取本地共享状态副本之前总是先做一次刷新操作。如果开发者可以预见到共享状态上会存在频繁更新,那么这也不失为一种有效的策略。在我们的这个示例中,我们假设 SharedMap 对象会被频繁读取,但并不会被频繁更新,因此选择了直接读取本地状态副本。

写(更新)操作

正如我们先前讨论的那样,所有的写操作都是由StateUpdate类的各个具体子类实现的:clear()操作使用StateUpdate的子类Clear来进行键 / 值对的删除,而put()操作则使用Put子类,如此种种。现在,让我们深入put()操作的实现来更加详细地讨论 StateSynchronizer 的编程。以下是put()操作的源码:

复制代码
/**
* Associates the specified value with the specified key in this map.
*
* @param key - the key at which the value should be found.
* @param value - the value to be entered into the map.
* @return - the previous value (if it existed) for the given key or null if the key did not exist before this operation.
*/
public V put(K key, V value){
final AtomicReference<V> oldValue = new AtomicReference<V>( null);
stateSynchronizer.updateState((state, updates) -> {
oldValue.set(state.get(key));
updates.add( new Put<K,V>(key,value));
});
return oldValue.get();
}

源代码 5 SharedMap 上 put 接口方法的实现 [1]

注意,传入给StateSynchronizerupdateState()方法的函数对象很可能被调用多次。将该函数作用于旧状态所得的结果只有当该状态是 stream 上的最新状态时才会被写出。如果由于竞争而导致乐观并发控制检查失败,那么操作会反复重试(从而导致传入的函数对象被调用多次)。在大多数情况下,只需要很少的几次重试就可以完成对应操作了。在某些情况下,开发者也可以选择在调用updateState()方法之前先手动调用一次fetchUpdates(),从 stream 上同步最新的共享状态到StateSynchronizer。这其实是一种优化手段,在更新操作的预期频繁程度和更新操作的高效性之间做出权衡。如果你能预见到会有大量更新,那么就在调用updateState()方法之前先调用一次fetchUpdates()。在我们的示例中,我们假设不会有太多更新,因此选择让 StateSynchronizer 自己处理状态刷新。

删除操作

我们选择在实现删除操作的同时,利用 StateSynchronizer 的 compact 特性。我们的策略是,每执行 5 次remove()操作后,以及每次clear()操作后,都自动进行一次compact()调用。我们甚至还可以选择在每执行 5 次update()操作后也进行一次compct()调用,但目前我们想仅把compact()操作运用在删除操作上。

读者可以把compact()操作想象成 StateSynchronizer 的某种形式的“垃圾清除”机制。在一定数量的更新操作被应用到共享状态之后,一种更有效的形式是再写入一个新的起始状态。这个新的起始状态可以看作是所有写入 stream 的更新操作的累积表示。通过这种方法,所有早于该compact()操作的数据现在都可以被忽略,并最终从 stream 中清除。

如何利用Pravega的状态同步器解决分布式一致性问题-2

图 2 每个compact()操作都会创建一个新的起始状态 图片来自 [1]

如图 2 所示,compact()操作的结果就是将一个新的起始状态 Initial2 写入 stream。现在,所有比 Change3 更早的数据,包括 Change3 自身,都已经无用了,可以从 stream 中被作为垃圾清除掉。

Pravega 系列文章计划

Pravega 根据 Apache 2.0 许可证开源,0.4 版本已于近日发布。我们欢迎对流式存储感兴趣的大咖们加入 Pravega 社区,与 Pravega 共同成长。本篇文章为 Pravega 系列第六篇,系列文章标题如下(标题根据需求可能会有更新):

  1. 实时流处理 (Streaming) 统一批处理 (Batch) 的最后一块拼图:Pravega

  2. 开源 Pravega 架构解析:如何通过分层解决流存储的三大挑战?

  3. Pravega 应用实战:为什么云原生特性对流存储至关重要

  4. “ToB” 产品必备特性: Pravega 的动态弹性伸缩

  5. 取代 ZooKeeper!高并发下的分布式一致性开源组件 StateSynchronizer

  6. 分布式一致性解决方案 - 状态同步器 (StateSynchronizer) API 示例

  7. Pravega 的仅一次语义及事务支持

  8. 与 Apache Flink 集成使用

作者简介

  • 蔡超前:华东理工大学计算机应用专业博士研究生,现就职于 Dell EMC,6 年搜索和分布式系统开发以及架构设计经验,现从事流存储和流搜索相关的设计与研发工作。

  • 滕昱:现就职于 Dell EMC 非结构化数据存储部门 (Unstructured Data Storage)团队并担任软件开发总监。2007 年加入 Dell EMC 以后一直专注于分布式存储领域。参加并领导了中国研发团队参与两代 Dell EMC 对象存储产品的研发工作并取得商业上成功。从 2017 年开始,兼任 Streaming 存储和实时计算系统的设计开发与领导工作。

参考文献

[1] “Working with Pravega: State Synchronizer,” Dell EMC, [Online]. Available: https://github.com/pravega/pravega/blob/master/documentation/src/docs/state-synchronizer.md.

[2] “StateSynchronizer Related Samples in Pravega-Samples GitHub Repository,” [Online]. Available: https://github.com/pravega/pravega-samples/tree/master/pravega-client-examples/src/main/java/io/pravega/example/statesynchronizer.

更多内容,请关注 AI 前线

如何利用Pravega的状态同步器解决分布式一致性问题-3

文章评论