本文主要介紹了達達集團使用基于開源的Flink Stream SQL開發的Dada Flink SQL進行實時計算任務SQL化過程中的實踐經驗。
時間回到2018年,在數據平臺和數據團隊的共同努力下,我們已經有了完整的離線計算流程,完善的離線數倉模型,也上線了很多的數據產品和大量的數據報表。隨著業務的發展,我們也逐漸面臨著越來越多的實時計算方面的需求。隨著Flink在國內的逐漸流行,實時計算也越來越多地進入我們的視野。當時,Flink的SQL功能還不完善,大量數據開發需要的功能無法使用SQL表達。因此,我們的選擇和很多公司的選擇類似,通過對Flink的框架和API進行封裝,降低我們的數據開發人員進行實時任務開發的難度。針對這些需求我們計劃通過一些封裝,使得數據開發同學無需開發Java或者Scala代碼,專注于業務邏輯的開發。由于開發資源有限,我們傾向于通過引進一些開源的框架并進行定制性的開發來完成這個任務。通過一些調研,我們鎖定了袋鼠云的Flink Stream SQL(以下簡稱FSL)和Uber的AthenaX。對比后,FSL的豐富的插件、開發的活躍度和支持的相對完善對于我們更有吸引力。因此,我們引進了袋鼠云的FSL,并基于FSL開發了達達的SQL計算引擎Dada Flink SQL(以下簡稱DFL),并以此進行實時計算任務的SQL化。
首先介紹一下DFL的架構。DFL中的主要組件為launcher、core、source插件、sink插件、Flink Siddhi插件以及side插件,其中Flink Siddhi為我們根據開源的Flink Siddhi接入的基于Siddhi的規則引擎,后面我們會有專門的文章介紹Flink Siddhi相關的內容和我們做的封裝。launcher負責加載必要的source/side/sink插件,并將Flink program提交到Flink集群,支持session cluster模式和single job模式。core模塊負責解析SQL語句,生成SQLTree,并根據解析的source、sink、Flink Siddhi和side內容加載相應的插件,生成必要的組件并注冊進Flink TableEnvironment。之后,根據SQL是否使用了維表JOIN的功能 ,會選擇直接調用TableEnvironment.sqlUpdate()或者進行維表JOIN的處理。除維表JOIN之外,根據我們數據開發同學的需求,我們還加入了INTERVAL JOIN的支持。使用流程表示,DFL的整體流程如下圖所示。
2.1 Parser
DFL使用Parser來解析SQL語句,解析為相應的數據結構,并放入SqlTree進行管理以便后續使用。Parser定義了良好的接口,易于通過增加新的實現類來增加對新的SQL語法的支持。Parser的接口定義如下:
其中match用于判斷一個具體的Parser的實現能否實現對給定的SQL語句的解析,verifySyntax為我們新增加的接口功能,用于驗證給定SQL的語法是否正確,并將相關的錯誤信息放入errorInfo中供調用方使用,parserSql實現具體的SQL語法的解析工作。我們為IParser增加了很多的實現以實現新的功能,例如增加對Flink Siddhi的支持等。
2.2 維表JOIN
DFL中包含兩種維表JOIN的實現方式:ALL及SIDE方式。ALL方式會將需要JOIN的數據一次性讀取并緩存到Task的內存中,并可以設置定期刷新緩存;SIDE方式則在需要進行JOIN時從相應的數據源中讀取相應的數據,并根據設置決定是否將讀取到的數據緩存在內存中。ALL和SIDE模式相應的抽象類的定義分別為AllReqRow和AsyncReqRow,他們都實現了共同的接口ISideReqRow,ISideReqRow中定義了用于將事實表的數據和維表讀取的數據進行JOIN的方法Row fillData(Row input, Object sideInput)。AllReqRow和AsyncReqRow的定義分別如下:
可以看到其中使用了模板方法的設計模式。
AsyncSideReqRow主要提供了初始化LRU緩存,從LRU緩存中獲取數據以及從數據源或者LRU緩存中無法找到需要JOIN的數據時的默認處理方法。
開發DFL的過程中,根據一些業務相關的需求及簡化數據開發人員使用DFL的需要,我們在原生FSL的基礎上進行了大量的改進和擴展的工作,下面介紹一些我們在DFL上做的工作。
3.1 Flink HA模式下,SESSION模式提交任務超時
為了Flink任務有較好的容錯性,我們為Flink集群配置了基于ZooKeper的HA。出于任務管理和維護的需要,我們的一些Flink任務使用了session模式,在將這些任務遷移到DFL后,發現提交任務時,會報超時的錯誤。查閱Flink的官方文檔也沒有發現線索。后面經過我們的探索,發現了在YARN session模式下,配置了HA時,進行任務提交需要指定high-availability.cluster-id。添加了如下代碼后,SESSION模式下,任務可以正常提交了。
3.2 Kafka支持使用SQL關鍵字作為JSON的字段名
當在Flink中使用了SQL關鍵字作字段名時,即使將字段名用反引號包起來,依然會報如下的錯誤:
這個是Flink的bug,已經在1.10.1中作了修復,詳見這個issue:https://issues.apache.org/jira/browse/FLINK-16526。我們使用的版本為Flink 1.6.2,無法使用這個修復。我們的做法是支持將Kafka中JSON的字段名和引用這個JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用該JSON字段,而用于JSON解析的還是原始的JSON字段名。具體來說,我們在元數據系統中,支持為Kafka類型的表注冊一個可選的sourceName。如果注冊了sourceName,Flink Stream SQL將使用sourceName去JSON中解析對應的字段。
3.3 元數據整合
DFL上線后,通過添加必要的功能,使用純SQL開發已經滿足我們的很多實時任務開發的需求。但是在DFL運行一段時間后,我們注意到了管理各種上下游存儲的信息給我們的數據開發人員帶來的困擾。我們線上使用的存儲系統包括了Kafka、HBase、ElasticSearch、Redis和MySQL(之后又引入了ClickHouse)。這些數據源基本都是異構的,連接及用戶信息各異,而且在不同的任務中使用相同的數據源,每次都需要使用CREATE TABLE
元數據管理系統開發完成后,我們將Flink Stream SQL和元數據管理系統進行了深度集成。通過引入USE TABLE <> AS <> WITH ()的語法,我們的數據開發人員只需要將數據源在元數據管理系統中進行注冊 ,之后在Flink Stream SQL中引用注冊后的表就無需再填寫任何連接信息,而且如果需要引用所有的字段的話,也無需再填寫字段信息。如果不想要引用所有的子段,有兩種辦法可以做到。第一種方法是在USE TABLE的WITH里面使用columns表達需要引用的字段,第二種方法是在元數據系統里注冊一張只包含了要引用的字段的表。
3.4 Redis hash/set數據類型的支持
FSL已經內置了對Redis作為sink table和side table的支持,但是FSL只支持Redis的String類型的數據,而我們的場景會使用到Redis的hash和set類型的數據,因此我們需要添加對Redis這兩種數據類型的支持。首先介紹一下將Redis中的數據映射到Flink中的表的方法,在我們的Redis的key中包含了兩部分的內容(使用":"分隔),兩部分分別為固定的keyPrefix和由一到多個字段的值使用":"拼接的primaryKey,其中keyPrefix模擬表的概念,也方便Redis中存儲的內容的管理。對String類型的數據,Redis的key會在上面介紹的key的基礎上拼接上字段名稱(使用":"作為分隔符),并以字段的值作為該key對應的value寫入Redis中;對Hash類型的數據,Redis的完整的key就為上面介紹的key,hash的key則由用戶指定的字段的值使用":"拼接而成,類似的,hash的value由用戶指定的字段的值拼接而成。除了Redis hash和set數據類型的支持之外,我們還為Redis增加了setnx和hsetnx以及TTL的功能。
3.5 ClickHouse sink的支持
FSL內置了對Kafka、MySQL、Redis、Elasticsearch和HBbase等數據源作為目標表的支持,但是我們在使用的過程中也遇到了一些新的數據源作為目標寫入端的要求,為此我們開發了新的sink插件來支持這種需求。我們開發和維護的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink為例介紹一下我們在這方面所做的一些工作。
對于ClickHouse,我們開發了實現了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通過實現CheckpointedFunction并在snapshotState()方法中將數據刷寫到ClickHouse來確保數據不會丟失。為了處理不同的輸入數據類型,我們提供接口ClickhouseMapper
不同于通常情況下由用戶提供sink表的schema的方式,我們通過執行DESC
的方式從ClickHouse獲取表的schema。為了處理ClickHouse中的特殊數據類型,例如nullable(String),Int32等,我們使用正則表達式提取出實際的類型進行寫入,相關的代碼如下。為了寫入數據的過程不阻塞正常的數據處理流程,我們使用了將數據寫入任務放入線程池的方式。同時為了在Flink任務失敗的情況下不發生數據丟失的情況,在snapshotState()方法中等待線程池中的任務完成。
3.6 BINLOG表達的簡化
為了處理線上數據的更新,我們采用了阿里巴巴開源的Canal采集MySQL binlog并發送到Kafka的方式。由于binlog特殊的數據組織形式,處理binlog的數據需要做很多繁雜的工作,例如從binlog的columnValues或者updatedValues字段中使用udf取出實際增加或者更新的字段。由于我們將Flink Stream SQL和元數據系統進行了對接,因此我們可以拿到MySQL表的schema信息,從而我們可以提供語法封裝來幫助數據開發人員減少這種重復性的SQL表達。為此,我們引入一種新的SQL語法:USE BINLOG TABLE,這種語法的格式如下。
我們會將這種語法展開為如下的內容。
在DFL上線后,由于可以使用純SQL進行開發,符合數據開發同學的開發習慣,而且我們提供了很多的語法封裝,加上元數據管理帶來的便利,數據開發同學逐步將一些實時計算任務遷移到了DFL上,這為部門帶來了極大的效率提升。截止到目前,DFL已經應用到了達達集團的各個數據應用系統中,系統中運行的實時計算任務已經達到70多個,涵蓋達達快送、京東到家的各個業務及流量模塊,而且實時計算任務數量和SQL化占比還在穩步增加中。隨著大數據部門的計算基礎設施開放,現在我們的實時計算能力也在集團其它部門中得到了越來越廣泛的應用。
當前Flink的社區版本已經發展到了1.10,Flink Table/SQL本身已經支持了DFL提供的多數功能,出于降低維護組件復雜度的考慮,我們計劃后續引入Flink 1.10,并逐步推廣Flink 1.10的使用,以期最后將所有的任務都遷移到最新的Flink版本上。
公司內部在逐步推廣私有云的使用,考慮到社區在Flink on K8s上的進展,我們后續在引入新版本的Flink時,將嘗試在公司的私有云上進行部署。
作者簡介:馬陽陽 達達集團數據平臺高級開發工程師,負責達達集團計算引擎相關的維護和開發工作
前海粵十完成新一輪戰略融資
2433 閱讀樂歌股份預計2024年歸母凈利潤下降約50%,大力發展海外倉
2444 閱讀連續5年的“春節主力軍”,德邦為何如此穩?
1800 閱讀AI改變物流業的游戲規則:從炒作到實踐的深度思考
1304 閱讀CES 2025:NVIDIA OMNIVERSE驅動的智能倉儲數字孿生革命
1280 閱讀制造業企業,不要逼物流公司降價了!
1159 閱讀拼多多引領電商西進:帝王蟹進村,非遺剪紙出山
1171 閱讀2024年12月份中國出口集裝箱運輸市場分析報告
1139 閱讀菜鳥拆分為假消息,繼續大力發展全球物流業務
1085 閱讀全球海運市場動態(一月中旬至一月下旬)
1049 閱讀