Wednesday, 6 April 2011

PostgreSQL. Redirect data to child partition.

На одном из недавно "вверенных" проектов жил поживал PostgreSQL с настроенным разбиением данных по таблицам-разделам в зависимости от значений (т.н. Partitioning). Особенности работы с БД были таковы, что раз в сутки в таблицы "вливалось" достаточно большое кол-во данных (десятки миллионов строк) и до следующего "вливания" база работала только на отдачу данных.
Проблема, а точнее даже не проблема, а пожелание заключалось в следующем: ПО, которое занималось подготовкой, фильтрацией и, собственно заливкой данных, занималось еще и тем, что вычисляло тот самый, нужный раздел (child table), в который данные нужно сложить. Это приводило к:
  • невозможности менять условия разбиения данных достаточно гибко (изменения должны быть синхронизированы на стороне БД и кода);
  • иногда в разделы попадали некорректные данные, так как заливка данных осуществлялась пакетно, с помощью операции COPY и нужный раздел определялся для всего пакета один;
  • необходимо было заранее создавать таблицы разделов.
Решение, приведенное ниже - не панацея, но кому-то может пригодиться:
Первым делом был создан триггер, который вешался на мастер таблицу:

CREATE TRIGGER redirect_to_child
  BEFORE INSERT ON parent
  FOR EACH ROW
  EXECUTE PROCEDURE redirect_to_child();

Затем, экспериментируя с процедурой redirect_to_child() выяснилось, что на стандартном PlPgsql написать достаточно гибкую функцию, которая бы перенаправляла данные в нужный раздел не получается, так как на этапе запуска функции не известно название таблицы, в которую нужно вставлять данные. Таким образом приходиться использовать EXECUTE для формирования динамического SQL, но это не решает всех проблем, так как теперь нельзя использовать NEW.*, потому что полученный SQL выполняется вне контекста триггерной функции и соответственно не видит структуры NEW.
В PostgreSQL 8.4 появилось ключевое слово USING, которое позволяет обойти проблему с использованием NEW.*, но скорость работы данного триггера оставляла желать лучшего.

В результате всех экспериментов родился небольшой велосипедик на PlPython, который сам создаёт нужные таблички и перенаправляет вставку данных. Нижеприведенный кусочек кода, решает все задачи, описанные в начале статьи, а по времени работает всего около 5-ти раз медленнее "чистой" вставки, что для моих условий было более чем приемлемо. Собственно код:

CREATE OR REPLACE FUNCTION redirect_to_child()
RETURNS trigger AS
$BODY$
  import datetime

  # TD['new'] -> inserted data
  new_data = TD['new']
  values = new_data.values()

  date_occured = new_data['occured'].split()[0]
  s_year, s_month = date_occured.split('-')[:2]

  # TD['table_name'] -> table name, that triggered this function.
  parent_table_name = TD['table_name']
  # Child table name pattern is <parent_table>_<year>_<month>.
  child_table_name = '_'.join([parent_table_name, s_year, s_month])

  try:
    insert_plan = SD[child_table_name]
  except KeyError:
    # Check whether child table exists.
    child_table_result = plpy.execute(("SELECT * FROM information_schema.tables"
                      " WHERE table_catalog = CURRENT_CATALOG"
                      " AND table_schema = CURRENT_SCHEMA"
                      " AND table_name = '%s'") % (child_table_name, ))

    if child_table_result.nrows() == 0 :
      # Need to create child table
      year = int(s_year)
      month = int(s_month)

      min_range_date = datetime.date(year, month, 1)
      max_range_date = datetime.date(year + ((month + 1) / 12), (month + 1) % 12, 1)

      create_sql = ("CREATE TABLE %(child)s ("
              " id BIGINT DEFAULT nextval('%(parent)s_id_seq'::text) PRIMARY KEY,"
              " CHECK ( occured >= DATE '%(min)s' AND occured < DATE '%(max)s' )"
              ") INHERITS (%(parent)s);") \
              % {'parent': parent_table_name,
                'child':child_table_name,
                'min':min_range_date.isoformat(),
                'max':max_range_date.isoformat() }
      plpy.execute( create_sql )
      plpy.execute( "ALTER TABLE %s OWNER TO tableowner;" % child_table_name )

    # Prepare insertion plan.
    questions_marks = ','.join(['$%d' % x for x in xrange(1, len(new_data) + 1)])


    col_types = plpy.execute( ("SELECT column_name, data_type"
                  " FROM information_schema.columns"
                  " WHERE table_catalog = CURRENT_CATALOG"
                  " AND table_schema = CURRENT_SCHEMA"
                  " AND table_name = '%s'"
                  " AND column_name <> 'id';")
                  % parent_table_name );

    type_hash = dict((column["column_name"], column["data_type"])
         for column in col_types)

    column_names = new_data.keys();
    types = [type_hash[k] for k in column_names]

    insert_sql = ("INSERT INTO %s (%s) VALUES (%s);" %
         (child_table_name,
         ', '.join(column_names),
         questions_marks))
    insert_plan = plpy.prepare(insert_sql, types)

    # Cache insertion plan for reuse in the same transaction.
    SD[child_table_name] = insert_plan

  # Insert data to child table
  plpy.execute( insert_plan, values )

  # Skip insertion to parent table
  return "SKIP";
$BODY$
LANGUAGE 'plpythonu';

ALTER FUNCTION redirect_to_child() OWNER TO tableowner;


item

No comments:

Post a Comment