OracleのインメモリーグリッドCoherenceを使った開発に携わり1年立ちました。途中の数ヶ月は別の事もやったりで比較的まったりやっていましたが、この辺でちょっと整理。別に筆者が選定した訳ではありませんが、今携わっている仕事でCoherenceを使う事になったのは、Webサービスでミッションクリティカルかつ、応答レスポンスの最小化という要件を満たす為でした。構成はこんな感じです。

上記のWebLogicにデプロイされているフロントアプリケーションと、別途バックにJavaVMで動作させているプロセスの両方にCoherenceのキャッシュデータが配置されます。フロントは静的かつアクセス頻度が高いマスタ系データをレプリケーションキャッシュとして配置、バック側は主にトランザクション系データを分散キャッシュとして配置することで、分散かつ他のプロセスにバックアップを取ってくれます。コンフィグ次第で他サーバとかにバックアップを取るようにも出来るようです。またバックキャッシュではキャッシュストアといって、キャッシュに無ければDBに取りに行ったり、キャッシュに入ったものをDBへ入れたりする機能が動作します。
Coherenceに関してコードを書くところは、キャッシュに格納するデータとなるエンティティモデル、上記のキャッシュストア、その他必要に応じてイベント処理やビジネスロジック側の為にDAO的なものを用意するくらいです。
まず、エンティティモデルとしてPortableObjectを実装します。こんな感じです。
package jp.co.esoro.cache.EDM;
import java.io.IOException;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
/** 取引履歴 */
public class TrnRequest implements PortableObject{
/** 取引年月日 */
private String trnDate;
/** 所属先ID */
private String companyID;
/** 取引番号 */
private String trnID;
/** 取消フラグ */
private String canselFlg;
/** ユーザーID */
private String userID;
/** 金額 */
private long amount;
/** 取引区分 */
private String requestType;
public String getTrnDate() {
return trnDate;
}
public void setTrnDate(String trnDate) {
this.trnDate = trnDate;
}
public String getCompanyID() {
return companyID;
}
public void setCompanyID(String companyID) {
this.companyID = companyID;
}
public String getTrnID() {
return trnID;
}
public void setTrnID(String trnID) {
this.trnID = trnID;
}
public String getCanselFlg() {
return canselFlg;
}
public void setCanselFlg(String canselFlg) {
this.canselFlg = canselFlg;
}
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public long getAmount() {
return amount;
}
public void setAmount(long amount) {
this.amount = amount;
}
public String getRequestType() {
return requestType;
}
public void setRequestType(String requestType) {
this.requestType = requestType;
}
public String getId() {
return trnDate + companyID + trnID;
}
@Override
public void readExternal(PofReader arg0) throws IOException {
setTrnDate(arg0.readString(1));
setCompanyID(arg0.readString(2));
setTrnID(arg0.readString(3));
setCanselFlg(arg0.readString(4));
setUserID(arg0.readString(5));
setAmount(arg0.readLong(6));
setRequestType(arg0.readString(7));
}
@Override
public void writeExternal(PofWriter arg0) throws IOException {
arg0.writeString(0, getId());
arg0.writeString(1, getTrnDate());
arg0.writeString(2, getCompanyID());
arg0.writeString(3, getTrnID());
arg0.writeString(4, getCanselFlg());
arg0.writeString(5, getUserID());
arg0.writeLong(6, getAmount());
arg0.writeString(7, getRequestType());
}
}
上記の例ではキー項目が取引年月日と所属先IDと取引番号の3つですが、KeyValueなので1項目のキーとしてgetIdというメソッドを入れてます。
次にバックキャッシュ側で動作するCacheStoreを実装します。こんな感じ。
package jp.co.esoro.cache.Cachestore;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import jp.co.esoro.cache.EDM.TrnRequest;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.tangosol.net.cache.CacheStore;
/**
* 履歴テーブルキャッシュストア
* */
public class ReqCacheStore implements CacheStore {
protected static Logger logger;
private Connection con = null;
PreparedStatement storePs = null;
PoolDataSource pds;
private static String sql_TrnRequest = "MERGE INTO TrnRequest H "
+ "USING (SELECT ? trnDate, ? companyID, ? trnID FROM DUAL) U "
+ "ON (H.trnDate = U.trnDate "
+ "AND H.companyID = U.companyID "
+ "AND H.trnID = U.trnID) "
+ "WHEN MATCHED THEN "
+ "UPDATE SET canselFlg=? "
+ "WHEN NOT MATCHED THEN "
+ "INSERT (trnDate,companyID,trnID,canselFlg,userID,amount,requestType) "
+ " VALUES (?,?,?,?,?,?,?)";
public ReqCacheStore(String cacheName) {
super();
logger = LogManager.getLogger();
try {
pds = PoolDataSourceFactory.getPoolDataSource();
pds.setConnectionPoolName(cacheName);
pds.setConnectionFactoryClassName(
"oracle.jdbc.pool.OracleDataSource");
pds.setValidateConnectionOnBorrow(true);
pds.setURL("****************");
pds.setUser("user");
pds.setPassword("password");
pds.setInitialPoolSize(1);
pds.setMinPoolSize(1);
pds.setMaxPoolSize(20);
con = pds.getConnection();
storePs = con.prepareStatement(sql_TrnRequest);
} catch (SQLException e) {
logger.error(e.getMessage());
}
}
@Override
public Object load(Object arg0) {
//get時キャッシュに無い場合DB等から読込みが必要な場合記述
return null;
}
@SuppressWarnings("rawtypes")
@Override
public Map loadAll(Collection arg0) {
//get時キャッシュに無い場合DB等から読込みが必要な場合記述
return null;
}
@Override
public void erase(Object arg0) {
//削除が必要な場合記述
}
@SuppressWarnings("rawtypes")
@Override
public void eraseAll(Collection arg0) {
//削除が必要な場合記述
}
/**
* DB書込み
* */
@Override
public void store(Object arg0, Object arg1) {
try {
TrnRequest trn = (TrnRequest)arg1;
storePs.setString(1, trn.getTrnDate());
storePs.setString(2, trn.getCompanyID());
storePs.setString(3, trn.getTrnID());
storePs.setString(4, trn.getCanselFlg());
storePs.setString(5, trn.getTrnDate());
storePs.setString(6, trn.getCompanyID());
storePs.setString(7, trn.getTrnID());
storePs.setString(8, trn.getCanselFlg());
storePs.setString(8, trn.getUserID());
storePs.setLong(10, trn.getAmount());
storePs.setString(11, trn.getRequestType());
storePs.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage());
}
}
@SuppressWarnings("rawtypes")
@Override
public void storeAll(Map arg0) {
for (Object entry :arg0.entrySet()){
store( (Object)((Entry<?, ?>) entry).getKey(),(Object)((Entry<?, ?>) entry).getValue());
}
}
}
上記はDB書き込みのみの実装例です。
この先はコンフィグを3つ書きます。凝った事をしなければフロント用もバック用も同じものでOKです。これらはJVM起動時オプションで指定します。クラスパス内ならファイル名だけ、外に置いてもフルパスで指定すればOKです。
まず、キャッシュのクラスタ設定ですが、基本的にクラスタ名だけ書いておけば後は勝手に各プロセスが連携してくれます。
起動オプションは、-Dtangosol.coherence.override=tangosol-coherence-override.xml
<?xml version="1.0" encoding="UTF-8"?>
<coherence xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config http://xmlns.oracle.com/coherence/coherence-operational-config/1.2/coherence-operational-config.xsd">
<!--coherence-version:12.1.3-->
<cluster-config>
<member-identity>
<cluster-name system-property="tangosol.coherence.cluster">cohe-cluster1</cluster-name>
</member-identity>
<multicast-listener>
<address system-property="tangosol.coherence.clusteraddress">224.0.0.1</address>
<port system-property="tangosol.coherence.clusterport">11131</port>
</multicast-listener>
</cluster-config>
</coherence>
次にPortableObjectとして実装したものをPOFコンフィグに書いておきます。番号は1000以上で適当に並べます。
起動オプションは、-Dtangosol.pof.config=pof.xml
<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">
<user-type-list>
<!-- include all "standard" Coherence POF user types -->
<include>coherence-pof-config.xml</include>
<user-type>
<type-id>1001</type-id>
<class-name>jp.co.esoro.cache.EDM.MstUser</class-name>
</user-type>
<user-type>
<type-id>1002</type-id>
<class-name>jp.co.esoro.cache.EDM.TrnRequest</class-name>
</user-type>
</user-type-list>
</pof-config>
最後に各キャッシュ構成として、キャッシュのタイプや構成、作成したキャッシュストア等を書きます。
起動オプションは、-Dtangosol.coherence.cacheconfig=cache-config.xml
<?xml version="1.0"?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config http://xmlns.oracle.com/coherence/coherence-cache-config/1.2/coherence-cache-config.xsd">
<defaults>
<serializer>pof</serializer>
<socket-provider system-property="tangosol.coherence.socketprovider"/>
</defaults>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>userMst</cache-name>
<scheme-name>master-scheme</scheme-name>
</cache-mapping>
<cache-mapping>
<cache-name>RequestTrn</cache-name>
<scheme-name>trn-scheme</scheme-name>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<replicated-scheme>
<scheme-name>master-scheme</scheme-name>
<service-name>master-service</service-name>
<backing-map-scheme>
<local-scheme></local-scheme>
</backing-map-scheme>
<autostart>true</autostart>
</replicated-scheme>
<distributed-scheme>
<scheme-name>trn-scheme</scheme-name>
<service-name>trn-service</service-name>
<thread-count>3</thread-count>
<backing-map-scheme>
<read-write-backing-map-scheme>
<internal-cache-scheme>
<local-scheme>
<eviction-policy>LRU</eviction-policy>
<high-units>1000000</high-units>
</local-scheme>
</internal-cache-scheme>
<cachestore-scheme>
<class-scheme>
<class-name>jp.co.esoro.cache.Cachestore.ReqCacheStore</class-name>
<init-params>
<init-param>
<param-type>java.lang.String</param-type>
<param-value>{cache-name}</param-value>
</init-param>
</init-params>
</class-scheme>
</cachestore-scheme>
<write-delay>5s</write-delay>
<write-requeue-threshold>1</write-requeue-threshold>
</read-write-backing-map-scheme>
</backing-map-scheme>
<autostart>true</autostart>
</distributed-scheme>
<proxy-scheme>
<scheme-name>proxy-scheme</scheme-name>
<service-name>proxy-service</service-name>
<thread-count>10</thread-count>
<acceptor-config>
<tcp-acceptor>
<local-address>
<address>192.168.111.113</address>
<port>9099</port>
</local-address>
</tcp-acceptor>
</acceptor-config>
<proxy-config>
<cache-service-proxy>
<enabled>true</enabled>
</cache-service-proxy>
<invocation-service-proxy>
<enabled>true</enabled>
</invocation-service-proxy>
</proxy-config>
<autostart>true</autostart>
</proxy-scheme>
</caching-schemes>
</cache-config>
上記ですとuserMstはレプリケーションキャッシュ、RequestTrnは分散キャッシュでOutOfMemory対策として1プロセス最大100万件まで、処理負荷分散の為にスレッドを3つ、キャッシュストアは応答レスポンスを意識してキャッシュ書き込み5秒後に非同期で動作するという内容です。最後のProxyはクラスタ構成プロセス外からのアクセス(*Extends)がある場合の受信口を用意している形になってます。なお、複数プロセスを動作させる場合は、PORTを個々に指定します。