點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)
來(lái)源:jianshu.com/p/549d88222528
公司 DBA 一直埋怨 Atlas 的難用,希望從客戶端層出一個(gè)讀寫(xiě)分離的方案。開(kāi)源市場(chǎng)上在客戶端做讀寫(xiě)分離的開(kāi)源軟件基本上沒(méi)有。業(yè)務(wù)方利用 Spring 自帶的路由數(shù)據(jù)源能實(shí)現(xiàn)部分讀寫(xiě)分離的功能,但是功能不夠完善。部分參考 Sharing-JDBC 源碼思想,利用部分業(yè)余時(shí)間,寫(xiě)了這個(gè) Robustdb,總共只使用了十多個(gè)類(lèi),兩千多行代碼左右。
一、背景
隨著業(yè)務(wù)量的增長(zhǎng),所有公司都不是直接選擇分庫(kù)分表設(shè)計(jì)方案的。很長(zhǎng)一段時(shí)間內(nèi),會(huì)采用 庫(kù)垂直拆分和分區(qū)表 來(lái)解決庫(kù)表數(shù)據(jù)量比較大的問(wèn)題,采用讀寫(xiě)分離來(lái)解決訪問(wèn)壓力比較大的問(wèn)題。我們公司也是一樣。目前絕大部分業(yè)務(wù)還是使用讀寫(xiě)分離的方案。我相信很多公司和我們公司的架構(gòu)一樣,采用中間代理層做讀寫(xiě)分離。結(jié)構(gòu)如下:
圖片
第一層是 VIP 曾。通過(guò) VIP 做中間映射層,避免了應(yīng)用綁定數(shù)據(jù)庫(kù)的真實(shí) IP,這樣在數(shù)據(jù)庫(kù)故障時(shí),可以通過(guò) VIP 飄移來(lái)將流量打到另一個(gè)庫(kù)。但是 VIP 無(wú)法跨機(jī)房,為未來(lái)的異地多活設(shè)計(jì)埋下繞不過(guò)去的坎。
VIP 下面一層是讀寫(xiě)分離代理,我們公司使用的是 360 的 Atlas。Atlas 通過(guò)將 SQL 解析為 DML(Data Modify Language)和 DQL(Data Query Language),DML 的請(qǐng)求全部發(fā)到主庫(kù),DQL 根據(jù)配置比例分發(fā)到讀庫(kù)(讀庫(kù)包括主庫(kù)和從庫(kù))。
使用 Atlas 有以下不足:
Altas 不再維護(hù)更新,現(xiàn)存一些 bug,bug 網(wǎng)上很多描述;
Altas 中沒(méi)有具體應(yīng)用請(qǐng)求 IP 與具體數(shù)據(jù)庫(kù) IP 之間的映射數(shù)據(jù),所以無(wú)法準(zhǔn)確查到訪問(wèn)DB的請(qǐng)求是來(lái)自哪個(gè)應(yīng)用;
Altas 控制的粒度是 SQL 語(yǔ)句,只能指定某條查詢 SQL 語(yǔ)句走主庫(kù),不能根據(jù)場(chǎng)景指定;
DB 在自動(dòng)關(guān)閉某個(gè)與 Altas 之間的連接時(shí),Altas 不會(huì)刷新,它仍有可能把這個(gè)失效的連接給下次請(qǐng)求的應(yīng)用使用;
使用 Altas,對(duì)后期增加其他功能模會(huì)比較麻煩。
基于 Atlas 以上問(wèn)題,以及我們需要將數(shù)據(jù)庫(kù)賬號(hào)和連接配置集中管控。我們?cè)O(shè)計(jì)了下面這套方案:
圖片
通過(guò)在客戶端做讀寫(xiě)分離可以解決 Atlas 上面存在的不足。整個(gè)流程如下圖所示:
圖片
二、Robustdb 原理
1、讀寫(xiě)分離設(shè)計(jì)核心點(diǎn)——路由
支持每條 SQL 按照 DML、DQL 類(lèi)型的默認(rèn)路由。
需求描述
目前公司采用讀寫(xiě)分離的方案來(lái)增強(qiáng)數(shù)據(jù)庫(kù)的性能,所有的 DML(insert、updata、delete)操作在主庫(kù),通過(guò) MySQL 的 binlog 同步,將數(shù)據(jù)同步到多個(gè)讀庫(kù)。所有的 DQL(select) 操作主庫(kù)或從庫(kù),從而增強(qiáng)數(shù)據(jù)的讀能力。
支持方法級(jí)別的指定路由
需求描述
在 Service 中指定方法中所有 DB 操作方法操作同一個(gè)數(shù)據(jù)庫(kù)(主要是主庫(kù)),保證方法中的 DB 讀寫(xiě)都操作主庫(kù),避免數(shù)據(jù)同步延遲導(dǎo)致讀從庫(kù)數(shù)據(jù)異常。從而保證整個(gè)方法的事務(wù)屬性。
解決思路
我們將獲取真實(shí)數(shù)據(jù)庫(kù)(主庫(kù)還是哪個(gè)從庫(kù))放到需要建立連接時(shí)的地方,為此我們創(chuàng)建了 BackendConnection(傳統(tǒng)是先連接數(shù)據(jù)庫(kù),然后再創(chuàng)建連接)。
在獲取數(shù)據(jù)庫(kù)連接時(shí),通過(guò)對(duì)請(qǐng)求的 SQL 進(jìn)行解析和類(lèi)型判別,識(shí)別為 DML 和 DQL。如果是 DML,則在線程的單 SQL 線程本地變量上設(shè)置為 master,DQL 則設(shè)置為 slave,為后續(xù)選擇數(shù)據(jù)庫(kù)提供選擇參考。
如果要支持方法級(jí)別的事務(wù)(也就是整個(gè)方法的 SQL 請(qǐng)求都發(fā)送到主庫(kù)),需要借助攔截器,我們采用的是 AspectJ 方式的攔截器。會(huì)攔截所有帶有類(lèi)型為 dataSourceType 的 annotation 的方法。在執(zhí)行方法前,在線程的多 SQL 線程本地變量上設(shè)置 dataSourceType 的 name 值(name 值為 master 代表走主庫(kù),name 值為 slave 代表走從庫(kù))。線程的多 SQL 線程本地變量為后續(xù)選擇數(shù)據(jù)庫(kù)提供選擇參考。在方法執(zhí)行完后,清理本地線程變量。
多 SQL 線程本地變量的優(yōu)先級(jí)高于單 SQL 線程本地變量的優(yōu)先級(jí)。
圖片
圖片
注意點(diǎn)
本地線程變量要使用阿里包裝的 Ttl,防止用戶在方法內(nèi)部啟動(dòng)線程池,導(dǎo)致普通的線程本地變量丟失,從而導(dǎo)致選庫(kù)異常。
使用 Ttl 之后,需要在公司的 JVM 啟動(dòng)參數(shù)中增加
-javaagent:/{Path}/transmittable-thread-local-2.6.0-SNAPSHOT.jar
原理就是在 JVM 啟動(dòng)時(shí),加載 transmittable-thread-local 中的類(lèi)替換邏輯,將以后的 Runnable、Callable、ExecuteService 等線程池相關(guān)類(lèi)替換成增強(qiáng)后的 TtlRunnable、TtlCallable、TtlExecuteService 等。
下面展示一下時(shí)序圖中類(lèi)的核心代碼,僅供參考:
DataSoueceAspect
@Aspect
@Component
public class DataSourceAspect{
@Around( "execution(* *(..)) && @annotation(dataSourceType)")
public Object aroundMethod(ProceedingJoinPoint pjd, DataSourceType dataSourceType) throws Throwable { DataSourceContextHolder.setMultiSqlDataSourceType(dataSourceType.name());
Object result = pjd.proceed();
DataSourceContextHolder.clearMultiSqlDataSourceType();
return result;
}
}
BackendConnection
public final class BackendConnection extends AbstractConnectionAdapter {
private AbstractRoutingDataSource abstractRoutingDataSource;
//用于緩存一條sql(可能對(duì)應(yīng)多個(gè)statement)或者一次事務(wù)中的連接
private final Map connectionMap = new HashMap();
//構(gòu)造函數(shù)
public BackendConnection(AbstractRoutingDataSource abstractRoutingDataSource) {
this.abstractRoutingDataSource = abstractRoutingDataSource;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql);
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
if(connectionMap == null || connectionMap.isEmpty()){
return abstractRoutingDataSource.getResolvedDefaultDataSource().getConnection().getMetaData();
}
return fetchCachedConnection(connectionMap.keySet().iterator().next().toString()).get().getMetaData();
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql,resultSetType,resultSetConcurrency);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, autoGeneratedKeys);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, columnIndexes);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, columnNames);
}
@Override
protected Collection getConnections() {
return connectionMap.values();
}
/**
* 根據(jù)sql獲取連接,對(duì)連接進(jìn)行緩存
* @param sql
* @ return
* @throws SQLException
*/
private Connection getConnectionInternal(final String sql) throws SQLException {
//設(shè)置線程環(huán)境遍歷
if (ExecutionEventUtil.isDML(sql)) {
DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.MASTER);
} else if (ExecutionEventUtil.isDQL(sql)) {
DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.SLAVE);
}
//根據(jù)上面設(shè)置的環(huán)境變量,選擇相應(yīng)的數(shù)據(jù)源
Object dataSourceKey = abstractRoutingDataSource.determineCurrentLookupKey();
String dataSourceName = dataSourceKey.toString();
//看緩存中是否已經(jīng)含有相應(yīng)數(shù)據(jù)源的連接
Optional connectionOptional = fetchCachedConnection(dataSourceName);
if (connectionOptional.isPresent()) {
return connectionOptional.get();
}
//緩存中沒(méi)有相應(yīng)連接,建立相應(yīng)連接,并放入緩存
Connection connection = abstractRoutingDataSource.getTargetDataSource(dataSourceKey).getConnection();
connection.setAutoCommit(super.getAutoCommit());
connection.setTransactionIsolation(super.getTransactionIsolation());
connectionMap.put(dataSourceKey.toString(), connection);
return connection;
}
/**
* 從緩存中取數(shù)據(jù)源
* @param dataSourceName
* @ return
*/
private Optional fetchCachedConnection(final String dataSourceName) {
if (connectionMap.containsKey(dataSourceName)) {
return Optional.of(connectionMap.get(dataSourceName));
}
return Optional.absent();
}
AbstractRoutingDataSource
/**
*
* @Type AbstractRoutingDataSource
* @Desc 數(shù)據(jù)源路由器(spring的AbstractRoutingDataSource將resolvedDataSources的注入放在bean初始化)
* @Version V1.0
*/
public abstract class AbstractRoutingDataSource extends AbstractDataSource {
private boolean lenientFallback = true;
private Map targetDataSources;
private Object defaultTargetDataSource;
private Map resolvedDataSources = new HashMap();
private DataSource resolvedDefaultDataSource;
private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class);
public BackendConnection getConnection() throws SQLException {
return new BackendConnection(this);
}
public BackendConnection getConnection(String username, String password)
throws SQLException {
return new BackendConnection(this);
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException( "Property 'targetDataSources' is required");
}
this.resolvedDataSources = new HashMap(this.targetDataSources.size());
for (Map.Entry entry : this.targetDataSources.entrySet()) {
Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
this.resolvedDataSources.put(lookupKey, dataSource);
}
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
public void putNewDataSource(Object key, DataSource dataSource){
if(this.resolvedDataSources == null){
this.resolvedDataSources = new HashMap();
}
if(this.resolvedDataSources.containsKey(key)){
this.resolvedDataSources.remove(key);
logger.info( "remove old key:" + key);
}
logger.info( "add key:" + key + ", value=" + dataSource);
this.resolvedDataSources.put(key, dataSource);
}
/**
* 數(shù)據(jù)源選擇邏輯
*/
public DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSourceContextHolder.clearSingleSqlDataSourceType();
int index = 0;
for (Entry element : resolvedDataSources.entrySet()) {
logger.debug( "myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
index++;
}
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException( "Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
logger.debug( "myAbstractDS, hit DS is " + dataSource.toString());
return dataSource;
}
public DataSource getTargetDataSource(Object lookupKey) {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
if(lookupKey == null){
lookupKey = determineCurrentLookupKey();
}
DataSourceContextHolder.clearSingleSqlDataSourceType();
int index = 0;
for (Entry element : resolvedDataSources.entrySet()) {
logger.debug( "myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
index++;
}
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException( "Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
logger.debug( "myAbstractDS, hit DS is " + dataSource.toString());
return dataSource;
}
public abstract Object determineCurrentLookupKey();
public abstract Object getCurrentSlaveKey();
@Override
public boolean isWrapperFor(Class iface) throws SQLException {
return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
}
@SuppressWarnings( "unchecked")
@Override
public T unwrap(Class iface) throws SQLException {
if (iface.isInstance(this)){
return (T) this;
}
return determineTargetDataSource().unwrap(iface);
}
protected Object resolveSpecifiedLookupKey(Object lookupKey) {
return lookupKey;
}
protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
if (dataSource instanceof DataSource) {
return (DataSource) dataSource;
}
else {
throw new IllegalArgumentException(
"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
}
}
//get set方法省略
}
DataSourceContextHolder
public class DataSourceContextHolder {
private static final TransmittableThreadLocal singleSqlContextHolder = new TransmittableThreadLocal();
private static final TransmittableThreadLocal multiSqlContextHolder = new TransmittableThreadLocal();
/**
* @Description: 設(shè)置單條sql數(shù)據(jù)源類(lèi)型
* @param dataSourceType 數(shù)據(jù)庫(kù)類(lèi)型
* @ return void
* @throws
*/
public static void setSingleSqlDataSourceType(String dataSourceType) {
singleSqlContextHolder.set(dataSourceType);
}
/**
* @Description: 獲取單條sql數(shù)據(jù)源類(lèi)型
* @param
* @ return String
* @throws
*/
public static String getSingleSqlDataSourceType() {
return singleSqlContextHolder.get();
}
/**
* @Description: 清除單條sql數(shù)據(jù)源類(lèi)型
* @param
* @ return void
* @throws
*/
public static void clearSingleSqlDataSourceType() {
singleSqlContextHolder.remove();
}
/**
* @Description: 設(shè)置多條sql數(shù)據(jù)源類(lèi)型
* @param dataSourceType 數(shù)據(jù)庫(kù)類(lèi)型
* @ return void
* @throws
*/
public static void setMultiSqlDataSourceType(String dataSourceType) {
multiSqlContextHolder.set(dataSourceType);
}
/**
* @Description: 獲取多條sql數(shù)據(jù)源類(lèi)型
* @param
* @ return String
* @throws
*/
public static String getMultiSqlDataSourceType() {
return multiSqlContextHolder.get();
}
/**
* @Description: 清除多條sql數(shù)據(jù)源類(lèi)型
* @param
* @ return void
* @throws
*/
public static void clearMultiSqlDataSourceType() {
multiSqlContextHolder.remove();
}
/**
* 判斷當(dāng)前線程是否為使用從庫(kù)為數(shù)據(jù)源. 最外層service有slave的aop標(biāo)簽 或者 service沒(méi)有aop標(biāo)簽且單條sql為DQL
*
* @ return
*/
public static boolean isSlave() {
return "slave".equals(multiSqlContextHolder.get()) || (multiSqlContextHolder.get()==null && "slave".equals(singleSqlContextHolder.get())) ;
}
DynamicDataSource
public class DynamicDataSource extends AbstractRoutingDataSource implements InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(DynamicDataSource.class);
private Integer slaveCount = 0;
// 輪詢計(jì)數(shù),初始為-1,AtomicInteger是線程安全的
private AtomicInteger counter = new AtomicInteger(-1);
// 記錄讀庫(kù)的key
private List slaveDataSources = new ArrayList(0);
// slave庫(kù)的權(quán)重
private Map slaveDataSourcesWeight;
private Object currentSlaveKey;
public DynamicDataSource() {
super();
}
/**
* 構(gòu)造函數(shù)
* @param defaultTargetDataSource
* @param targetDataSources
* @param slaveDataSourcesWeight
*/
public DynamicDataSource(Object defaultTargetDataSource, Map targetDataSources, Map slaveDataSourcesWeight) {
this.setResolvedDataSources(new HashMap(targetDataSources.size()));
for (Map.Entry entry : targetDataSources.entrySet()) {
DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
this.putNewDataSource(entry.getKey(), dataSource);
}
if (defaultTargetDataSource != null) {
this.setResolvedDefaultDataSource(resolveSpecifiedDataSource(defaultTargetDataSource));
}
this.setSlaveDataSourcesWeight(slaveDataSourcesWeight);
this.afterPropertiesSet();
}
@Override
public Object determineCurrentLookupKey() {
// 使用DataSourceContextHolder保證線程安全,并且得到當(dāng)前線程中的數(shù)據(jù)源key
if (DataSourceContextHolder.isSlave()) {
currentSlaveKey = getSlaveKey();
return currentSlaveKey;
}
//TODO
Object key = "master";
return key;
}
@Override
public void afterPropertiesSet() {
try {
super.afterPropertiesSet();
Map resolvedDataSources = this.getResolvedDataSources();
//清空從庫(kù)節(jié)點(diǎn),重新生成
slaveDataSources.clear();
slaveCount = 0;
for (Map.Entry entry : resolvedDataSources.entrySet()) {
if(slaveDataSourcesWeight.get(entry.getKey())==null){
continue;
}
for(int i=0; i slaveDataSources.add(entry.getKey());
slaveCount++;
}
}
} catch (Exception e) {
logger.error( "afterPropertiesSet error! ", e);
}
}
/**
* 輪詢算法實(shí)現(xiàn)
*
* @ return
*/
public Object getSlaveKey() {
if(slaveCount <= 0 || slaveDataSources == null || slaveDataSources.size() <= 0){
return null;
}
Integer index = counter.incrementAndGet() % slaveCount;
if (counter.get() > 9999) { // 以免超出Integer范圍
counter.set(-1); // 還原
}
return slaveDataSources.get(index);
}
public Map getSlaveDataSourcesWeight() {
return slaveDataSourcesWeight;
}
public void setSlaveDataSourcesWeight(Map slaveDataSourcesWeight) {
this.slaveDataSourcesWeight = slaveDataSourcesWeight;
}
public Object getCurrentSlaveKey() {
return currentSlaveKey;
}
}
2、讀庫(kù)流量分配策略設(shè)計(jì)
我們所有的數(shù)據(jù)庫(kù)連接都是管控起來(lái)的,包括每個(gè)庫(kù)的流量配置都是支持動(dòng)態(tài)分配的。
支持讀庫(kù)按不同比例承接讀請(qǐng)求。通過(guò)配置頁(yè)面動(dòng)態(tài)調(diào)整應(yīng)用的數(shù)據(jù)庫(kù)連接以及比例,支持隨機(jī)或者順序的方式將流量分配到相應(yīng)的讀庫(kù)中去。
這里我們使用的配置管理下發(fā)中心是我們公司自己開(kāi)發(fā)的 gconfig,當(dāng)然替換成開(kāi)源的 diamond 或者 applo 也是可以的。
當(dāng)接收到配管中心的調(diào)整指令,會(huì)動(dòng)態(tài)更新應(yīng)用數(shù)據(jù)源連接,然后更新 beanFactory 中的 datasource。核心函數(shù)如下:
/**
* 更新beanFactory
* @param properties
*/
public void refreshDataSource(String properties) {
YamlDynamicDataSource dataSource;
try {
dataSource = new YamlDynamicDataSource(properties);
} catch (IOException e) {
throw new RuntimeException( "convert datasource config failed!");
}
// 驗(yàn)證必須字段是否存在
if (dataSource == null && dataSource.getResolvedDataSources() == null
|| dataSource.getResolvedDefaultDataSource() == null || dataSource.getSlaveDataSourcesWeight() == null) {
throw new RuntimeException( "datasource config error!");
}
ConcurrentHashMap newDataSource = new ConcurrentHashMap(
dataSource.getResolvedDataSources());
//更新數(shù)據(jù)源的bean
DynamicDataSource dynamicDataSource = (DynamicDataSource) ((DefaultListableBeanFactory) beanFactory)
.getBean(dataSourceName);
dynamicDataSource.setResolvedDefaultDataSource(dataSource.getResolvedDefaultDataSource());
dynamicDataSource.setResolvedDataSources(new HashMap());//將數(shù)據(jù)源清空,重新添加
for (Entry element : newDataSource.entrySet()) {
dynamicDataSource.putNewDataSource(element.getKey(), element.getValue());
}
dynamicDataSource.setSlaveDataSourcesWeight(dataSource.getSlaveDataSourcesWeight());
dynamicDataSource.afterPropertiesSet();
三、性能
我們經(jīng)過(guò)性能測(cè)試,發(fā)現(xiàn) Robustdb 的性能在一定層度上比 Atlas 性能更好。壓測(cè)結(jié)果如下:
特別聲明:以上內(nèi)容(如有圖片或視頻亦包括在內(nèi))為自媒體平臺(tái)“網(wǎng)易號(hào)”用戶上傳并發(fā)布,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.