Transaction API 操作ガイド

はじめに

Apollo TransactionはACID特性を備えるトランザクション処理を実現するためのAPIを提供しています.このドキュメントではユーザー間の送金処理を想定した簡単なサンプルプログラムを用いて,Transaction APIの基本的な使い方を紹介します.

準備

サンプルプログラムで使用するキースペースとテーブルをCassandra上に作成します.Apolloでトランザクション処理を実現するには,アプリケーション用のキースペース/テーブルに加え,トランザクションの状態を管理するためのキースペース/テーブル(coordination/txn_state)が必要です. なお,トランザクション処理を行うアプリケーション用テーブルにおいては,アプリケーションに必要なカラムに加えて,Apolloが管理するメタデータ用のカラムとロールバック用のカラムを定義する必要があります.
  • トランザクションのメタデータ用カラム
    • txn_valid_start_at
    • txn_valid_end_at
    • txn_leased_at
    • txn_last_updated_at
    • txn_id
    • txn_state
    • txn_etag
  • ロールバック用カラム(トランザクション開始前のデータを保持する)
    • 全てのカラムに対してprev_というプレフィクスを付与したカラム
以下の例ではアプリケーション用のキースペース/テーブル(test_txn_keyspace/test_txn_table)と,トランザクションの状態管理用キースペース/テーブル(coordination/txn_state)を作成しています.ここでは3ノード以上からなるクラスタを想定して,データの複製を3ノードに分散配置するように設定しています('replication_factor': '3').
cat <<EOF > create_txn_schema.cql
DROP KEYSPACE IF EXISTS test_txn_keyspace;
DROP KEYSPACE IF EXISTS coordination;
CREATE KEYSPACE test_txn_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};
CREATE KEYSPACE coordination WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};

CREATE TABLE test_txn_keyspace.test_txn_table (
    user_id int PRIMARY KEY,
    balance int,

    txn_valid_start_at bigint,
    txn_valid_end_at bigint,
    txn_leased_at bigint,
    txn_last_updated_at bigint,
    txn_id text,
    txn_state int,
    txn_etag int,

    prev_user_id int,
    prev_balance int,

    prev_txn_valid_start_at bigint,
    prev_txn_valid_end_at bigint,
    prev_txn_leased_at bigint,
    prev_txn_last_updated_at bigint,
    prev_txn_id text,
    prev_txn_state int,
    prev_txn_etag int,
);

CREATE TABLE coordination.txn_state (
    txn_id text PRIMARY KEY,
    txn_state int,
);

INSERT INTO test_txn_keyspace.test_txn_table (user_id, balance, txn_state, txn_etag) VALUES (1, 10000, 3, 0);
INSERT INTO test_txn_keyspace.test_txn_table (user_id, balance, txn_state, txn_etag) VALUES (2, 10000, 3, 0);
EOF

# CQLコマンドの実行
cqlsh -f ./create_txn_schema.cql

Transaction APIを利用したトランザクションの例

Transactionインスタンスの生成

トランザクション処理を行う前にTransactionインスタンスを生成する必要があります.
Transaction transaction = TransactionFactory.create("test_txn_keyspace", "test_txn_table");

トランザクションの処理フロー

