PythonでMySQLに接続するクラスを共通クラスとして部品化

phtyonロゴ

何かしらのシステムを構築しようとした場合、DBにデータを登録する、更新する、取得する、といった機能は必須になります。

Pythonも例外ではないのですが、いろいろなサイトでDBアクセスの一般的な方法は紹介しているのですが、実システムに沿ったような部品化についてはきちんとまとまっているサイトをみつけることができませんでした。


なければ作ってしまえということで、今回は、Pythonを使ってDBアクセス、および、DBアクセスクラスの部品化について紹介します。


環境情報


  • Python 3.4.1
  • MySQL 5.6.23

MySQLのデータベースには、以下のテーブルが存在することを前提とします。


CREATE TABLE test (
 test_id INT NOT NULL AUTO_INCREMENT,
 value INT NOT NULL,
 register_name VARCHAR(40) NOT NULL,
 register_date DATETIME NOT NULL,
 del_flg CHAR(1) NOT NULL,
 PRIMARY KEY (test_id)
) ENGINE = InnoDB DEFAULT CHARACTER SET utf8;

メインクラスとDBアクセスクラス


今回のクラス構成としては、メインクラスとDBアクセスクラスの2つとなります。
DBアクセスクラスは、DBアクセスに必要な最低限の機能を保持する事します。
メインクラスは、DBアクセスが必要な際にDBアクセスクラスを使います。


メインクラス

#!/usr/local/bin/python3.4
# coding: utf-8
from dbAccessor import dbAccessor

#
# メインクラス
#
class Main:

    # -----------------------------------
    # メインクラス
    #
    # DBアクセスクラスを呼び出すメイン処理
    # -----------------------------------
    def excecuteMain():
        obj = dbAccessor(
            '※DB名', '※ホスト名', '※ユーザ名', '※パスワード');

        # SELECT実行
        rows = obj.excecuteQuery('select test_id, value from test;')
        for row in rows:
            print("%d %s" % (row[0], row[1]))

        # INSERT実行
        num = obj.excecuteInsert("INSERT INTO test (
            value, register_name, register_date, del_flg) 
            VALUES (100,'管理者', '2020-12-20 00:00:00','管理者', 
            '0')")

        # UPDATE実行
        num = obj.excecuteUpdate(
            "UPDATE test SET value = 100 where test_id = 10")

        # DELETE実行
        num = obj.excecuteDelete(
            "DELETE FROM test WHERE test_id = 10")

# メイン処理を実行
Main.excecuteMain()

DBアクセスクラス

#!/usr/local/bin/python3.4
# coding: utf-8
from urllib.parse import urlparse
import mysql.connector

#
# DBアクセス管理クラス
#
class dbAccessor:

    # -----------------------------------
    # コンストラクタ
    #
    # コネクションを取得し、クラス変数にカーソルを保持する。
    # -----------------------------------
    def __init__(self, dbName, hostName, id, password):
        print("start:__init__")

        # DB接続URLを作成してパース
        url = 'mysql://' + id + ':' + password + '@' + 
            hostName + '/' + dbName
        parse = urlparse(url)

        try:
            # DBに接続する
            self.conn = mysql.connector.connect(
                host = parse.hostname,
                port = parse.port,
                user = parse.username,
                password = parse.password,
                database = parse.path[1:],
            )

            # コネクションの設定
            self.conn.autocommit = False

            # カーソル情報をクラス変数に格納
            self.conn.is_connected()
            self.cur = self.conn.cursor()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:__init__")

    # -----------------------------------
    # クエリの実行
    #
    # クエリを実行し、取得結果を呼び出し元に通知する。
    # -----------------------------------
    def excecuteQuery(self, sql):
        print("start:excecuteQuery")

        try:
            self.cur.execute(sql)
            rows = self.cur.fetchall()
            return rows
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:excecuteQuery")

    # -----------------------------------
    # インサートの実行
    #
    # インサートを実行する。
    # -----------------------------------
    def excecuteInsert(self, sql):
        print("start:excecuteInsert")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteInsert")

    # -----------------------------------
    # アップデートの実行
    #
    # アップデートを実行する。
    # -----------------------------------
    def excecuteUpdate(self, sql):
        print("start:excecuteUpdate")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteUpdate")

    # -----------------------------------
    # デリートの実行
    #
    # デリートを実行する。
    # -----------------------------------
    def excecuteDelete(self, sql):
        print("start:excecuteDelete")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteDelete")

    # -----------------------------------
    # デストラクタ
    #
    # コネクションを解放する。
    # -----------------------------------
    def __del__(self):
        print("start:__del__")
        try:
            self.conn.close()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)
        print("end:__del__")

DBアクセスクラスの各メソッド


上記に紹介したサンプルプログラムについて説明していきます。


DBコネクションの取得と解放


DBコネクションの取得はコンストラクタで、解放はデストラクタで実施しています。


    def __init__(self, dbName, hostName, id, password):
        print("start:__init__")

        # DB接続URLを作成してパース
        url = 'mysql://' + id + ':' + password + '@' + 
            hostName + '/' + dbName
        parse = urlparse(url)

        try:
            # DBに接続する
            self.conn = mysql.connector.connect(
                host = parse.hostname,
                port = parse.port,
                user = parse.username,
                password = parse.password,
                database = parse.path[1:],
            )

            # コネクションの設定
            self.conn.autocommit = False

            # カーソル情報をクラス変数に格納
            self.conn.is_connected()
            self.cur = self.conn.cursor()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:__init__")

    def __del__(self):
        print("start:__del__")
        try:
            self.conn.close()
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)
        print("end:__del__")

コンストラクタで通知されたパラメータを使って、MySQLに接続するURLを作成しています。
実際に作成されるURLは以下になります。


url = urlparse('mysql://※ユーザ名:※パスワード@※ホスト名/※DB名')

また、オートコミットをオフ(False)に設定しています。
オートコミットをオフにすることにより、トランザクション管理が容易になります。
これも、RDB(リレーションナルデータベース)を使ったシステムでは必須の機能となります。


こういって取得したDBコネクションとカーソル定義を、DBアクセスクラスのクラス変数として保持しています。
クラス変数として保持することにより、オブジェクト内でコネクションの使いまわしが可能となっています。


最後にデストラクタです。
デストラクタでコネクションを解放することにより、オブジェクト解放時に同時にコネクションも解放されます。


SELECT


SELECTを実行するメソッドは、実行するSQLを呼び出し側(メインクラス)から文字列として通知される形としています。
SQL文を通知して、SELECTした結果をそのまま返却しています。
非常にシンプルです。


    def excecuteQuery(self, sql):
        print("start:excecuteQuery")

        try:
            self.cur.execute(sql)
            rows = self.cur.fetchall()
            return rows
        except (mysql.connector.errors.ProgrammingError) as e:
            print(e)

        print("end:excecuteQuery")

INSERT


INSERT文もSELECTと同様で、呼び出し側から通知されたSQLを発行しているのみです。


SELECTと違うのは返り値となり、「rowcount」を返却しているのがポイントです。
「rowcount」は、INSERTした数になります。


    def excecuteInsert(self, sql):
        print("start:excecuteInsert")

        try:
            self.cur.execute(sql)
            self.conn.commit()
            return self.cur.rowcount
        except (mysql.connector.errors.ProgrammingError) as e:
            self.conn.rollback()
            print(e)

        print("end:excecuteInsert")

UPDATEとDELETEのメソッドも、UPDATEと同じメソッドになります。
UPDATEのメソッドは更新した数、DELETEのメソッドは削除した数、を返却します。