На одном из недавно "вверенных" проектов жил поживал PostgreSQL с настроенным разбиением данных по таблицам-разделам в зависимости от значений (т.н. Partitioning). Особенности работы с БД были таковы, что раз в сутки в таблицы "вливалось" достаточно большое кол-во данных (десятки миллионов строк) и до следующего "вливания" база работала только на отдачу данных.
Проблема, а точнее даже не проблема, а пожелание заключалось в следующем: ПО, которое занималось подготовкой, фильтрацией и, собственно заливкой данных, занималось еще и тем, что вычисляло тот самый, нужный раздел (child table), в который данные нужно сложить. Это приводило к:
- невозможности менять условия разбиения данных достаточно гибко (изменения должны быть синхронизированы на стороне БД и кода);
- иногда в разделы попадали некорректные данные, так как заливка данных осуществлялась пакетно, с помощью операции COPY и нужный раздел определялся для всего пакета один;
- необходимо было заранее создавать таблицы разделов.
Решение, приведенное ниже - не панацея, но кому-то может пригодиться:
Первым делом был создан триггер, который вешался на мастер таблицу:
CREATE TRIGGER redirect_to_child
BEFORE INSERT ON parent
FOR EACH ROW
EXECUTE PROCEDURE redirect_to_child();
Затем, экспериментируя с процедурой
redirect_to_child() выяснилось, что на стандартном
PlPgsql написать достаточно гибкую функцию, которая бы перенаправляла данные в нужный раздел не получается, так как на этапе запуска функции не известно название таблицы, в которую нужно вставлять данные. Таким образом приходиться использовать
EXECUTE для формирования динамического SQL, но это не решает всех проблем, так как теперь нельзя использовать
NEW.*, потому что полученный SQL выполняется вне контекста триггерной функции и соответственно не видит структуры NEW.
В PostgreSQL 8.4 появилось ключевое слово
USING, которое позволяет обойти проблему с использованием NEW.*, но скорость работы данного триггера оставляла желать лучшего.
В результате всех экспериментов родился небольшой велосипедик на
PlPython, который сам создаёт нужные таблички и перенаправляет вставку данных. Нижеприведенный кусочек кода, решает все задачи, описанные в начале статьи, а по времени работает всего около
5-ти раз медленнее "чистой" вставки, что для моих условий было более чем приемлемо. Собственно код:
CREATE OR REPLACE FUNCTION redirect_to_child()
RETURNS trigger AS
$BODY$
import datetime
# TD['new'] -> inserted data
new_data = TD['new']
values = new_data.values()
date_occured = new_data['occured'].split()[0]
s_year, s_month = date_occured.split('-')[:2]
# TD['table_name'] -> table name, that triggered this function.
parent_table_name = TD['table_name']
# Child table name pattern is <parent_table>_<year>_<month>.
child_table_name = '_'.join([parent_table_name, s_year, s_month])
try:
insert_plan = SD[child_table_name]
except KeyError:
# Check whether child table exists.
child_table_result = plpy.execute(("SELECT * FROM information_schema.tables"
" WHERE table_catalog = CURRENT_CATALOG"
" AND table_schema = CURRENT_SCHEMA"
" AND table_name = '%s'") % (child_table_name, ))
if child_table_result.nrows() == 0 :
# Need to create child table
year = int(s_year)
month = int(s_month)
min_range_date = datetime.date(year, month, 1)
max_range_date = datetime.date(year + ((month + 1) / 12), (month + 1) % 12, 1)
create_sql = ("CREATE TABLE %(child)s ("
" id BIGINT DEFAULT nextval('%(parent)s_id_seq'::text) PRIMARY KEY,"
" CHECK ( occured >= DATE '%(min)s' AND occured < DATE '%(max)s' )"
") INHERITS (%(parent)s);") \
% {'parent': parent_table_name,
'child':child_table_name,
'min':min_range_date.isoformat(),
'max':max_range_date.isoformat() }
plpy.execute( create_sql )
plpy.execute( "ALTER TABLE %s OWNER TO tableowner;" % child_table_name )
# Prepare insertion plan.
questions_marks = ','.join(['$%d' % x for x in xrange(1, len(new_data) + 1)])
col_types = plpy.execute( ("SELECT column_name, data_type"
" FROM information_schema.columns"
" WHERE table_catalog = CURRENT_CATALOG"
" AND table_schema = CURRENT_SCHEMA"
" AND table_name = '%s'"
" AND column_name <> 'id';")
% parent_table_name );
type_hash = dict((column["column_name"], column["data_type"])
for column in col_types)
column_names = new_data.keys();
types = [type_hash[k] for k in column_names]
insert_sql = ("INSERT INTO %s (%s) VALUES (%s);" %
(child_table_name,
', '.join(column_names),
questions_marks))
insert_plan = plpy.prepare(insert_sql, types)
# Cache insertion plan for reuse in the same transaction.
SD[child_table_name] = insert_plan
# Insert data to child table
plpy.execute( insert_plan, values )
# Skip insertion to parent table
return "SKIP";
$BODY$
LANGUAGE 'plpythonu';
ALTER FUNCTION redirect_to_child() OWNER TO tableowner;
archive