數(shù)據(jù)庫無論對于生產(chǎn)管理還是很多的實際應(yīng)用都非常重要。小編這次聊一下數(shù)據(jù)庫事件觸發(fā)的應(yīng)用。示例使用了postgresql和Python。在本文中,事件觸發(fā)和處理大概地分為兩類:
數(shù)據(jù)庫的事件觸發(fā)和服務(wù)器內(nèi)部處理(1~4)
數(shù)據(jù)庫事件觸發(fā)后,客戶端的程序檢測到該事件的觸發(fā)對應(yīng)的處理(5~6)
在數(shù)據(jù)庫系統(tǒng)中,事件觸發(fā)(通常指數(shù)據(jù)庫觸發(fā)器)以及讀取事件觸發(fā)的信息用于多種場景和需求。請看兩組示例(1~4)和(5~6)。
1. 數(shù)據(jù)一致性和完整性維護
當(dāng)對數(shù)據(jù)庫表中的數(shù)據(jù)進行插入、更新或刪除操作時,需要自動驗證或調(diào)整相關(guān)數(shù)據(jù),以確保它們符合業(yè)務(wù)規(guī)則或約束。例如,在一個訂單管理系統(tǒng)中,如果庫存數(shù)量減少到一定閾值以下,可以觸發(fā)一個警告或補貨請求。
Step 1-1: 創(chuàng)建數(shù)據(jù)庫表
假設(shè)我們有一個inventory表,它保存產(chǎn)品庫存的信息:
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, quantity INT NOT NULL );
Step 1-2: 創(chuàng)建觸發(fā)函數(shù)
創(chuàng)建一個 PL/pgSQL 函數(shù),用于檢查庫存數(shù)量并記錄警告信息:
CREATE OR REPLACE FUNCTION check_inventory_threshold() RETURNS TRIGGER AS $$ BEGIN IF NEW.quantity < 10 THEN -- 假設(shè) 10 是閾值 -- 在此處記錄警告或使用某種方式發(fā)送通知 RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity; -- 可在此插入補貨請求或通知操作 END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
Step 1-3: 創(chuàng)建觸發(fā)器
設(shè)置一個觸發(fā)器,更新inventory表時調(diào)用觸發(fā)函數(shù):
CREATE TRIGGER inventory_check_trigger AFTER INSERT OR UPDATE ON inventory FOREACHROWEXECUTEPROCEDUREcheck_inventory_threshold();Step 1-4: 使用 Python 進行外部操作
一個Python腳本可以用于監(jiān)控警告并執(zhí)行更復(fù)雜的操作,比如發(fā)送電子郵件或自動創(chuàng)建補貨單。以下是一個簡單的Python示例,假設(shè)你將警告日志記錄到一個專門的日志表中:
import psycopg2
from smtplib import SMTP
def send_notification(product_name, quantity):
# 發(fā)送郵件通知邏輯(確保你已設(shè)置SMTP服務(wù)器配置)
with SMTP('smtp.example.com') as smtp:
smtp.sendmail('from@example.com', 'to@example.com',
f'Subject: Inventory Alert
Product {product_name} is below threshold with quantity {quantity}.')
def check_and_notify():
try:
# Connect to PostgreSQL database
connection = psycopg2.connect(
host="localhost",
database="your_database",
user="your_user",
password="your_password"
)
cursor = connection.cursor()
# Query to check logs for low inventory
query = """
SELECT product_name, quantity FROM inventory WHERE quantity < 10;
"""
cursor.execute(query)
low_stock_items = cursor.fetchall()
for product_name, quantity in low_stock_items:
send_notification(product_name, quantity)
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
# Run the check and notify function
check_and_notify()
2.自動化任務(wù)
自動執(zhí)行某些日常任務(wù),如記錄變化、生成日志或進行審計。當(dāng)某個表中的數(shù)據(jù)被修改時,觸發(fā)器可以自動記錄修改前后的數(shù)據(jù)以供審計,當(dāng)對特定表進行插入、更新或刪除操作時,觸發(fā)器能夠捕捉這些事件,并執(zhí)行相關(guān)的處理邏輯。 下面是一個如何使用 PostgreSQL 觸發(fā)器來記錄數(shù)據(jù)變化的示例。假設(shè)我們有一個名為employee_data的表,我們希望記錄每次數(shù)據(jù)更新時的操作者信息。
2-1. 創(chuàng)建一個用于日志記錄的表
首先,需要新建一個用于存儲變更日志的表。假設(shè)我們有一個名為employee_data的表,我們希望記錄每次數(shù)據(jù)更新時的操作者信息。
CREATE TABLE change_log (
id SERIAL PRIMARY KEY,
table_name TEXT,
operation VARCHAR(10),
changed_by TEXT,
change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
old_data JSONB,
new_data JSONB
);
2-2. 創(chuàng)建一個觸發(fā)函數(shù)
接下來,創(chuàng)建一個觸發(fā)函數(shù)。當(dāng)employee_data表發(fā)生變化時,調(diào)用該函數(shù)來記錄變更,檢測并獲取old_data和new_data,然后通過row_to_json函數(shù)將其轉(zhuǎn)換為 JSONB 格式存入日志表中。處理中請留意不同的操作對應(yīng)的日志記錄內(nèi)容的差異。
CREATE OR REPLACE FUNCTION log_employee_data_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO change_log (table_name, operation, changed_by, old_data)
VALUES (
TG_TABLE_NAME,
TG_OP,
SESSION_USER,
row_to_json(OLD)
);
ELSE
INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data)
VALUES (
TG_TABLE_NAME,
TG_OP,
SESSION_USER,
row_to_json(OLD),
row_to_json(NEW)
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
TG_OP是 PostgreSQL 觸發(fā)器函數(shù)中的一個特殊變量。在觸發(fā)器函數(shù)中,TG_OP用于表示觸發(fā)事件的操作類型。它會被設(shè)置為以下字符串值之一,以標識觸發(fā)器是由哪種數(shù)據(jù)庫操作激活的:
'INSERT': 觸發(fā)器是由插入操作激活的。
'UPDATE': 觸發(fā)器是由更新操作激活的。
'DELETE': 觸發(fā)器是由刪除操作激活的.
'TRUNCATE': 觸發(fā)器是由截斷操作激活的。
在觸發(fā)器函數(shù)中使用TG_OP,可以根據(jù)不同的操作類型執(zhí)行不同的邏輯。 2-3. 創(chuàng)建觸發(fā)器
最后,為employee_data表創(chuàng)建一個觸發(fā)器,當(dāng)發(fā)生INSERT、UPDATE或DELETE操作時調(diào)用觸發(fā)函數(shù):
CREATE TRIGGER employee_data_changes AFTER INSERT OR UPDATE OR DELETE ON employee_data FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();
2-4.如果沒有對應(yīng)的表employee_data,就建一個來測試
CREATE TABLE employee_data (
employee_id SERIAL PRIMARY KEY, -- 員工唯一標識
first_name VARCHAR(50) NOT NULL, -- 員工的名字
last_name VARCHAR(50) NOT NULL, -- 員工的姓氏
email VARCHAR(100) UNIQUE NOT NULL, -- 員工的電子郵件地址
phone_number VARCHAR(15), -- 員工的聯(lián)系電話
hire_date DATE NOT NULL, -- 入職日期
job_title VARCHAR(50), -- 職位名稱
department VARCHAR(50), -- 所屬部門
salary NUMERIC(10, 2), -- 薪水
manager_id INT, -- 上級主管ID,指向另一個員工
CONSTRAINT fk_manager
FOREIGN KEY(manager_id)
REFERENCES employee_data(employee_id)
ON DELETE SET NULL
);
2-5. 如果表中沒有數(shù)據(jù)就添加一條來測試
INSERT INTO employee_data (
first_name,
last_name,
email,
phone_number,
hire_date,
job_title,
department,
salary,
manager_id
) VALUES (
'ZZZ', -- First name
'AAA', -- Last name
'ZZZ.AAA@example.com', -- Email address
'123-456-7890', -- Phone number
'2023-11-01', -- Hire date
'Engineer', -- Job title
'Engineering', -- Department
75000, -- Salary
NULL -- Manager ID (assuming no manager or manager not yet assigned)
);
3. 跨表更新或同步:
當(dāng)一個表發(fā)生變化時時,觸發(fā)器可以用于自動更新或同步其他表的數(shù)據(jù)。例如,在一個多表關(guān)聯(lián)的系統(tǒng)中,有一個訂單表order和一個庫存表inventory,如果訂單表中數(shù)據(jù)有變化,就觸發(fā)更新庫存表中的對應(yīng)產(chǎn)品的數(shù)據(jù)。 3.1建表示例
CREATE TABLE inventory (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(100),
stock_quantity INT NOT NULL
);
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
product_id INT REFERENCES inventory(product_id),
quantity INT NOT NULL
);
3.2 創(chuàng)建觸發(fā)事件
當(dāng)order表中已經(jīng)發(fā)生insert,updat或者delete事件時,就觸發(fā)下面的函數(shù)運行。實際數(shù)據(jù)的加減操作,請根據(jù)實際關(guān)系進行調(diào)整。這里的簡單邏輯是:
有新訂單添加時,就在庫存表中減少產(chǎn)品庫存數(shù)
訂單數(shù)據(jù)有更新時,就把庫存表中減去更新后訂單表中對應(yīng)產(chǎn)品的訂單數(shù)據(jù),然后加上更新前訂單表中對應(yīng)產(chǎn)品的數(shù)據(jù)
當(dāng)訂單取消(刪除)時,就會在庫存數(shù)據(jù)上增加之訂單表中刪除的舊數(shù)據(jù)
CREATE OR REPLACE FUNCTION update_inventory()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
UPDATE inventory
SET stock_quantity = stock_quantity - NEW.quantity
WHERE product_id = NEW.product_id;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE inventory
SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity
WHERE product_id = NEW.product_id;
ELSIF TG_OP = 'DELETE' THEN
UPDATE inventory
SET stock_quantity = stock_quantity + OLD.quantity
WHERE product_id = OLD.product_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
3.3 創(chuàng)建事件觸發(fā)器
CREATE TRIGGER trigger_orders_update AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE update_inventory();
(防止出現(xiàn)視覺疲勞)
4. 安全性檢查和防護
執(zhí)行安全性檢查,如防止未授權(quán)的數(shù)據(jù)更改或異常數(shù)據(jù)輸入。如果有可疑活動或不當(dāng)數(shù)據(jù)修改,觸發(fā)器可以自動拒絕操作或生成警告。假設(shè)你有一個敏感數(shù)據(jù)的表,如sensitive_data,需要確保只有授權(quán)用戶才能更新數(shù)據(jù)。 4.1建表sensitive_data示例
CREATE TABLE sensitive_data (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL,
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
4.2 創(chuàng)建觸發(fā)函數(shù)進行安全檢查
創(chuàng)建一個觸發(fā)函數(shù)來檢查是否是授權(quán)用戶在做修改。
CREATE OR REPLACE FUNCTION check_user_authorization()
RETURNS TRIGGER AS $$
BEGIN
-- 簡單檢查:用戶是否在允許的列表中(實際應(yīng)該更加復(fù)雜)
IF SESSION_USER <> 'authorized_user' THEN
RAISE EXCEPTION 'Unauthorized user. Access denied.';
END IF;
-- 更新 last_modified 時間戳
NEW.last_modified := CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
4.3為表創(chuàng)建觸發(fā)器
CREATE TRIGGER secure_update_trigger BEFORE UPDATE ON sensitive_data FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();
該事件觸發(fā)器的功能說明
功能:這個示例功能是,當(dāng)有人試圖更新sensitive_data表中的數(shù)據(jù)時,觸發(fā)器函數(shù)check_user_authorization()會自動檢查發(fā)起更新的數(shù)據(jù)庫用戶是否有權(quán)限。如果沒有權(quán)限,拋出異常并阻止操作。
擴展:在實際的生產(chǎn)環(huán)境中,這種安全性檢查會更復(fù)雜,可能包括日志記錄、詳細的用戶權(quán)限檢查、使用角色來管理權(quán)限等。
安全性:使用觸發(fā)器確保只有合適和經(jīng)過驗證的用戶可以進行關(guān)鍵數(shù)據(jù)修改,這是保護數(shù)據(jù)完整性的一部分。
審計:這種自動檢查可集成到更大的審計框架中,以全面監(jiān)控和存儲所有數(shù)據(jù)修改嘗試記錄。
5.事件通知(客戶端程序配合事件觸發(fā)的同步處理方式)
使用事件觸發(fā)器和事件通知來實現(xiàn)對特定數(shù)據(jù)庫事件的響應(yīng)和處理。使用LISTEN和NOTIFY機制,數(shù)據(jù)庫客戶端可以監(jiān)聽特定的通道,并在觸發(fā)器函數(shù)中發(fā)送通知。這在需要實時監(jiān)控數(shù)據(jù)庫事件時非常有用。下面是一個使用 PostgreSQL 實現(xiàn)事件通知的示例。
假設(shè)我們希望在orders表中插入新訂單時發(fā)送通知,以便外部系統(tǒng)或服務(wù)進行相應(yīng)處理。
5.1建一個orders表方便示例
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
product_id INT NOT NULL,
quantity INT NOT NULL,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
觸發(fā)器可以用于事件通知,例如在數(shù)據(jù)變化時發(fā)送電子郵件通知相關(guān)人員。這在實時監(jiān)控和響應(yīng)系統(tǒng)中非常有用。
5.2 建立觸發(fā)函數(shù)
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
-- 使用 NOTIFY 發(fā)送通知,通道名為 'new_order'
PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
5.3創(chuàng)建觸發(fā)器 為orders表創(chuàng)建觸發(fā)器,以在插入新記錄時調(diào)用觸發(fā)函數(shù)。
CREATE TRIGGER notify_order_insert AFTER INSERT ON orders FOR EACH ROW EXECUTE PROCEDURE notify_new_order();5.4使用 Python 監(jiān)聽通知 我們可以使用 Python 腳本來監(jiān)聽并處理通知。以下是一個簡單的示例,使用psycopg2庫監(jiān)聽new_order通道。
import psycopg2
import select
def listen_for_new_orders():
try:
# Connect to your PostgreSQL database
connection = psycopg2.connect(
dbname="your_db",
user="your_user",
password="your_password",
host="localhost"
)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
# Listen for notifications on the 'new_order' channel
cursor.execute("LISTEN new_order;")
print("Waiting for notifications on channel 'new_order'...")
while True:
# Use select() to wait for notification
if select.select([connection], [], [], 5) == ([], [], []):
print("No new notifications.")
else:
connection.poll()
while connection.notifies:
notify = connection.notifies.pop(0)
print(f"Got NOTIFY: {notify.channel} - {notify.payload}")
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
finally:
if connection:
cursor.close()
connection.close()
# Call the function to start listening for notifications
if__name__=='__main__':
listen_for_new_orders()
6. 事件通知(客戶端程序異步多線程的方式進行檢測和操作)
示例的數(shù)據(jù)庫表和事件觸發(fā)的設(shè)置或創(chuàng)建,和示例5中相同,不過這里我們要增加一些復(fù)雜度,畢竟,程序處理要盡可能避免堵塞的方式進行等待讀取。這里設(shè)想另外一種使用場景:
一方面客戶端要檢測數(shù)據(jù)庫的orders表中的數(shù)據(jù)變化;另一方面,客戶端還在繼續(xù)讀?。ɑ蛘咂渌僮鳎┻@個數(shù)據(jù)庫中的數(shù)據(jù)。
import threading import psycopg2 import select import time # Global flag to indicate whether the threads should continue running running = True def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while running: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): continue else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def read_database_records(): while running: try: # Example of reading from PostgreSQL connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) cursor = connection.cursor() # Example query to periodically read data (replace with actual query) cursor.execute("SELECT * FROM orders;") records = cursor.fetchall() for record in records: print(f"Order Record: {record}") time.sleep(10) # Wait before reading again to simulate periodic check except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def main(): try: # Create threads for listening and reading listener_thread = threading.Thread(target=listen_for_new_orders) reader_thread = threading.Thread(target=read_database_records) # Start the threads listener_thread.start() reader_thread.start() # Wait for both threads to complete (or terminate on Ctrl+C) listener_thread.join() reader_thread.join() except KeyboardInterrupt: # Set the running flag to False to stop the threads global running running = False print("Exiting...") if __name__ == "__main__": main()請留意上面的示例python代碼中,數(shù)據(jù)庫的連接使用了ISOLATION_LEVEL_AUTOCOMMIT,這就意味著每次涉及到數(shù)據(jù)更改或者增加的操作,數(shù)據(jù)庫將自動提交了。如果要手動方式提交,那就需要配置一個ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件觸發(fā)示例中,用了:
... FOR EACH ROW EXECUTE PROCEDURE your_trigger_func(); ...這個代碼的執(zhí)行是針對每條記錄的發(fā)生來觸發(fā)了。請根據(jù)實際應(yīng)用的操作需要進行調(diào)整。
-
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3986瀏覽量
67592 -
觸發(fā)器
+關(guān)注
關(guān)注
14文章
2048瀏覽量
62922 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4403瀏覽量
66594 -
python
+關(guān)注
關(guān)注
56文章
4849瀏覽量
89292
原文標題:數(shù)據(jù)庫事件觸發(fā)的設(shè)置和應(yīng)用,及客戶端程序?qū)κ录耐?、異步讀取操作
文章出處:【微信號:安費諾傳感器學(xué)堂,微信公眾號:安費諾傳感器學(xué)堂】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
Labview如何設(shè)置數(shù)據(jù)庫訪問權(quán)限?
數(shù)據(jù)庫觸發(fā)器機制的設(shè)計與實現(xiàn)
什么是支持數(shù)據(jù)庫,什么是中宏數(shù)據(jù)庫
數(shù)據(jù)庫索引技術(shù)應(yīng)用
Delphi教程之動態(tài)設(shè)置遠程數(shù)據(jù)庫的查詢參數(shù)
SQL Server數(shù)據(jù)庫學(xué)習(xí)總結(jié)
數(shù)據(jù)庫教程之如何進行數(shù)據(jù)庫設(shè)計
數(shù)據(jù)庫學(xué)習(xí)教程之數(shù)據(jù)庫的發(fā)展狀況如何數(shù)據(jù)庫有什么新發(fā)展
數(shù)據(jù)庫教程之PHP訪問MySQL數(shù)據(jù)庫的理論知識詳細說明
云數(shù)據(jù)庫和自建數(shù)據(jù)庫的區(qū)別及應(yīng)用
ACS數(shù)據(jù)庫與RSC數(shù)據(jù)庫比較研究
數(shù)據(jù)庫建立|數(shù)據(jù)庫創(chuàng)建的方法?
python讀取數(shù)據(jù)庫數(shù)據(jù) python查詢數(shù)據(jù)庫 python數(shù)據(jù)庫連接
數(shù)據(jù)庫數(shù)據(jù)恢復(fù)—通過拼接數(shù)據(jù)庫碎片恢復(fù)SQLserver數(shù)據(jù)庫

數(shù)據(jù)庫事件觸發(fā)的設(shè)置和應(yīng)用
評論