PostgreSQLでdblinkを使用して、異なるデータベースのテーブルを結合する

JavaScript

PostgreSQLで、異なるデータベースのテーブルを結合したい場合、dblinkを使用すると便利です。
dblinkを使用するための準備と、使用方法について説明します。


環境

  • OS:Ubuntu Server 22.04.2 LTS
  • PostgreSQL15.5

dblinkの準備

dblinkを使用するためには、PostgreSQLのエクステンションをインストールしておく必要があります。

postgres=# create extension dblink;
postgres=# \dx
                                    インストール済みの拡張一覧
  名前   | バージョン |  スキーマ  |                             説明
---------+------------+------------+--------------------------------------------------------------
 dblink  | 1.2        | public     | connect to other PostgreSQL databases from within a database
 plpgsql | 1.0        | pg_catalog | PL/pgSQL procedural language
(2 行)

dblinkを使用

dblinkを使用するために参照するためのテーブルは、あらかじめviewとして準備しておくと便利です。

// 「DBLINK-SAMPLE」という名称でビューを作成しておく
create or replace view link_view_sample as
select
    col1
    , col2 
    , col3
from
    dblink(
        'DBLINK-SAMPLE'
        ,'select
            col1
            , col2
            , col3
        from
            other_server_table otTble
            left outer join my_server_table myTble on otTble.col1 = myTble.kojin_col1
        ) AS t(
            col1 int
            , col2 int
            , col3 int
    );

作成したビューを使用するためには、リンク先サーバと接続→SQLを実行→リンク先サーバとの接続解除、の流れになります。


// リンク先サーバと接続
select
    dblink_connect( 
        'DBLINK-SAMPLE'
        , 'host=X.X.X.X dbname=database user=saku password=saku00'
    );

// ビューを参照
select
    col1
    , col2
    , col3 
from
    link_view_sample;

// リンク先サーバとの接続解除
select dblink_disconnect('DBLINK-SAMPLE');

 


PythonでMySQLに接続するクラスを共通クラスとして部品化

phtyonロゴ

何かしらのシステムを構築しようとした場合、DBにデータを登録する、更新する、取得する、といった機能は必須になります。

Pythonも例外ではないのですが、いろいろなサイトでDBアクセスの一般的な方法は紹介しているのですが、実システムに沿ったような部品化についてはきちんとまとまっているサイトをみつけることができませんでした。


なければ作ってしまえということで、今回は、Pythonを使ってDBアクセス、および、DBアクセスクラスの部品化について紹介します。


環境情報


  • Python 3.4.1
  • MySQL 5.6.23

MySQLのデータベースには、以下のテーブルが存在することを前提とします。


CREATE TABLE test (
 test_id INT NOT NULL AUTO_INCREMENT,
 value INT NOT NULL,
 register_name VARCHAR(40) NOT NULL,
 register_date DATETIME NOT NULL,
 del_flg CHAR(1) NOT NULL,
 PRIMARY KEY (test_id)
) ENGINE = InnoDB DEFAULT CHARACTER SET utf8;

メインクラスとDBアクセスクラス


今回のクラス構成としては、メインクラスとDBアクセスクラスの2つとなります。
DBアクセスクラスは、DBアクセスに必要な最低限の機能を保持する事します。
メインクラスは、DBアクセスが必要な際にDBアクセスクラスを使います。


メインクラス

#!/usr/local/bin/python3.4
# coding: utf-8
from dbAccessor import dbAccessor

#
# メインクラス
#
class Main:

    # -----------------------------------
    # メインクラス
    #
    # DBアクセスクラスを呼び出すメイン処理
    # -----------------------------------
    def excecuteMain():
        obj = dbAccessor(
            '※DB名', '※ホスト名', '※ユーザ名', '※パスワード');

        # SELECT実行
        rows = obj.excecuteQuery('select test_id, value from test;')
        for row in rows:
            print("%d %s" % (row[0], row[1]))

        # INSERT実行
        num = obj.excecuteInsert("INSERT INTO test (
            value, register_name, register_date, del_flg) 
            VALUES (100,'管理者', '2020-12-20 00:00:00','管理者', 
            '0')")

        # UPDATE実行
        num = obj.excecuteUpdate(
            "UPDATE test SET value = 100 where test_id = 10")

        # DELETE実行
        num = obj.excecuteDelete(
            "DELETE FROM test WHERE test_id = 10")

# メイン処理を実行
Main.excecuteMain()

DBアクセスクラス

#!/usr/local/bin/python3.4
# coding: utf-8
from urllib.parse import urlparse
import mysql.connector

