加入星計(jì)劃,您可以享受以下權(quán)益:

  • 創(chuàng)作內(nèi)容快速變現(xiàn)
  • 行業(yè)影響力擴(kuò)散
  • 作品版權(quán)保護(hù)
  • 300W+ 專業(yè)用戶
  • 1.5W+ 優(yōu)質(zhì)創(chuàng)作者
  • 5000+ 長(zhǎng)期合作伙伴
立即加入
  • 正文
    • 一、組件介紹
    • 二、測(cè)試預(yù)期
    • 三、環(huán)境安裝
    • 三、工程配置
    • 四、工程測(cè)試
    • 五、加入學(xué)習(xí)
  • 推薦器件
  • 相關(guān)推薦
  • 電子產(chǎn)業(yè)圖譜
申請(qǐng)入駐 產(chǎn)業(yè)圖譜

分庫分表數(shù)據(jù),如何同步到Elasticsearch,提供聚合查詢?

03/25 13:09
3215
閱讀需 29 分鐘
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

作者:小傅哥,博客:https://bugstack.cn

本文的宗旨在于通過簡(jiǎn)單干凈實(shí)踐的方式教會(huì)讀者,配置出一套 Canal 工具服務(wù),來同步分庫分表的數(shù)據(jù)到 Elasticsearch 文件夾系統(tǒng)中。同時(shí)在 SpringBoot 工程中,配置出兩套數(shù)據(jù)源,一套是 MySQL + MyBatis,一套是 Elasticsearch + MyBatis?!具@是非常重要的設(shè)計(jì)手段】

雖然現(xiàn)在有 TiDB 這樣的分布式數(shù)據(jù)庫,但對(duì)于分庫分表 + 數(shù)據(jù)同步ES,依然是非常主流的方案。同時(shí)也有一部分是把分庫分表的數(shù)據(jù)同步到 TiDB 使用。

本文涉及的工程:

    xfg-dev-tech-canal:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-canaldocs/dev-ops/xfg-dev-tech-canal-docker-compose.yml:提供了所需的環(huán)境安裝,mysql、canal-server、canal-adapter、elasticsearch、kibanaGithub:https://github.com/alibaba/canal

一、組件介紹

canal ,譯為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

早期阿里巴巴因?yàn)楹贾莺兔绹p機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。

它的工作原理是,canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議。在 MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal ) 這樣 canal 再解析 binary log (binlog)進(jìn)行配置分發(fā),同步到 Elasticsearch 等系統(tǒng)中進(jìn)行使用。

那么有了 canal 就可以把分庫分表的數(shù)據(jù)同步到 Elasticsearch,提供匯總查詢和聚合操作,也就不需要把輪訓(xùn)每個(gè)分庫分表數(shù)據(jù)了。

二、測(cè)試預(yù)期

本文的案例會(huì)把MySQL,2庫4表的數(shù)據(jù),通過 Sharding 分庫分表寫入數(shù)據(jù)后,同步到 Elasticsearch。分庫分表如下(環(huán)境安裝中會(huì)自動(dòng)安裝數(shù)據(jù)庫和設(shè)置庫表);

三、環(huán)境安裝

為了讓讀者伙伴更加簡(jiǎn)單的學(xué)習(xí)到這一項(xiàng)方案技能,小傅哥這里把所需的環(huán)境都配置成一整套的 docker compose 腳本文件(ARM、AMD),你只要執(zhí)行安裝即可。安全前注意,無論是本機(jī)還是云服務(wù)器都需要安裝 docker-ce

1. 環(huán)境腳本

    • 打開 xfg-dev-tech-canal 工程,下面就是 docker compose 的執(zhí)行腳本。mac/windows 如果安裝了 docker 可以直接點(diǎn)擊如圖的三角號(hào)安裝。如果是在 Linux 安裝了 docker 可以把 dev-ops 整個(gè)文件夾都上傳到云服務(wù)器,之后通過腳本;

docker-compose -f xfg-dev-tech-canal-docker-compose.yml up -d

    進(jìn)行安裝。
1.1 開啟 binlog

mysql 數(shù)據(jù)同步需要?jiǎng)?chuàng)建一個(gè) canal 的賬戶,之后還需要開啟 binlog 日志

    在 mysql 配置文件夾中,設(shè)置了初始化授權(quán)的賬戶、導(dǎo)入的庫表,以及開啟 mysql-bin 和配置要采集的庫。如果你有配置自己其他的庫要同步也可以如此配置。