Apolloにおけるトランザクションはレコードに対する操作の集合です.以下のように,トランザクションの開始(Transaction#start())から終了(Transaction#finish())までに含まれるレコード操作を1つのトランザクションとして扱い,Apolloはこれを不可分に実行します.
  1. トランザクションの開始
  2. レコード操作(読み込み,読み込んだデータの更新,更新結果の書き出し)
  3. トランザクションの確定(コミット処理)
  4. トランザクションの終了
以降では,上記処理フローに沿って送金処理のサンプルプログラムを作成する例を示します.

1. トランザクションの開始

はじめにTransaction#start()でトランザクションを開始します.これ以降のレコード操作が1つのトランザクション内の操作となります.
transaction.start();

2. レコード操作

レコードの読み込みはTransaction#read(),書き出しはTransaction#write()で行います.なお,トランザクションのレコード操作においては,主キーを用いてレコードを指定する必要があります.  レコードを読み込むには,主キー(Predicate)を引数としてTransaction#read()を呼び出します.
// 対象レコードのキーを指定
Predicate keyFrom = new Predicate("user_id", (ByteBuffer) ByteBuffer.allocate(4).putInt(from).flip(), DataType.INT, CompareOperator.EQ);
Predicate keyTo = new Predicate("user_id", (ByteBuffer) ByteBuffer.allocate(4).putInt(to).flip(), DataType.INT, CompareOperator.EQ);

// レコードの読み込み
Row rowFrom = transaction.read(Arrays.asList(keyFrom));
Row rowTo = transaction.read(Arrays.asList(keyTo));
読み込んだレコードからは,以下のようにデータの取得や更新が行えます.
// レコードから残高を取得
int balanceFrom = rowFrom.get("balance").getBytes().getInt();
int balanceTo = rowTo.get("balance").getBytes().getInt();

// 送金元ユーザーの残高チェック
if (balanceFrom < amount) {
    throw new Exception("Insufficient balance.");
}

// 送金後の残高に更新
rowFrom.get("balance").setBytes((ByteBuffer)ByteBuffer.allocate(4).putInt(balanceFrom - amount).flip());
rowTo.get("balance").setBytes((ByteBuffer)ByteBuffer.allocate(4).putInt(balanceTo + amount).flip());
レコードを書き出す際はRowインスタンスを引数としてTransaction#write()を呼び出します.
// 更新の書き出し
transaction.write(rowFrom);
transaction.write(rowTo);

3. トランザクションの確定(コミット処理)

トランザクションの更新内容を確定するためにTransaction#commit()を呼び出します.
transaction.commit();

4. トランザクションの終了

最後にTransaction#finish()を呼び出してトランザクションを終了します.
transaction.finish();

トランザクションにおける例外処理

トランザクション内においては,以下のいずれかの例外が発生する可能性があります.
  • CRUDException
    • コミット前に何らかの問題が発生した場合にスローされます.当該トランザクションは失敗となり,トランザクション中に行った変更は破棄されます
  • TransactionException
    • コミット処理中に何らかの問題が発生した場合にスローされます.当該トランザクションは失敗となり,トランザクション中に行った変更は破棄されます
  • UnknownTransactionStatusException
    • コミット処理中に何らかの問題が発生した場合にスローされます.当該トランザクションはコミットが完了したかどうかが不明な状態です
    • Transaction#recover()を用いることにより,コミット処理中に発生した問題から回復させ,コミットが完了したかどうかを確認出来ます
以下はTransaction#recover()の使用例です.
...
try {
    // トランザクション処理
} catch (UnknownTransactionStatusException e) {
    System.err.println("Commit was tried but not sure if the transaction was successfully committed.");
    TransactionState state = transaction.recover(10000); // recover time out is 10 sec.
    if (state == TransactionState.ABORTED)
        System.err.println("Transaction was Aborted");
    else if (state == TransactionState.BEFORE_COMMITTED_OR_ABORTED)
        System.err.println("read recovery failed");
    else if (state != TransactionState.COMMITTED)
        System.err.println("Transaction was Committed");
}
...

サンプルプログラム

以下にサンプルプログラムの全体と,実行例を示します.

サンプルプログラムの全体

/**
 * Usage:
 *
 * このサンプル全体をTransactionSample.javaというファイル名で保存し,以下のコマンドを実行して下さい.
 *
 * javac -cp /path/to/dependencies TransactionSample.java
 * java -cp .:/path/to/dependencies TransactionSample
 */
import com.orb.apollo.storage.api.CompareOperator;
import com.orb.apollo.storage.api.DataType;
import com.orb.apollo.storage.api.Predicate;
import com.orb.apollo.storage.api.Row;
import com.orb.apollo.transaction.api.Transaction;
import com.orb.apollo.transaction.api.TransactionFactory;
import com.orb.apollo.transaction.api.TransactionState;
import com.orb.apollo.transaction.exception.CRUDException;
import com.orb.apollo.transaction.exception.TransactionException;
import com.orb.apollo.transaction.exception.UnknownTransactionStatusException;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class TransactionSample {
    private static final String KEYSPACE = "test_txn_keyspace";
    private static final String TABLE = "test_txn_table";

    public static void main(String... args) {
        int from = 1; int to = 2; int amount = 1000;

        // 1. Transactionオブジェクトの生成
        Transaction transaction = TransactionFactory.create(KEYSPACE, TABLE);
        try {
            // 2. トランザクションの開始
            transaction.start();

            // 3. レコード操作
            // 対象レコードのキーを指定
            Predicate keyFrom = new Predicate("user_id", (ByteBuffer) ByteBuffer.allocate(4).putInt(from).flip(), DataType.INT, CompareOperator.EQ);
            Predicate keyTo = new Predicate("user_id", (ByteBuffer) ByteBuffer.allocate(4).putInt(to).flip(), DataType.INT, CompareOperator.EQ);

            // レコードの読み込み
            Row rowFrom = transaction.read(Arrays.asList(keyFrom));
            Row rowTo = transaction.read(Arrays.asList(keyTo));

            // レコードから残高を取得
            int balanceFrom = rowFrom.get("balance").getBytes().getInt();
            int balanceTo = rowTo.get("balance").getBytes().getInt();

            // 送金元ユーザーの残高チェック
            if (balanceFrom < amount) {
                throw new Exception("Insufficient balance.");
            }

            // 送金後の残高に更新
            rowFrom.get("balance").setBytes((ByteBuffer)ByteBuffer.allocate(4).putInt(balanceFrom - amount).flip());
            rowTo.get("balance").setBytes((ByteBuffer)ByteBuffer.allocate(4).putInt(balanceTo + amount).flip());

            // 更新の書き出し
            transaction.write(rowFrom);
            transaction.write(rowTo);

            // 4. コミット
            transaction.commit();

            System.out.println("Transaction finished. status:" + transaction.check());
        } catch (CRUDException e) {
            System.err.println("Some errors happened before commit! just finishing the transaction without aborting.");
        } catch (TransactionException e) {
            System.err.println("The transaction is aborted due to a conflict. (commit() automatically aborted the transaction)");
        } catch (UnknownTransactionStatusException e) {
            System.err.println("Commit was tried but not sure if the transaction was successfully committed.)");
            TransactionState state = transaction.recover(10000); // recover time out is 10 sec.
            if (state == TransactionState.ABORTED) System.err.println("Transaction was Aborted");
            else if (state == TransactionState.BEFORE_COMMITTED_OR_ABORTED) System.err.println("read recovery failed");
            else if (state != TransactionState.COMMITTED) System.err.println("Transaction was Committed");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5. トランザクションの終了
            if (transaction != null) transaction.finish();
            System.exit(0);
        }
    }
}

サンプルプログラムの実行例

サンプルプログラムの実行例は以下のようになります.ここでは$APOLLO_ROOT/lib というディレクトリに必要な依存パッケージのJarファイルが置かれていると想定しています.
# コンパイル
$ javac -cp "$APOLLO_ROOT/lib/*" TransactionSample.java

# 実行
$ java -cp "./:$APOLLO_ROOT/lib/*" TransactionSample
2017-02-28 01:21:06,313 [INFO  com.orb.apollo.storage.api.CassandraStorage] creating an Apollo cluster
2017-02-28 01:21:06,542 [INFO  com.datastax.driver.core.ClockFactory] Using native clock to generate timestamps.
...

Transaction finished. status:COMMITTED
Share on FacebookTweet about this on TwitterShare on LinkedInEmail to someone