亞馬遜云科技Amazon MSK是Amazon云平臺(tái)提供的托管Kafka服務(wù)。在系統(tǒng)升級(jí)或遷移時(shí),用戶(hù)常常需要將一個(gè)Amazon MSK集群中的數(shù)據(jù)導(dǎo)出(備份),然后在新集群或另一個(gè)集群中再將數(shù)據(jù)導(dǎo)入(還原)。通常,Kafka集群間的數(shù)據(jù)復(fù)制和同步多采用Kafka MirrorMaker,但是,在某些場(chǎng)景中,受環(huán)境限制,兩個(gè)于Kafka集群之間的網(wǎng)絡(luò)可能無(wú)法連通,或者兩個(gè)亞馬遜云科技賬號(hào)相互隔離,亦或是需要將Kafka的數(shù)據(jù)沉淀為文件存儲(chǔ)以備他用。此時(shí),基于Kafka Connect S3 Source/Sink Connector的方案會(huì)是一種較為合適的選擇,本文就將介紹一下這一方案的具體實(shí)現(xiàn)。
?數(shù)據(jù)的導(dǎo)出、導(dǎo)入、備份、還原通常都是一次性操作,為此搭建完備持久的基礎(chǔ)設(shè)施并無(wú)太大必要,省時(shí)省力,簡(jiǎn)單便捷才是優(yōu)先的考量因素。為此,本文將提供一套開(kāi)箱即用的解決方案,方案使用Docker搭建Kafka Connect,所有操作均配備自動(dòng)化Shell腳本,用戶(hù)只需設(shè)置一些環(huán)境變量并執(zhí)行相應(yīng)腳本即可完成全部工作。這種基于Docker的單體模式可以應(yīng)對(duì)中小型規(guī)模的數(shù)據(jù)同步和遷移,如果要尋求穩(wěn)定、健壯的解決方案,可以考慮將Docker版本的Kafka Connect遷移到Kubernetes或Amazon MSK Connect,實(shí)現(xiàn)集群化部署。
?整體架構(gòu)
?首先介紹一下方案的整體架構(gòu)。導(dǎo)出/導(dǎo)入和備份/還原其實(shí)是兩種高度類(lèi)似的場(chǎng)景,但為了描述清晰,我們還是分開(kāi)討論。先看一下導(dǎo)出/導(dǎo)入的架構(gòu)示意圖:
?在這個(gè)架構(gòu)中,Source端的MSK是數(shù)據(jù)流的起點(diǎn),安裝了S3 Sink Connector的Kafka Connect會(huì)從Source端的MSK中提取指定Topic的數(shù)據(jù),然后以Json或Avro文件的形式存儲(chǔ)到S3上;同時(shí),另一個(gè)安裝了S3 Source Connector的Kafka Connect會(huì)從S3上讀取這些Json或Avro文件,然后寫(xiě)入到Sink端MSK的對(duì)應(yīng)Topic中。如果Source端和Sink端的MSK集群不在同一個(gè)Region,可以在各自的Region分別完成導(dǎo)入和導(dǎo)出,然后在兩個(gè)Region之間使用S3的Cross-Rejion Replication進(jìn)行數(shù)據(jù)同步。
?該架構(gòu)只需進(jìn)行簡(jiǎn)單的調(diào)整,即可用于MSK集群的備份/還原,如下圖所示:先將MSK集群的數(shù)據(jù)備份到S3上,待完成集群的升級(jí)、遷移或重建工作后,再?gòu)腟3上將數(shù)據(jù)恢復(fù)到新建集群即可。
?預(yù)設(shè)條件
?本文聚焦于Kafka Connect的數(shù)據(jù)導(dǎo)出/導(dǎo)入和備份/還原操作,需要提前準(zhǔn)備:
?一臺(tái)基于Amazon Linux2的EC2實(shí)例(建議新建純凈實(shí)例),本文所有的實(shí)操腳本都將在該實(shí)例上執(zhí)行,該實(shí)例也是運(yùn)行Kafka Connect Docker Container的宿主機(jī)。
?兩個(gè)MSK集群,一個(gè)作為Source,一個(gè)作為Sink;如果只有一個(gè)MSK集群也可完成驗(yàn)證,該集群將既作Source又作Sink。
?為聚焦Kafka Connect S3 Source/Sink Connector的核心配置,預(yù)設(shè)MSK集群沒(méi)有開(kāi)啟身份認(rèn)證(即認(rèn)證類(lèi)型為Unauthenticated),數(shù)據(jù)傳輸方式為PLAINTEXT,以便簡(jiǎn)化Kafka Connect的連接配置。
?網(wǎng)絡(luò)連通性上要求EC2實(shí)例能訪問(wèn)S3、Source端MSK集群、Sink端MSK集群。如果在實(shí)際環(huán)境中無(wú)法同時(shí)連通Source端和Sink端,則可以在兩臺(tái)分屬于不同網(wǎng)絡(luò)的EC2上進(jìn)行操作,但它們必須都能訪問(wèn)S3。如果是跨Region或賬號(hào)隔離,則另需配置S3 Cross-Region Replication或手動(dòng)拷貝數(shù)據(jù)文件。
?全局配置
?由于實(shí)際操作將不可避免地依賴(lài)到具體的亞馬遜云科技賬號(hào)以及本地環(huán)境里的各項(xiàng)信息(如AKSK,服務(wù)地址,各類(lèi)路徑,Topic名稱(chēng)等),為了保證本文給出的操作腳本具有良好的可移植性,將所有與環(huán)境相關(guān)的信息抽離出來(lái),以全局變量的形式在實(shí)操前集中配置。以下就是全局變量的配置腳本,讀者需要根據(jù)個(gè)人環(huán)境設(shè)定這些變量的取值:
?為了便于演示和解讀,本文將使用下面的全局配置,其中前6項(xiàng)配置與賬號(hào)和環(huán)境強(qiáng)相關(guān),仍需用戶(hù)自行修改,腳本中給出的僅為示意值,而后5項(xiàng)配置與MSK數(shù)據(jù)的導(dǎo)入導(dǎo)出息息相關(guān),不建議修改,因?yàn)楹罄m(xù)的解讀將基于這里設(shè)定的值展開(kāi),待完成驗(yàn)證后,您可再根據(jù)需要靈活修改后5項(xiàng)配置以完成實(shí)際的導(dǎo)入導(dǎo)出工作。
?回到操作流程,登錄準(zhǔn)備好的EC2實(shí)例,修改下面腳本中與賬號(hào)和環(huán)境相關(guān)的前6項(xiàng)配置,然后執(zhí)行修改后的腳本。此外,需要提醒注意的是:在后續(xù)操作中,部分腳本執(zhí)行后將不再返回,而是持續(xù)占用當(dāng)前窗口輸出日志或Kafka消息,因此需要新開(kāi)命令行窗口,每次新開(kāi)窗口都需要執(zhí)行一次這里的全局配置腳本。
?關(guān)于上述腳本中的后5項(xiàng)配置,有如下詳細(xì)說(shuō)明:
?我們就以腳本中設(shè)定的值為例,解讀一下這5項(xiàng)配置聯(lián)合起來(lái)將要實(shí)現(xiàn)的功能,同時(shí)也是本文將演示的主要內(nèi)容:
?在Source端的MSK集群上存在兩個(gè)名為source-topic-1和source-topic-2的Topic,通過(guò)安裝有S3 Sink Connector的Kafka Connect(Docker容器)將兩個(gè)Topic的數(shù)據(jù)導(dǎo)出到S3的指定存儲(chǔ)桶中,然后再通過(guò)安裝有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存為一個(gè)Docker容器)將S3存儲(chǔ)桶中的數(shù)據(jù)寫(xiě)入到Sink端的MSK集群上,其中原source-topic-1的數(shù)據(jù)將被寫(xiě)入sink-topic-1,原source-topic-2的數(shù)據(jù)將被寫(xiě)入sink-topic-2。
?特別地,如果是備份/還原場(chǎng)景,需要保持導(dǎo)出/導(dǎo)入的Topic名稱(chēng)一致,此時(shí),可直接刪除S3 Source Connector中以transforms開(kāi)頭的4項(xiàng)配置(將在下文中出現(xiàn)),或者將下面兩項(xiàng)改為:
?如果只有一個(gè)MSK集群,同樣可以完成本文的驗(yàn)證工作,只需將SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同時(shí)設(shè)置為該集群即可,這樣,該集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不會(huì)產(chǎn)生沖突。
?環(huán)境準(zhǔn)備
?安裝工具包
?在EC2上執(zhí)行以下腳本,安裝并配置jq,yq,docker,jdk,kafka-console-client五個(gè)必須的軟件包,可以根據(jù)自身EC2的情況酌情選擇安裝全部或部分軟件。建議使用純凈的EC2實(shí)例,完成全部的軟件安裝:
?創(chuàng)建S3存儲(chǔ)桶
?整個(gè)方案以S3作為數(shù)據(jù)轉(zhuǎn)儲(chǔ)媒介,為此需要在S3上創(chuàng)建一個(gè)存儲(chǔ)桶。Source端MSK集群的數(shù)據(jù)將會(huì)導(dǎo)出到該桶中并以Json文件形式保存,向Sink端MSK集群導(dǎo)入數(shù)據(jù)時(shí),讀取的也是存儲(chǔ)在該桶中的Json文件。
?在源MSK上創(chuàng)建Source Topics
?為了確保Topics數(shù)據(jù)能完整備份和還原,S3 Source Connector建議Sink Topics的分區(qū)數(shù)最好與Source Topics保持一致,如果讓MSK自動(dòng)創(chuàng)建Topic,則很有可能會(huì)導(dǎo)致Source Topics和Sink Topics的分區(qū)數(shù)不對(duì)等,所以,選擇手動(dòng)創(chuàng)建Source Topics和Sink Topics,并確保它們的分區(qū)數(shù)一致。以下腳本將創(chuàng)建source-topic-1和source-topic-2兩個(gè)Topic,各含9個(gè)分區(qū):
?在目標(biāo)MSK上創(chuàng)建Sink Topics
?原因同上,以下腳本將創(chuàng)建:sink-topic-1和sink-topic-2兩個(gè)Topic,各含9個(gè)分區(qū):
?制作Kafka Connect鏡像
?接下來(lái)是制作帶S3 Sink Connector和S3 Source Connector的Kafka Connect鏡像,鏡像和容器均以kafka-s3-syncer命名,以下是具體操作:
?配置并啟動(dòng)Kafka Connect
?鏡像制作完成后,就可以啟動(dòng)了Kafka Connect了。Kafka Connect有很多配置項(xiàng),需要提醒注意的是:在下面的配置中,使用的是Kafka Connect內(nèi)置的消息轉(zhuǎn)換器:JsonConverter,如果你的輸入/輸出格式是Avro或Parquet,則需要另行安裝對(duì)應(yīng)插件并設(shè)置正確的Converter Class。
?上述腳本執(zhí)行后,命令窗口將不再返回,而是會(huì)持續(xù)輸出容器日志,因此下一步操作需要新開(kāi)一個(gè)命令行窗口。
?
?配置并啟動(dòng)S3 Sink Connector
?在第5節(jié)的操作中,已經(jīng)將S3 Sink Connector安裝到了Kafka Connect的Docker鏡像中,但是還需要顯式地配置并啟動(dòng)它。新開(kāi)一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后執(zhí)行以下腳本:
?配置并啟動(dòng)S3 Source Connector
?同上,在第5節(jié)的操作中,已經(jīng)將S3 Source Connector安裝到了Kafka Connect的Docker鏡像中,同樣需要顯式地配置并啟動(dòng)它:
?至此,整個(gè)環(huán)境搭建完畢,一個(gè)以S3作為中轉(zhuǎn)媒介的MSK數(shù)據(jù)導(dǎo)出、導(dǎo)入、備份、還原鏈路已經(jīng)處于運(yùn)行狀態(tài)。
?
?測(cè)試
?現(xiàn)在,來(lái)驗(yàn)證一下整個(gè)鏈路是否能正常工作。首先,使用kafka-console-consumer.sh監(jiān)控source-topic-1和sink-topic-1兩個(gè)Topic,然后使用腳本向source-topic-1持續(xù)寫(xiě)入數(shù)據(jù),如果在sink-topic-1看到了相同的數(shù)據(jù)輸出,就說(shuō)明數(shù)據(jù)成功地從source-topic-1導(dǎo)出然后又導(dǎo)入到了sink-topic-1中,相應(yīng)的,在S3存儲(chǔ)桶中也能看到“沉淀”的數(shù)據(jù)文件。
?打開(kāi)Source Topic
?新開(kāi)一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控source-topic-1中的數(shù)據(jù):
?打開(kāi)Sink Topic
?新開(kāi)一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控sink-topic-1中的數(shù)據(jù):
?向Source Topic寫(xiě)入數(shù)據(jù)
?新開(kāi)一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1:全局配置》,聲明全局變量,然后使用如下命令向source-topic-1中寫(xiě)入數(shù)據(jù):
?現(xiàn)象與結(jié)論
?執(zhí)行上述寫(xiě)入操作后,從監(jiān)控source-topic-1的命令行窗口中可以很快看到寫(xiě)入的數(shù)據(jù),這說(shuō)明Source端MSK已經(jīng)開(kāi)始持續(xù)產(chǎn)生數(shù)據(jù)了,隨后(約1分鐘),即可在監(jiān)控sink-topic-1的命令行窗口中看到相同的輸出數(shù)據(jù),這說(shuō)明目標(biāo)端的數(shù)據(jù)同步也已開(kāi)始正常工作。此時(shí),打開(kāi)S3的存儲(chǔ)桶會(huì)發(fā)現(xiàn)大量Json文件,這些Json是由S3 Sink Connector從source-topic-1導(dǎo)出并存放到S3上的,然后S3 Source Connector又讀取了這些Json并寫(xiě)入到了sink-topic-1中,至此,整個(gè)方案的演示與驗(yàn)證工作全部結(jié)束。
??清理
?在驗(yàn)證過(guò)程中,可能需要多次調(diào)整并重試,每次重試最好恢復(fù)到初始狀態(tài),以下腳本會(huì)幫助清理所有已創(chuàng)建的資源:
?小結(jié)
?本方案主要定位于輕便易用,在S3 Sink Connector和S3 Source Connector中還有很多與性能、吞吐量相關(guān)的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在實(shí)際需要自行調(diào)整,此外,Kafka Connect也可以方便地遷移到Kubernetes或Amazon MSK Connect中以實(shí)現(xiàn)集群化部署。