1.2 庫表采集配置

    本文選擇的是 es 同步方式,所以需要在 canal-adapter 中 es7 文件夾添加同步的庫表 yml 配置。以及在 application.yml 中配置出需要鏈接的庫表以及同步的目標(biāo)地址,也就是 es 的地址?!疽?yàn)楸疚牡陌咐窃谕粋€(gè) docker compose 下安裝,所以直接用名稱 elsticsearch 即可訪問】

2. 運(yùn)行狀態(tài)

    安裝完成后可以進(jìn)入 portainer 查看各個(gè)組件的運(yùn)行,如果有哪個(gè)運(yùn)行失敗了,可以點(diǎn)擊那個(gè)小文件的圖標(biāo),它可以查看日志。

3. 創(chuàng)建索引

在 doc/dev-ops/curl 下提供了創(chuàng)建 Elasticsearch 的腳本;你可以點(diǎn)擊執(zhí)行或者直接復(fù)制執(zhí)行,也可以復(fù)制導(dǎo)入到 ApiPost 里執(zhí)行。

以上這些腳本是為了創(chuàng)建出數(shù)據(jù)庫表同步到 Elasticsearch 后對(duì)應(yīng)的索引和映射的字段。文章下面會(huì)用到。

3.1 創(chuàng)建
curl?-X?PUT?"127.0.0.1:9200/xfg_dev_tech.user_order"?-H?'Content-Type:?application/json'?-d'
{
????"mappings":?{
??????"properties":?{
????????"_user_id":{"type":?"text"},
????????"_user_name":{"type":?"text"},
????????"_order_id":{"type":?"text"},
????????"_uuid":{"type":?"text"},
????????"_create_time":{"type":?"date"},
????????"_update_time":{"type":?"date"}
??????}
????}
}'
3.2 添加
curl?-X?PUT?"127.0.0.1:9200/xfg_dev_tech.user_order/_mapping"?-H?'Content-Type:?application/json'?-d'
{
??"properties":?{
????"_sku_name":?{
??????"type":?"text"
????}
??}
}'
3.3 刪除
curl?-X?DELETE?"127.0.0.1:9200/xfg_dev_tech.user_order"

4. 創(chuàng)建索引(Kibana)

4.1 索引管理

地址:http://127.0.0.1:5601/app/management/kibana/indexPatterns

    填寫完,點(diǎn)擊創(chuàng)建索引模式即可。
4.2 數(shù)據(jù)頁面

地址:http://127.0.0.1:5601/app/discover

    等后面同步數(shù)據(jù)了以后,直接在這里點(diǎn)刷新就可以看到了。

5. 許可證

kibana 提供了免費(fèi)30天的試用許可,安裝后可以使用 x-pack-sql-jdbc。它的好處是可以讓我們通過 MyBatis 的方式查詢 Elasticsearch 數(shù)據(jù)。

地址:http://127.0.0.1:5601/app/management/stack/license_management

Elasticsearch 提供了 x-pack-sql-jdbc,讓對(duì) Elasticsearch 的查詢也可以像使用 MySQL 數(shù)據(jù)庫一樣通過 MyBatis 進(jìn)行查詢。但這個(gè) x-pack-sql-jdbc 是付費(fèi)的,免費(fèi)可以使用 30 天。之后你可以選擇使用重新安裝,破解,或者使用 Elasticsearch 的查詢方式。還可以自己開發(fā)一個(gè) Elasticsearch JDBC,GitHub 上也有類似的組件。

使用時(shí)需要引入 POM 配置;

<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/x-pack-sql-jdbc -->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>x-pack-sql-jdbc</artifactId>
    <version>7.17.14</version>
</dependency>

三、工程配置

本節(jié)涉及到了簡(jiǎn)明教程中所講解的 Sharding 分庫分表的使用,因?yàn)槲覀冃枰逊謳旆直淼臄?shù)據(jù)通過 canal 同步到 Elasticsearch。(也可以使用其他分庫分表組件)

在工程中配置一套 Sharding 分庫分表映射的 MyBatis MyBatis,在配置一套 Elasticsearch x-pack-sql-jdbc 數(shù)據(jù)源映射的 MyBatis Mapper。這樣可以讀寫分別走自己設(shè)定好的 Mapper 對(duì)象了。

1. 創(chuàng)建數(shù)據(jù)源

