Статистика

Участников проекта 105
Опубликовано статей 78
Отчет по карме. Топ 20

Новости блога

1 29.11.2013  Сегодня самым активным участникам newblog'а был выплачен доход с sape.
7 02.11.2012  Ура! Свешилось, нашему сайту дали тИЦ 10. Спасибо всем кто принимает участие в развитии нашего блога.
8 21.08.2012  Интеграция с sape.ru. Теперь каждый автор статей на newblog автоматически зарабатывает на рекламе.
Все новости

Топ 5 категорий

Программирование 46
Операционные системы 9
Базы данных 4
Туризм 2
Заметки 2

Последние 5 заметок (90)

gullyar - Закладки gullyar
gullyar - Ваша первая закладка
osadchaya - Закладки osadchaya
Ira0231188 - Закладки Ira0231188
Ira0231188 - Закладки Ira0231188

Ссылки

www.freedev.asia

Репликация в mysql или как объединить 2 базы данных одинаковой структуры.

30.10.2011 16:28 | Просмотров: 6618 | Доход: 352.51 руб. | Комментариев: 5
[Программирование] 
Рейтинг: 4.67/3

Иногда возникает необходимость объединить две или более базы данных в одну. Mysql не предоставляет стандартных средств для данного вида репликации. Да и возможен ли он без знания всей структуры бд? Потратив несколько часов на безуспешные поиски готового алгоритма я решил написать свой под конкретную задачу, но код получился почти универсальным. Первое что необходимо определить это масштабы базы данных и объемы таблиц. Если попадаются таблицы в несколько миллионов записей, то запросы вида "INSERT table_to ([fields]) (SELECT [fields] FROM table_from WHERE .." сразу отпадают т.к. не каждый сервер выдержит такой запрос. Сразу оговорюсь что приклад был на cakephp, немного поэкспериментировав я понял что запросы query даже с отключенным кешом сильно уступают по производительности стандартному mysql_query. Поэтому выбор технологии был очевиден - писать на чистом php! Весь процесс репликации можно разбить на 2 этапа:

1) Создание пустой структуры бд и перенос не связанных таблиц

Первое что необходимо создать - это создать пустую структуру новой базы данных, а также перенести не связанные таблицы которые одинаковые во всех базах (например некоторые справочники). Лучше всего при этом опустить триггеры. Предлагаю вот такой скрипт:

#!/bin/bash
FROM_DB=dbfrom
FROM_DB_PASS=test
TO_DB=dbto
TO_DB_PASS=test
mysqldump -uroot -p$FROM_DB_PASS --no-data --skip-triggers $FROM_DB > tmpdball.sql
mysql -uroot -p$TO_DB_PASS $TO_DB < tmpdball.sql
rm tmpdball.sql
mysqldump -uroot -p$FROM_DB_PASS --skip-triggers $FROM_DB [тут список справочников] > tmpdata.sql
mysql -uroot -p$TO_DB_PASS $TO_DB < tmpdata.sql
rm tmpdata.sql

2) Перенос основных связанных таблиц

