博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mysql版同步数据
阅读量:4213 次
发布时间:2019-05-26

本文共 4781 字,大约阅读时间需要 15 分钟。

DELIMITER ;
DROP PROCEDURE IF EXISTS P_sync_etl_mid_data ;
DROP TABLE IF EXISTS mid_shop;
DELIMITER &&
CREATE PROCEDURE P_sync_etl_mid_data 
(
IN p_source_db VARCHAR(50),
IN p_source_table VARCHAR(50),
IN p_target_table VARCHAR(50),
IN p_key_col VARCHAR(50), #键列 (用于来源表和目标表的关联,目前仅支持单一字段)
IN p_condition VARCHAR(255) , #限制条件 如限制7天内的数据参与更新。则可以写 create_date>=date_add(now(),interval -7 day)
INOUT p_is_new_table BIT #default 0
)
top: BEGIN #定义存储过程顶部以方便跳出存储过程
 # declare p_condition varchar(255) Default '';  默认值的定义方法
# 创建表
DECLARE done INT DEFAULT FALSE;
DECLARE var_sql VARCHAR(1000);
 -- 游标
  DECLARE cur_update CURSOR FOR 
SELECT CONCAT('update ',a.table_name,' as a join ',b.TABLE_SCHEMA,'.',b.TABLE_NAME,' as b
on a.',p_key_col,'=b.',p_key_col,'
set a.',a.column_name,'=b.',a.column_name,',a.sync_flag=2
where ifnull(a.',a.column_name,',',
(CASE WHEN a.DATA_TYPE IN ('int','bit','tinyint','decimal','bigint','double','smallint','float') THEN '0'
WHEN a.DATA_TYPE IN ('varchar','char','text') THEN ''''''
WHEN a.DATA_TYPE IN ('timestamp','datetime','date','year','month','day') THEN '''1970-01-01''' END)
,')<>ifnull(b.',a.column_name,',',
(CASE WHEN a.DATA_TYPE IN ('int','bit','tinyint','decimal','bigint','double','smallint','float') THEN '0'
WHEN a.DATA_TYPE IN ('varchar','char','text') THEN ''''''
WHEN a.DATA_TYPE IN ('timestamp','datetime','date','year','month','day') THEN '''1970-01-01''' END)
,')  
  #a.sync_flag is null 不限制此处是为了更新数据
 ;') AS sq
FROM information_schema.`COLUMNS`  a JOIN information_schema.`COLUMNS` b 
ON  a.table_schema=DATABASE() AND a.table_name=p_target_table
AND a.column_name=b.column_name 
AND b.table_schema=p_source_db AND b.table_name=p_source_table;
  -- 遍历数据结束标志
  -- 将结束标志绑定到游标
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
IF  NOT EXISTS (SELECT 1 FROM information_schema.`TABLES` 
WHERE table_schema=p_source_db AND table_name =p_source_table) THEN 
SELECT '源数据不存在';
LEAVE top; #强制跳出
END IF ;
IF  NOT EXISTS (SELECT 1 FROM information_schema.`TABLES` 
WHERE table_schema=DATABASE() AND table_name =p_target_table) THEN 
# 目标数据不存在,即将创建数据表
SET @p_sql=CONCAT('create table ',p_target_table,' like ',p_source_db,'.',p_source_table,';');
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
# 初装数据
SET @p_sql=CONCAT('insert into ',p_target_table,' select * from ',p_source_db,'.',p_source_table,';');
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
SET p_is_new_table=1;
END IF ;
# 添加同步字段和索引
IF NOT EXISTS(SELECT 1 FROM information_schema.`COLUMNS` 
WHERE TABLE_SCHEMA=DATABASE() AND table_name=p_target_table AND COLUMN_NAME='sync_flag') THEN 
SET @p_sql=CONCAT('alter table ',p_target_table,' add column sync_flag tinyint ;');
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
SET @p_sql=CONCAT('create index idx_sync_flag on ',p_target_table,'(sync_flag);');
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
END IF; 
# 初始bi数据表,即数据填充至数据仓库/ods。
# 增删改的标记。1 需要插入 2 需要更新,3 需要删除 存储过程?
# 判断顺序 清空字段-->更新-->删除-->插入 更新时需要判断空值条件,根据datatype填充值。
# 清空
IF CHAR_LENGTH(p_condition)>3 THEN 
SET @p_sql=CONCAT('delete from ',p_target_table,' where not (',IFNULL(p_condition,''),');') ;
PREPARE stmt FROM @p_sql;
EXECUTE stmt;
END IF ;
SET @p_sql=CONCAT('
update ',p_target_table,' set sync_flag=null where sync_flag is not null ;
');
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
# 循环判断更新()。
 # 游标
  -- 打开游标
  OPEN cur_update;
  
  -- 开始循环
  read_loop: LOOP
    -- 提取游标里的数据,这里只有一个,多个的话也一样;
    FETCH cur_update INTO var_sql;
    -- 声明结束的时候
    IF done THEN
      LEAVE read_loop;
    END IF;
    -- 这里做你想做的循环的事件
    SET @p_sql=var_sql;
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
  END LOOP;
  -- 关闭游标
  CLOSE cur_update;
#需要删除的数据
SET @p_sql=(CASE WHEN CHAR_LENGTH(p_condition)>3 THEN 
CONCAT('and a.',p_key_col,' in (select ',p_key_col,' from ',p_source_db,'.',p_source_table,' where ',p_condition,')
') ELSE '' END);
SET @p_sql=
CONCAT('
update ',p_target_table,' b
left join  ',p_source_db,'.',p_source_table,' a 
on b.',p_key_col,'=a.',p_key_col,' 
',
@p_sql
,'
set b.sync_flag=3
where a.',p_key_col,' is null ; 
');
#b.sync_flag is null 不限制此处是为了更新数据
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
# 需要插入的数据
SELECT @p_sql:=GROUP_CONCAT(a.column_name)
FROM information_schema.`COLUMNS`  a JOIN information_schema.`COLUMNS` b 
ON  a.table_schema=DATABASE() AND a.table_name=p_target_table
AND a.column_name=b.column_name 
AND b.table_schema=p_source_db AND b.table_name=p_source_table;
SET @p_sql=
CONCAT('insert into ',p_target_table,'(',@p_sql,',sync_flag)
select ',@p_sql,',1 as sync_flag
from ',p_source_db,'.',p_source_table,' a 
where ',
(CASE WHEN CHAR_LENGTH(p_condition)>3 THEN 
CONCAT('a.',p_key_col,' in (select ',p_key_col,' from ',p_source_db,'.',p_source_table,' where ',p_condition,')
and ') 
ELSE '' END),'
not exists(select 1 from ',p_target_table,' b where b.',p_key_col,'=a.',p_key_col,');
');
 # b.sync_flag is null 不限制此处是为了更新数据
PREPARE stmt FROM @p_sql;
EXECUTE stmt; 
END;

&&

转载地址:http://rlfmi.baihongyu.com/

你可能感兴趣的文章
系统调用之sys_nanosleep
查看>>
CephContext 中的AdminSocket
查看>>
ceph中的threadpool
查看>>
ceph_perf_local
查看>>
系统调用之getpid
查看>>
解开xz格式的initrd
查看>>
arm 的neon指令等同于arm64的asimd指令
查看>>
arm64 页表映射
查看>>
ceph中用到的压缩引擎
查看>>
numabalance
查看>>
mempolicy
查看>>
cpu的调度域初始化
查看>>
smp的负载均衡
查看>>
docker 性能debug
查看>>
页面锁
查看>>
页面的反向映射
查看>>
页面回收线程
查看>>
cpu子系统的组调度
查看>>
物理内存映射
查看>>
kernel中ksm特性
查看>>