@Configuration
public?class?DataSourceConfig?{

????@Configuration
????@MapperScan(basePackages?=?"cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch",?sqlSessionFactoryRef?=?"elasticsearchSqlSessionFactory")
????static?class?ElasticsearchMyBatisConfig?{

????????@Bean("elasticsearchDataSource")
????????@ConfigurationProperties(prefix?=?"spring.elasticsearch.datasource")
????????public?DataSource?igniteDataSource(Environment?environment)?{
????????????return?new?EsDataSource();
????????}

????????@Bean("elasticsearchSqlSessionFactory")
????????public?SqlSessionFactory?elasticsearchSqlSessionFactory(DataSource?elasticsearchDataSource)?throws?Exception?{
????????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????????factoryBean.setDataSource(elasticsearchDataSource);
????????????factoryBean.setMapperLocations(new?PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));
????????????return?factoryBean.getObject();
????????}
????}

????@Configuration
????@MapperScan(basePackages?=?"cn.bugstack.xfg.dev.tech.infrastructure.dao.mysql",?sqlSessionFactoryRef?=?"mysqlSqlSessionFactory")
????static?class?MysqlMyBatisConfig?{

????????@Bean("mysqlDataSource")
????????@ConfigurationProperties(prefix?=?"spring.mysql.datasource")
????????public?DataSource?mysqlDataSource(Environment?environment)?{
????????????return?DataSourceBuilder.create()
????????????????????.url(environment.getProperty("spring.mysql.datasource.url"))
????????????????????.driverClassName(environment.getProperty("spring.mysql.datasource.driver-class-name"))
????????????????????.build();
????????}

????????@Bean("mysqlSqlSessionFactory")
????????public?SqlSessionFactory?mysqlSqlSessionFactory(DataSource?mysqlDataSource)?throws?Exception?{
????????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????????factoryBean.setDataSource(mysqlDataSource);
????????????factoryBean.setMapperLocations(new?PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));
????????????return?factoryBean.getObject();
????????}
????}

}
    ElasticsearchMyBatisConfig 使用 EsDataSource 創(chuàng)建數(shù)據(jù)源和映射 MyBatis Mapper 文件。MysqlMyBatisConfig 使用 DataSourceBuilder 創(chuàng)建 Sharding 提供的數(shù)據(jù)源和映射 MyBatis Mapper 文件。

2. Mapper 映射

<?xml?version="1.0"?encoding="UTF-8"?>
<!DOCTYPE?mapper?PUBLIC?"-//mybatis.org//DTD?Mapper?3.0//EN"?"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper?namespace="cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch.IElasticsearchUserOrderDao">

????<resultMap?id="dataMap"?type="cn.bugstack.xfg.dev.tech.infrastructure.po.UserOrderPO">
????????<result?column="_user_id"?property="userId"/>
????????<result?column="_user_name"?property="userName"/>
????????<result?column="_order_id"?property="orderId"/>
????????<result?column="_sku_name"?property="skuName"/>
????????<result?column="_update_time"?property="updateTime"/>
????????<result?column="_create_time"?property="createTime"/>
????</resultMap>

????<select?id="selectByUserId"?resultMap="dataMap">
????????select?_user_id,?_user_name,?_order_id,?_sku_name
????????from?"xfg_dev_tech.user_order"
????????order?by?_update_time
????????limit?10
????</select>

</mapper>

這個(gè)是 Elasticsearch 映射的 Mapper 文件,映射的字段就是前面安裝環(huán)境的時(shí)候設(shè)置的索引和字段?,F(xiàn)在你使用 Elasticsearch 就不用在工程中硬編碼查詢語句了,變得非常方便。

四、工程測(cè)試

1. 寫入數(shù)據(jù)