Для того чтобы перенести основные таблицы надо решить несколько вопросов, а именно где мы будем хранить связки id (id в старой бд \ id в новой бд) и как это отразится на скорости репликации. Существующие варианты: массивы(медленные и занимают место при большом объеме данных), таблицы типа momory -идеальны, но тоже занимают место в оперативной памяти поэтому подходят для хранения id только небольших и часто используемых таблиц. Ну и конечно же MyISAM - стандартные таблицы. Сначала будем реплицировать все родительские таблицы, чтобы при репликации дочерних таблиц у нас уже были пары id родительских таблиц из старой и новой бд. Проведя ряд экспериментов и погуглив я выяснил что extended INSERT в mysql намного быстрее чем простой INSERT, тоесть гораздо эффективнее вставлять сразу несколько записей чем одну. Нельзя забывать про limit при выборке из реплицируемой таблицы, т.к. при нескольких миллионах записей эта процедура либо займет много времени либо для нее не хватит оперативной памяти. Для вставки временных не критичных данных можно использовать DELAY в INSERT, это позволит скрипту не ждать ответа от сервера а сразу продолжать свою работу. Для остальных данных желательно лочить таблицу. Вообщем собрав все нагугленное и опыт после нескольких экспериментов у меня получилась вот такая универсальная функция:

	function __universal_replic($bd,$table,$fields,$values,$replica,$where=''){
			$buffer_insert=400; //сколько записей одновременно отправлять на сервер
			$buffer_select=5000; //шаг по таблице
			$block_tmp='';
			$block_replica='';
			$block_count=0;
			$start_time = microtime(true);
			mysql_query('LOCK TABLES '.$table.' WRITE');
			if(!$this->progress)$this->out($table.')');
			$this->increment++;
			$res=mysql_query('SELECT count(*) as icount FROM '.$bd.'.'.$table.$where);
			$row = mysql_fetch_assoc($res);
			$all_count=$row['icount'];
			
			$res=mysql_query('SELECT count(*) as icount FROM '.$this->db_to.'.'.$table.$where);
			$row = mysql_fetch_assoc($res);
			$in_before=$row['icount'];
			
			$res=mysql_query('SELECT max(id) as imax FROM '.$this->db_to.'.'.$table);
			$row = mysql_fetch_assoc($res);
			$new_start=$row['imax'];
			if(empty($new_start))$new_start=1;
			$this->tek=0;
			$part=round($all_count/$buffer_select);
			$next=0;
			for($ii=0;$ii<=$part;$ii++){
				$result=mysql_query('SELECT * FROM '.$bd.'.'.$table.$where.' LIMIT '.$next.', '.$buffer_select);
				if (mysql_num_rows($result) > 0){
					$queryheader='INSERT '.$this->db_to.'.'.$table.' '.$fields.' VALUES ';
					$queryheaderreplica='INSERT DELAYED '.$this->db_to.'.replica_'.$table.' (old_id,new_id) VALUES ';
					while ($row = mysql_fetch_assoc($result)){							
							if($flag){
								$block_tmp.='(';
								foreach($values as $value){
									if(!empty($value['value'])){
										$block_tmp.=$value['value'];
									}else{
										if(empty($value['get'])){
											$block_tmp.=$this->vset($row[$value['field']],$value['type']).',';
										}else{
											$block_tmp.=$this->vset($this->__get_new_id($value['get'],$row[$value['field']]),$value['type']).',';
										}
									}
								}
								if($replica){
									$block_replica.=' ('.$row['id'].','.$new_start.'),';
									$new_start++;
								}
								$block_tmp=substr($block_tmp,0,strlen($block_tmp)-1);
								$block_tmp.='),';
								$block_count++;
							}
							$this->__progress_bar($all_count,$table);
							if($block_count>=$buffer_insert){
								$block_tmp=substr($block_tmp,0,strlen($block_tmp)-1);
								mysql_query($queryheader.$block_tmp);
								if(mysql_affected_rows()!=$block_count){
									$this->out('# see error.log:'.date('d.m.Y H:i:s',time()));
									$this->log($queryheader.$block_tmp);
									exit;
								}
								$block_tmp='';
								if($replica){
									$block_replica=substr($block_replica,0,strlen($block_replica)-1);
									mysql_query($queryheaderreplica.$block_replica);
									$block_replica='';
								}
								$block_count=0;
							}
					}
					if($block_count>0){
							$block_tmp=substr($block_tmp,0,strlen($block_tmp)-1);
							mysql_query($queryheader.$block_tmp);
							if(mysql_affected_rows()!=$block_count){
								$this->out('# see error.log:'.date('d.m.Y H:i:s',time()));
								$this->log($queryheader.$block_tmp);
								exit;
							}
							if($replica){
								$block_replica=substr($block_replica,0,strlen($block_replica)-1);
								mysql_query($queryheaderreplica.$block_replica);
								$block_replica='';
							}
							$block_count=0;
							$block_tmp='';
					}
				}
				$next=$next+$buffer_select;
			}
			
			$res=mysql_query('SELECT count(*) as icount FROM '.$this->db_to.'.'.$table.$where);
			$row = mysql_fetch_assoc($res);
			$in_after=$row['icount'];
			
			$res=mysql_query('SELECT count(*) as icount FROM '.$bd.'.'.$table.$where);
			$row = mysql_fetch_assoc($res);
			$real=$row['icount'];
			
			mysql_query('UNLOCK TABLES');
			$exec_time = microtime(true) - $start_time;
			$this->out('# готово ('.round($exec_time,3).' c) ['.($in_after-$in_before).']-['.$real.']');
			$this->log($bd.'.'.$table.' - готово ('.round($exec_time,3).' c) ['.($in_after-$in_before).']-['.$real.']');
	}

 

