时长09:17大小12.76M
你好,我是蔡元楠。
今天我要与你分享的主题是“Facebook 游戏实时流处理 Beam Pipeline 实战”。
Facebook 这个社交平台我相信你一定早有耳闻。它除了能够让用户发送消息给好友,分享自己的动态图片和视频之外,还通过自身的 App Center 管理着各式各样的小游戏。许多游戏开发商借助 Facebook 的好友邀请机制让自己的 App 火了一把。
曾经有一段时间,在 Facebook 上有一款名为糖果传奇(Candy Crush Saga)的游戏风靡了整个北美。各个年龄层的玩家都会在空闲的时间拿出手机,过五关斩六将,希望尽快突破更多的关卡,并且获得高分。
当然了,除了消除游戏本身带来的乐趣以外,可以在 Facebook 里和自己的好友进行积分排名比拼也是另外一个能吸引用户的地方。
想要一个类似 Facebook 这样的好友间积分排行榜,你可以有很多种实现方式以及各种优化方法。那么,如果我们要利用 Apache Beam 的话,该怎样实现一个类似的游戏积分排行榜呢?
今天我就来和你一起研究,要如何利用 Apache Beam 的数据流水线来实现一个我们自定义的简单游戏积分排行榜。
为了简化整个游戏积分排行榜案例的说明,我们先来做几个方面的假设:
有了这些假设,我们就一起来由浅入深地看看有哪些执行方案。
正如上一讲中所说,如果可以用简单的方法解决战斗,我们当然要避免将问题复杂化了。一种比较直观的做法就是使用 crontab 定时执行一个 Beam 数据流水线,将每周需要进行计算排名的开始时间点和结束时间点传入数据流水线中,过滤掉所有事件时间不在这个时间范围内的数据。
那么,具体要怎么做呢?
首先,我们先要定义一个类,来保存我们之前假设好用户上传的信息。
Java
class UserScoreInfo { String userId; Double score; Long eventTimestamp; public UserScoreInfo(String userId, Double score, Long eventTimestamp) { this.userId = userId; this.score = score; this.eventTimestamp = eventTimestamp; } public String getUserId() { return this.userId; } public Double getScore() { return this.score; } public Long getEventTimestamp() { return this.eventTimestamp; } }复制代码
这个类十分简单,构造函数需要传入的是用户 ID、游戏通关时的积分还有通关时间。
有了这个类之后,整个数据流水线的逻辑就可以围绕着这个类来处理,步骤大致如下:
在上面所描述的步骤中,第 5 步出现了一个叫 Composite Transform 的概念。
那么,什么是 Composite Transform 呢?其实 Composite Transform 并不是指一个具体的 Transform,而是指我们可以将多个不同的 Transforms 嵌套进一个类中,使得数据流水线更加模块化。具体做法是继承 PTransform 这个类,并且实现 expand 抽象方法来实现的。
用我们实现过的 WordsCount 来举例,我们可以将整个 WordsCount 数据流水线模块化成一个 Composite Transform,示例如下:
Java
public static class WordsCount extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); PCollection<KV<String, Long>> wordsCount = words.apply(Count.<String>perElement()); return wordsCount; } }复制代码
在上面这个例子中,输入的参数是每一行字符串 PCollection
所以在第 5 步中,我们也可以自己定义一个 ExtractUserAndScore 的 Composite Transform 来实现上面所描述的多个不同的 Transforms。
好了,为了事先知道游戏积分排行榜中开始的边界时间和结束的边界时间,我们还需要自己实现一个 Options 接口。方法是继承 PipelineOptions 这个接口,具体如下所示:
Java
public interface Options extends PipelineOptions { @Default.String("1970-01-01-00-00") String getStartBoundary(); void setStartBoundary(String value); @Default.String("2100-01-01-00-00") String getEndBoundary(); void setEndBoundary(String value); }复制代码
这样开始的边界时间和结束的边界时间就都可以通过 Pipeline option 的参数传入。
例如,我们想要得到 2019 年 7 月 15 日至 2019 年 7 月 21 日这周的排行榜,那在运行数据流水线的时候,参数就可以按照“–startBoundary=2019-07-15-00-00 --etartBoundary=2019-07-21-00-00”传入了。
整个数据流水线的大致逻辑如下:
Java
final class LeaderBoard { static class UserScoreInfo { String userId; Double score; Long eventTimestamp; public UserScoreInfo(String userId, Double score, Long eventTimestamp) { this.userId = userId; this.score = score; this.eventTimestamp = eventTimestamp; } public String getUserId() { return this.userId; } public Double getScore() { return this.score; } public Long getEventTimestamp() { return this.eventTimestamp; } } private static DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm") .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))); public static void main(String[] args) throws Exception { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline pipeline = Pipeline.create(options); final Instant startBoundary = new Instant(formatter.parseMillis(options.getStartBoundary())); final Instant endBoundary = new Instant(formatter.parseMillis(options.getEndBoundary())); pipeline .apply( BigtableIO.read() .withProjectId(projectId) .withInstanceId(instanceId) .withTableId("ScoreTable")) .apply("ConvertUserScoreInfo", ParDo.of(new ConvertUserScoreInfoFn())) .apply( "FilterStartTime", Filter.by((UserScoreInfo info) -> info.getTimestamp() > startBoundary.getMillis())) .apply( "FilterEndTime", Filter.by((UserScoreInfo info) -> info.getTimestamp() < endBoundary.getMillis())) .apply("RetrieveTop100Players", new ExtractUserAndScore()) .apply( FileIO.<List<String>>write() .via( new CSVSink(Arrays.asList("userId", "score")) .to("filepath") .withPrefix("scoreboard") .withSuffix(".csv"))); pipeline.run().waitUntilFinish(); } }复制代码
其中,ConvertUserScoreInfoFn 这个 Transform 代表着第 2 步转换操作,数据流水线中两个 Filter Transform 分别代表着第 3 和第 4 步。第 5 步“获得最高分的前 100 位用户”是由 ExtractUserAndScore 这个 Composite Transform 来完成的。
你可以看到,不算上各种具体 Transform 的实现,整个数据流水线的逻辑框架大概用 60 行代码就可以表示出来。
虽然这个批处理的方法可以用简单的逻辑得到最后我们想要的结果,不过其实它还存在着不少的不足之处。
因为我们的批处理数据流水线使用 crontab 来定时运行,所以“运行数据流水线的时间间隔”以及“完成数据流水线”这之间的时间之和会给最终结果带来延迟。
比如,我们定义 crontab 每隔 30 分钟来运行一次数据流水线,这个数据流水线大概需要 5 分钟完成,那在这 35 分钟期间用户上传到服务器的分数是无法反应到积分排行榜中的。
那么,有没有能够缩小延时的办法呢?
当然有,答案就是将输入数据作为无边界数据集读取进来,进行实时的数据处理。在这里面我们会运用的到第 23 讲所讲述到的窗口(Window)、触发器(Trigger)和累加模式(Accumulation)的概念。
我将在下一讲中,与你具体分析怎样运用 Beam 的数据流水线实现一个实时输出的游戏积分排行榜。
今天我们一起展开讨论了自己实现一个简易游戏积分排行榜的过程。可以知道的是,我们可以使用 Beam 的数据流水线来完成这一任务。而在 Beam 数据流水线的实现方式中,我们又可以分成批处理的实现方式和即将在下一讲中展开讨论的实时流处理的方式。批处理虽然简单,但是存在着延时性高、无法快速更新积分排行榜的缺点。
在今天这一讲的最后,我提示了你在实时流处理中需要用到窗口、触发器和累加模式。那我们就先来做个预热,思考一下,在流处理中你会对这三种概念赋予什么值呢?
欢迎你把答案写在留言区,与我和其他同学一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。