![不仅仅是流计算:Apache Flink实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/768/24045768/b_24045768.jpg)
Apache Flink类型和序列化机制简介
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0144_0001.jpg?sign=1738982043-sdrVZ7jeHVFuF0DmFprjfzExBgsdfTCz-0-d8adcc2f48a9f97c79f293a872ebcde0)
使用Apache Flink(以下简称Flink)编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么Flink有那么多的类型声明方式?
BasicTypeInfo.STRING TYPE INFO、Types.STRING 、Types.STRING()有何区别?
TypeInfoFactory又是什么?
TypeInformation.of和TypeHint是如何使用的呢?
接下来本文将逐步解密Flink的类型和序列化机制。
Flink的类型分类
Flink的类型系统源码位于org.apache.flink.api.common.typeinfo包,让我们对图1深入追踪,看一下类的继承关系图:
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0001.jpg?sign=1738982043-pRNa0sJ0xyJEj2nskYO9LLouLlvPqeO1-0-68842e1c45a9c8679de3a6132522cad4)
图1:Flink类型分类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0002.jpg?sign=1738982043-tk83MQol99UH4xkHJy8kRUgWzuBQzVlI-0-e2887f7a05c926d0c6a4f39696424b6b)
图2:TypeInformation类继承关系图
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0146_0001.jpg?sign=1738982043-NJKjEIRJKFewKMwRkmthxOOPt8w5vXDt-0-24e631e3d9d0d12f0cb80f99bafe9840)
图3:使用.returns方法声明返回类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0001.jpg?sign=1738982043-pzlBJR0YCcW1ZIWsVp1cuX08ptnqTWyb-0-9c0b42afcfc269b482042a47bd97aaee)
图4:Flink-ML注册子类类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0002.jpg?sign=1738982043-gZPNhmcXMrOswjFzko2FSuzyO4rGOifF-0-5aa26c9b9c101dd4fd2a9b18aecd1d49)
图5:Flink允许注册自定义类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0001.jpg?sign=1738982043-i37aKotjmbx88B8ghjKYzudMu9Mdyzy8-0-9a3f26bab838710bcd9e528934f46c49)
图6:class对象作为参数
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0002.jpg?sign=1738982043-kqVLqE1UBb2PDCOIhQQhnsWKPyjZyltc-0-6756485586823c872c09d5fbd74536cc)
图7:TypeHint作为参数,保存泛型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0003.jpg?sign=1738982043-Hu1vn5FoGlW2TsDl8vrGOUkGFzYqGc8A-0-03cbc53623c2acb0895ee16bc258b521)
图8:BasicTypeInfo快捷方式
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0001.jpg?sign=1738982043-oVUDth4XR9BvUFTEjSHbcfRmGvYha8eT-0-4f00633154c0cd14a9ddd9c5ff10856d)
图9:使用BasicTypeInfo快捷方式来声明一行(Row)每个字段的类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0002.jpg?sign=1738982043-kQI9qrqHJEBb8c4JkaMunbu1ZHuxxPfd-0-925c86e621fcfbcdb7e55c01414c6921)
图10:Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0003.jpg?sign=1738982043-PspwDBjAHGyNGqMcPd15P14xL2JNa3X2-0-c4989234c036d67bbfd61dd1a367960c)
图11:flink-table模块的Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0150_0001.jpg?sign=1738982043-9oO3AK7DWjaaECMRNi0wXdJU5O5J7jIU-0-2506b06e2b7ad2acdd984f9687f5b96d)
图12:为自定义类提供类型支持(图片未展示全部字段)
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0151_0001.jpg?sign=1738982043-OfCkHcUk8LmARhwhzjhTnEbikuwmPiyj-0-7b6401b781d289c9b0b20ef63f68a1ca)
图13:Flink自带的TypeSerializer子类概览
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0001.jpg?sign=1738982043-iRiTXlStldTcUGsZrkqMcuDfiimoP0hT-0-782533c1df80c8e0c359abbe561f496d)
图14:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0002.jpg?sign=1738982043-Kabcr0XtJWkQFLDNDbzaX1MAHgE0fruq-0-2df1a4cfa57452910b6df0b7dca1c72c)
图15:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0001.jpg?sign=1738982043-JIMMVgdt3yOjr8qtD0mFN6StOGa9kIIR-0-3a1643d786784b8cf3faee6ce4a130d7)
图16:类型信息到内存块
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0002.jpg?sign=1738982043-82MdEenFoZLL86dro2XRjPJCnKC9i26Z-0-ca5701c15b3117c5c2ddc05ff6ecc8cf)
图17:StringSerializer类的serialize()方法
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0154_0001.jpg?sign=1738982043-skDtA9ySWgVMhoB9zccpztTbZd3IvS43-0-30281aa0f9f039af0392f7e3f4c7eb34)
图18:String对象的序列化过程
可以看到,图1和图2是一一对应的,TypeInformation类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随Flink的作业提交,被传递给每个执行节点。
由于Flink自己管理内存,采用了一种非常紧凑的存储格式(见官方博文),因而类型信息在整个数据处理流程中属于至关重要的元数据。
TypeExtractror类型提取
Flink内部实现了名为TypeExtractror的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于Java的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过URLClassLoader动态加载的类),仍需手动处理;例如下图中对DataSet变换时,使用.returns()方法声明返回类型。
这里需要说明一下,returns()接受三种类型的参数:字符串描述的类名(例如"String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java原生Class(例如String.class)等;不过字符串形式的用法即将废弃,如果确实有必要,请使用Class.forName()等方法来解决。
下面是ExecutionEnvironment类的registerType方法,它可以向Flink注册子类信息(Flink认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是Flink-ML机器学习库代码的例子:
从下图可以看到,如果通过TypeExtractor.createTypeInfo(type)方法获取到的类型信息属于PojoTypeInfo及其子类,那么将其注册到一起;否则统一交给Kryo去处理,Flink并不过问(这种情况下性能会变差)。
声明类型信息的常见手段
通过TypeInformation.of()方法,可以简单地创建类型信息对象。
1.对于非泛型的类,直接传入Class对象即可
2.对于泛型类,需要借助TypeHint来保存泛型类型信息
TypeHint的原理是创建匿名子类,运行时TypeExtractor可以通过getGenericSuperclass(). getActualTypeArguments()方法获取保存的实际类型。
3.预定义的快捷方式
例如BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明,可以直接使用。
例如下面是对Row类型各字段的类型声明,使用方法非常简明,不再需要new XxxTypeInfo<>(很多很多参数)
当然,如果觉得BasicTypeInfo还是太长,Flink还提供了完全等价的Types类(org.apache.flink.api.common.typeinfo.Types):
特别需要注意的是,flink-table模块也有一个Types类(org.apache.flink.table.api.Types),用于table模块内部的类型定义信息,用法稍有不同。使用IDE的自动import时一定要小心:
4.自定义TypeInfo和TypeInfoFactory
通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo方法。
注意需要继承TypeInformation类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是Tuple(isTupleType)、元数(对于一维的Row类型,等于字段的个数)等等,从而为TypeExtractor提供决策依据。
更多示例,请参考Flink源码的org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
TypeSerializer
Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承TypeSerializer及其子类以实现自己的序列化器。
Kryo序列化
对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfo和TypeInfoFactory),默认会交给Kryo处理。
如果Kryo仍然无法处理(例如Guava、Thrift、Protobuf等第三方库的一些类),有以下两种解决方案:
1.可以强制使用Avro来替代Kryo:
env.getConfig().enableForceAvro(); // env代表ExecutionEnvironment对象,下同
2.为Kryo增加自定义的Serializer以增强Kryo的功能:
env.getConfig().addDefaultKryoSerializer(Class<? > type, Class<? extends Serializer<? >> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<? > type, T serializer)
如果希望完全禁用Kryo(100% 使用Flink的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
类型机制的陷阱与缺陷
金无足赤,人无完人。Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
1.Lambda函数的类型提取
由于Flink类型提取依赖于继承等机制,而lambda函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse的JDT编译器会把lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
2.Kryo的JavaSerializer在Flink下存在Bug
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非
com.esotericsoftware.kryo.serializers.JavaSerializer以防止与Flink不兼容。
类型机制与内存管理
下面以StringSerializer为例,来看下Flink是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink对于内存管理是非常细致的,层次分明,代码也容易理解。