Таким образом чтобы среплицировать любую таблицу достаточно будет вызвать эту функцию передав параметры:

            $this->__universal_replic($bd,'table',
                '(field1, field2, field3)',
                array(
                        array('field'=>'field1','type'=>'s'),
                        array('field'=>'field2','type'=>'i'),
                        array('field'=>'field3','type'=>'s'),
                    ), true
            );

 

где $bd - одна из объединяемых баз данных,  вторым параметром указывается наименование таблицы, затем идет список полей и собственно их перечисление по типам. Последним параметром указывается нужно ли сохранять ключи(является ли таблица родительской для других или нет). Т.е. дочерняя таблица будет реплицироваться так:

            $this->__universal_replic($bd,'table_child',
                '(table_id, field2, field3)',
                array(
                        array('field'=>'table_id','type'=>'i','get'=>1),
                        array('field'=>'field2','type'=>'i'),
                        array('field'=>'field3','type'=>'s'),
                    ), false
            );

 Где 'get'=>1 - означает получение связки из ранее сохраненной родительской таблицы из функции

    function __get_new_id($action,$old_id){
        if ($old_id===null) return null;
        switch($action){
            case 1:$table='replica_table';break;
            .....
        }
        $res=mysql_query('SELECT new_id FROM '.$this->db_to.'.'.$table.' as replica where old_id='.$old_id);
        $row = mysql_fetch_assoc($res);
        return $row['new_id'];   
    }

 Также основная функция вызывает функцию vset() чтобы правильно записать тип в базу, эта функция может быть например такой:

    function vset($str,$param){
        if($param=='s'){
            if($str===null)
                return "null";
            else
                return "'".str_replace("'","\'",$str)."'";
        }
        if($param=='i'){           
            if($str===null)
                return "null";
            else
                return $str;
        }
    }

Ну и как же без прогрессбара при репликации. Он также полезен для оценки скорости выполнения скрипта.

    function __progress_bar($all,$cap){
        $this->tek++;
        if($this->progress){
            $procent=round(($this->tek*100)/$all);
            $progress='';
            $tmpprocent=round($procent/2);
            if(($this->progress_tmp!=$tmpprocent)||($tmpprocent==50)){
                $this->progress_tmp=$tmpprocent;
                for($i=0;$i<50;$i++){
                    if($i>=$tmpprocent)
                        $progress=$progress.'_';
                    else
                        $progress=$progress.'#';
                }
                $progress=$progress.'|';
                echo($progress.'...'.$procent.'% ('.$this->tek.') '.$cap."\r");
            }
        }
    }

 

Ну и пара советов напоследок. В функцию __universal_replic последним параметром можно передать любое условие запроса.  Например если часть данных необходимо отфильтровать или вовсе не переносить. Если Вы используете таблицы InnoDB - то в конфиге /etc/my.cfn выставьте у параметра innodb_buffer_pool_size значение по максимуму (до 80% ОЗУ), это очень сильно ускорит процесс репликации. По поводу  extended insert хотелось бы добавить что не стоит сильно завышать буфер - это только снизет скорость репликации. В идеальном варианте добавить код который будет сам определять наилучшую скорость в зависимости от изменения размера буфера в ту или другую сторону.
 
P.S.: На нормальное оформление статьи небыло времени, поэтому просто решил сохранить "наброски" как есть. Если у кого-нибудь есть предложения по увеличению производительности скрипта или появятся вопросы по функции - пишите в комментах(для зарегистрированных пользователей) или через обмен сообщениями на сайте.

 


© jdev
| Комментировать статью |
  • Аноним 0 (30.11.2011 22:20)
    Я не вчитался, но все же. Таблица переводов типов АТЕ не имеет первичного ключа, там вообще парево с этим. Она у тебя нормально реплицируется или ты ее просто копируешь? Если копируешь то на разных серверах могут немного отличаться.
    P.S. Отгадай кто написал :)
    | Ответить |
    • GM +2587 (01.12.2011 12:20)
      Угу.. уже столкнулся с этим, вроде решили проблему :) P.S.: дай угадаю.. владелец freedev.asia ?? ))
      | Ответить |
  • Аноним 0 (30.11.2011 22:23)
    Да, и добавь ленту новостей RSS
    | Ответить |
    • GM +2587 (01.12.2011 12:25)
      ok, это уже есть в планах )) Пока можно юзать твиттер
      | Ответить |
    • GM +2587 (29.12.2011 15:50)
      готово.. http://newblog.kz/pages/rss
      | Ответить |