@Test
public?void?test_insert()?throws?InterruptedException?{
????for?(int?i?=?0;?i?<?3;?i++)?{
????????UserOrderPO?userOrderPO?=?UserOrderPO.builder()
????????????????.userName("小傅哥")
????????????????.userId("xfg_"?+?RandomStringUtils.randomAlphabetic(6))
????????????????.userMobile("+86?13521408***")
????????????????.sku("13811216")
????????????????.skuName("《手寫MyBatis:漸進(jìn)式源碼實(shí)踐》")
????????????????.orderId(RandomStringUtils.randomNumeric(11))
????????????????.quantity(1)
????????????????.unitPrice(BigDecimal.valueOf(128))
????????????????.discountAmount(BigDecimal.valueOf(50))
????????????????.tax(BigDecimal.ZERO)
????????????????.totalAmount(BigDecimal.valueOf(78))
????????????????.orderDate(new?Date())
????????????????.orderStatus(0)
????????????????.isDelete(0)
????????????????.uuid(UUID.randomUUID().toString().replace("-",?""))
????????????????.ipv4("127.0.0.1")
????????????????.ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes())
????????????????.extData("{"device":?{"machine":?"IPhone?14?Pro",?"location":?"shanghai"}}")
????????????????.build();
????????userOrderDao.insert(userOrderPO);
????????Thread.sleep(100);
????}
}
    循環(huán)插入3條數(shù)據(jù),按需你可以設(shè)置更多條數(shù)據(jù)。這里的用戶編號(hào) userId 是隨機(jī)的,也是切分鍵的 ID,所以會(huì)在不同的庫表寫入數(shù)據(jù)。

2. 數(shù)據(jù)驗(yàn)證

    MySQL:http://127.0.0.1:8899/ docker compose 配置的管理后臺(tái),可以 root/123456 登錄Kibana:http://127.0.0.1:5601/app/discover 查詢寫入的數(shù)據(jù)。

3. 查詢數(shù)據(jù)

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public?class?UserOrderDaoTest?{

????@Resource
????private?IElasticsearchUserOrderDao?userOrderDao;

????@Test
????public?void?test()?{
????????List<UserOrderPO>?userOrderPOS?=?userOrderDao.selectByUserId();
????????log.info("測(cè)試結(jié)果:{}",?JSON.toJSONString(userOrderPOS));
????}

}

    通過 Elasticsearch 走 x-pack-sql-jdbc 的方式再把數(shù)據(jù)查詢出來。

五、加入學(xué)習(xí)

小傅哥是一個(gè)大廠的架構(gòu)師,經(jīng)常會(huì)帶著伙伴們,卷這些實(shí)際場(chǎng)景中非常有必要的技術(shù)。也會(huì)帶著伙伴實(shí)戰(zhàn)項(xiàng)目,這些項(xiàng)目也都是來自于互聯(lián)網(wǎng)大廠中真實(shí)的業(yè)務(wù)場(chǎng)景,所有學(xué)習(xí)這樣的項(xiàng)目無論是實(shí)習(xí)、校招、社招,都是有非常強(qiáng)的競(jìng)爭(zhēng)力。別人還在玩玩具,而你已經(jīng)漲能力!

這樣的項(xiàng)目學(xué)習(xí)在小傅哥星球「碼農(nóng)會(huì)鎖」有8個(gè),每個(gè)都是從0到1開發(fā)并提供簡(jiǎn)歷模板和面試題,并且還在繼續(xù)開發(fā),后續(xù)還將有更多!價(jià)格嘎嘎實(shí)惠,早點(diǎn)加入,早點(diǎn)提升自己。項(xiàng)目地址:https://gaga.plus

推薦器件

更多器件
器件型號(hào) 數(shù)量 器件廠商 器件描述 數(shù)據(jù)手冊(cè) ECAD模型 風(fēng)險(xiǎn)等級(jí) 參考價(jià)格 更多信息
LMK62A2-100M00SIAR 1 Texas Instruments 100-MHz, LVDS ±50 ppm, high-performance, low-jitter, standard oscillator 6-QFM -40 to 85
暫無數(shù)據(jù) 查看
TCAN4550RGYRQ1 1 Texas Instruments Automotive system basis chip (SBC) with integrated CAN FD controller &amp; transceiver 20-VQFN -40 to 125

ECAD模型

下載ECAD模型
暫無數(shù)據(jù) 查看
AT27C256R-70PU 1 Atmel Corporation OTP ROM, 32KX8, 70ns, CMOS, PDIP28, 0.600 INCH, GREEN, PLASTIC, MS-011AB, DIP-28

ECAD模型

下載ECAD模型
$2.54 查看

相關(guān)推薦

電子產(chǎn)業(yè)圖譜

作者小傅哥多年從事一線互聯(lián)網(wǎng)Java開發(fā),從19年開始編寫工作和學(xué)習(xí)歷程的技術(shù)匯總,旨在為大家提供一個(gè)較清晰詳細(xì)的核心技能學(xué)習(xí)文檔。如果本文能為您提供幫助,請(qǐng)給予支持(關(guān)注、點(diǎn)贊、分享)!