#
# DBアクセス管理クラス
#
class dbAccessor:

    # -----------------------------------
    # コンストラクタ
    #
    # コネクションを取得し、クラス変数にカーソルを保持する。
    # -----------------------------------
    def __init__(self, dbName, hostName, id, password):
        print("start:__init__")

        # DB接続URLを作成してパース
        url = 'mysql://' + id + ':' + password + '@' + 
            hostName + '/' + dbName
        parse = urlparse(url)

        try:
            # DBに接続する
            self.conn = mysql.connector.connect(
                host = parse.hostname,
                port = parse.port,
                user = parse.username,
                password = parse.password,
                database = parse.path[1:],
            )

            # コネクションの設定
            self.conn.autocommit = False

            # カーソル情報をクラス変数に格納
            self.conn.is_connected()
            self.cur = self.conn.cursor()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:__init__")

    # -----------------------------------
    # クエリの実行
    #
    # クエリを実行し、取得結果を呼び出し元に通知する。
    # -----------------------------------
    def excecuteQuery(self, sql):
        print("start:excecuteQuery")

        try:
            self.cur.execute(sql)
            rows = self.cur.fetchall()
            return rows
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:excecuteQuery")

    # -----------------------------------
    # インサートの実行
    #
    # インサートを実行する。
    # -----------------------------------
    def excecuteInsert(self, sql):
        print("start:excecuteInsert")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteInsert")

    # -----------------------------------
    # アップデートの実行
    #
    # アップデートを実行する。
    # -----------------------------------
    def excecuteUpdate(self, sql):
        print("start:excecuteUpdate")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteUpdate")

    # -----------------------------------
    # デリートの実行
    #
    # デリートを実行する。
    # -----------------------------------
    def excecuteDelete(self, sql):
        print("start:excecuteDelete")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteDelete")

    # -----------------------------------
    # デストラクタ
    #
    # コネクションを解放する。
    # -----------------------------------
    def __del__(self):
        print("start:__del__")
        try:
            self.conn.close()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)
        print("end:__del__")

DBアクセスクラスの各メソッド


上記に紹介したサンプルプログラムについて説明していきます。


DBコネクションの取得と解放


DBコネクションの取得はコンストラクタで、解放はデストラクタで実施しています。


    def __init__(self, dbName, hostName, id, password):
        print("start:__init__")

        # DB接続URLを作成してパース
        url = 'mysql://' + id + ':' + password + '@' + 
            hostName + '/' + dbName
        parse = urlparse(url)

        try:
            # DBに接続する
            self.conn = mysql.connector.connect(
                host = parse.hostname,
                port = parse.port,
                user = parse.username,
                password = parse.password,
                database = parse.path[1:],
            )

            # コネクションの設定
            self.conn.autocommit = False

            # カーソル情報をクラス変数に格納
            self.conn.is_connected()
            self.cur = self.conn.cursor()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:__init__")

    def __del__(self):
        print("start:__del__")
        try:
            self.conn.close()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)
        print("end:__del__")

コンストラクタで通知されたパラメータを使って、MySQLに接続するURLを作成しています。
実際に作成されるURLは以下になります。


url = urlparse('mysql://※ユーザ名:※パスワード@※ホスト名/※DB名')

また、オートコミットをオフ(False)に設定しています。
オートコミットをオフにすることにより、トランザクション管理が容易になります。
これも、RDB(リレーションナルデータベース)を使ったシステムでは必須の機能となります。


こういって取得したDBコネクションとカーソル定義を、DBアクセスクラスのクラス変数として保持しています。
クラス変数として保持することにより、オブジェクト内でコネクションの使いまわしが可能となっています。


最後にデストラクタです。
デストラクタでコネクションを解放することにより、オブジェクト解放時に同時にコネクションも解放されます。


SELECT


SELECTを実行するメソッドは、実行するSQLを呼び出し側(メインクラス)から文字列として通知される形としています。
SQL文を通知して、SELECTした結果をそのまま返却しています。
非常にシンプルです。


    def excecuteQuery(self, sql):
        print("start:excecuteQuery")

        try:
            self.cur.execute(sql)
            rows = self.cur.fetchall()
            return rows
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:excecuteQuery")

INSERT


INSERT文もSELECTと同様で、呼び出し側から通知されたSQLを発行しているのみです。


SELECTと違うのは返り値となり、「rowcount」を返却しているのがポイントです。
「rowcount」は、INSERTした数になります。


    def excecuteInsert(self, sql):
        print("start:excecuteInsert")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteInsert")

UPDATEとDELETEのメソッドも、UPDATEと同じメソッドになります。
UPDATEのメソッドは更新した数、DELETEのメソッドは削除した数、を返却します。