はじめに
近頃担当する業務は夜間バッチでのデータ更新処理が多く、特にDWH的にテーブル再構築(TRUNCATE/INSERT)のパターンを多く使用しています。
その中でSQLで処理を組み上げる時、エラー処理などで条件分岐で異なるSQLを実行したくなる事は珍しくありません。
多くのシステムでは呼び出し側でSQLの実行結果を参照し、次に実行するSQLを選択/実行していると思います。
また、SQLだけで実行するなら、制御文が記述できるストアド・プロシージャ等が使用されていると思います。
ですが、構築するシステムの仕組み/制約でこれらの実現が難しい場合、通常のSQL(DML)のみでそれに近いことを実現する必要があります。
よくあるケース
夜間のバッチ処理でマスタを一括更新する場合、前段の処理に異常があれば前日のマスタ状態を維持する場合が多いです。
更新処理がエラーになればロールバックされますが、SQLが正常終了したからといって、データが正常とは限りません。
この場合、更新結果のデータ状態を見て正常/異常を判定し、異常であれば前日のデータに切り戻しを行います。
例1:全件連携時のマスタ洗替え
シンプルに全件洗いがえのマスタ更新で、件数が前日の半分以下なら異常扱いで前日データを維持することとします。
ただし、物理削除されたデータは削除日をつけてマスタに残す必要があります。
マスタデータはCSVで全件が連携される事とします。
使用するデータ
・商品マスタCSV(item.csv)
商品マスタの全件がCSVで連携されます。
PKは商品コードで、商品名、価格、更新日を持ちます。
・LOAD用テーブル(load_item)
CSVファイルを一括ロードする受け側のテーブルです。
・商品マスタテーブル(item_mst)
更新対象の商品マスタテーブルです。
・最終商品マスタテーブル(item_mst_latest)
提供済のマスタテーブルです。更新後に切り戻す場合はこの状態に切り戻します。
DBはAWS Redshift を使用していますが、同様の処理を行うのであれば他のDBでも基本的な考えに大差ないと思います。
処理
(1)CSVの一括ロード
CSVファイルは以下のイメージで一括でDBテーブルに取り込みます。
truncate table item_mst;
copy item_mst from item_csv xxx
;
STL_LOAD_ERRORSを見てエラーがあればエラー処理を行うことも可能ですがここでは割愛します。
注意:
RedshiftではロードするテーブルにはPKを定義しないことをおすすめします。
連携トラブル(再送複数ファイル)や送り元の不具合でキー重複が発生した場合もPKの制約は効きません。
PKはSQLでユニークが保証できるテーブル(group byやrow_number()などでユニークを保証しているデータ)のみに定義することをおすすめします。
(2)商品マスタの更新
ここでサブクエリ(CTE)を使って頑張って更新します。
サブクエリのデータがほしいときは、1件データをJOIN、ほしくないときは0件テーブルをJOIN、でデータの要否をコントロールしていきます。
作成するSQLは以下のイメージとなります。
truncate table item_mst;
insert into item_mst
-- ここで取り込んだデータがPK(商品コード)ユニークにできるよう新しい方優先で優先順(pri)をつけます
with in_data as (
select
商品コード, 商品名, 価格, 更新日,
row_number() over (partition by 商品コード order by 更新日)pri
from load_item
),
-- 件数が今の半分以下かどうかを確認します
cnt_check as (
select
( select count(*) from in_data where pri=1 ) cnt_new, -- 取り込み商品数
( select count(*) from item_mst_latest where del_date is null ) cnt_old, -- 今の商品数
cnt_new>=cnt_old/2 cnt_ok -- 判定
),
-- 物理削除されたデータを取得します
del_data as (
select
商品コード, 商品名, 価格, 更新日, sysdate 削除日
from item_mst_latest a
inner join cnt_check on cnt_ok=true -- okでなければ処理不要
left join b on a.商品コード = b.商品コード
where b.商品コード is null
),
-- 商品コードユニークの最新データを取得します
new_data as (
select
商品コード, 商品名, 価格, 更新日, null 削除日
from in_data a
inner join cnt_check on cnt_ok=true -- okでなければ処理不要
where pri=1
)
-- 上のCTEを条件付きでUNIONします
select a.* from del_data a inner join cnt_check on cnt_ok=true -- 件数正常であれば削除全件、件数異常のときは0全件
union all
select a.* from new_data a inner join cnt_check on cnt_ok=true -- 件数正常であれば最新全件、件数異常のときは0全件
union all
select a.* from item_mst_latest a inner join cnt_check on cnt_ok=false -- 件数正常であれば0件、件数異常のときは前回の全件
;
任意のデータの条件により、1テーブルについては内容の切り替えをすることができました。
正常時と異常時の双方の処理は、記述するだけでなく両方が動いてしまうので効率が悪そうですが、計算不要なサブクエリ部分には0件テーブルをJOINして(実行計画でうまく行けば)即0件で処理が終了するようにするのが良いと思います。
例2:条件により異なるテーブルを更新したい
例1では1テーブルの更新内容を変更できました。
更新先のテーブルが異なる場合についても考え方は同じです。
条件により更新先のテーブルがAとBで異なる場合、AとBの更新処理を両方記述して実行します。
処理の流れ
更新先のテーブルが異なる場合は、条件を判定するSQL、Aを更新するSQL、Bを更新するSQLの3つのSQLを記述することになります。
AとBの更新それぞれで判定SQLを書くこともできますが、判定処理は同一なので、共通SQLとして外出しするのが良いと思います。
(1)更新可否の結果を判定結果テーブルに出力
(2)判定結果テーブルを見て、正常時にAのテーブルを更新
(3)判定結果テーブルを見て、異常時にBのテーブルを更新
この場合も、例1と同様にA、Bどちらかしか更新が不要な所を双方のテーブル更新を明示的に実行します。
例1と同様に0件のテーブルをJOINすることで、上手く行けば実行負荷を下げることができると思います。
(1)判定処理
この例では簡単な TRUNCATE/INSERTにしています。
結果を記録するために追加型INSERTでも、管理テーブルとしてキー付きUPDATEでも構いません。
truncate table check_ok_tbl;
insert into check_tbl -- 正常時 true1レコード、異常時 false 1レコード
with check as(
select
判定条件 check_ok
from xx where yy -- 条件は要件に従って自由に記述できます
)
select check_ok from check
;
(2)Aの更新(正常時)
この例では更新不要な異常時にはAテーブルは空になります。
異常時に更新したくない場合は、例1の方法で対応できます。
truncate table A;
insert into A
-- Aに入れたいデータ
with a_data as (
select xx from yy where zz
)
-- 判定結果をJOINします。1件取得できれば全件が作成され、取得できなければ空テーブルになります。
select a.* from a_data a inner join check_tbl on check_ok=true
;
(3)Bの更新(異常時)
Aテーブルの場合と同じ構造です。
異常時にBのテーブルが更新され、正常時には空テーブルになります。
truncate table B;
insert into B
-- Bに入れたいデータ
with b_data as (
select xx from yy where zz
)
select b.* from b_data b inner join check_tbl on check_ok=false
;
終わりに
最新のマスタと切り戻し用の最終テーブルの二重持ちはディスク容量的に無駄に見えます。
リソース問題であればRedshiftであれば spectrumテーブルを使用することもでき致命的な問題にはならないと思います。
特にDWHではトランザクション容量がマスタ比べて著しく大きいため、マスタ二重持ちがシステム全体のリソースに与える影響度合いは通常のシステムより低くなります。
Comments