-- 本文共 4781 字,大约阅读时间需要 15 分钟。
DELIMITER ;

DROP PROCEDURE IF EXISTS P_sync_etl_mid_data;

-- NOTE(review): this drop is unrelated to the procedure definition and is
-- destructive; presumably left over from the author's testing. It is kept so
-- that running the script has the same side effects as before -- confirm
-- whether it can be removed.
DROP TABLE IF EXISTS mid_shop;

DELIMITER $$

-- ---------------------------------------------------------------------------
-- P_sync_etl_mid_data
--
-- Incrementally synchronises a table of the current database (the "target")
-- with a table of another schema (the "source") and marks every touched
-- target row in the helper column `sync_flag`:
--     1 = row was inserted, 2 = row was updated, 3 = row no longer exists in
--     the source (marked only, not physically deleted), NULL = untouched.
--
-- Parameters
--   p_source_db     schema that holds the source table
--   p_source_table  source table name
--   p_target_table  target table name in the current database; created with
--                   CREATE TABLE ... LIKE and fully loaded when it is missing
--   p_key_col       single column used to match source and target rows
--                   (composite keys are not supported)
--   p_condition     optional raw SQL predicate limiting the rows taking part
--                   in the sync, e.g.
--                   create_date>=date_add(now(),interval -7 day)
--                   It is only honoured when longer than 3 characters and is
--                   concatenated verbatim into dynamic SQL, so it must come
--                   from a trusted caller.
--   p_is_new_table  INOUT; set to 1 when the target table had to be created,
--                   otherwise left as passed in.
--
-- Processing order: create/initial load -> add sync_flag column and index ->
-- purge target rows outside p_condition -> reset sync_flag -> per-column
-- updates -> mark deleted rows -> insert missing rows.
--
-- Notes
--   * Column differences are detected with the null-safe operator <=>, so a
--     change between NULL and 0 / '' / 1970-01-01 counts as a change and
--     every column data type is supported.
--   * Identifiers are back-tick quoted before being placed into dynamic SQL.
--   * DDL executed here (CREATE TABLE / ALTER TABLE / CREATE INDEX) causes
--     implicit commits.
-- ---------------------------------------------------------------------------
CREATE PROCEDURE P_sync_etl_mid_data (
    IN    p_source_db     VARCHAR(50),
    IN    p_source_table  VARCHAR(50),
    IN    p_target_table  VARCHAR(50),
    IN    p_key_col       VARCHAR(50),
    IN    p_condition     VARCHAR(255),
    INOUT p_is_new_table  BIT
)
top: BEGIN  -- labelled so the procedure can be left early with LEAVE top
    DECLARE done INT DEFAULT FALSE;   -- set by the NOT FOUND handler
    DECLARE var_sql TEXT;             -- one generated UPDATE statement
    DECLARE v_cols TEXT;              -- quoted, comma separated common columns
    -- Back-tick quoted identifiers that are safe to splice into dynamic SQL.
    DECLARE v_target VARCHAR(255)
        DEFAULT CONCAT('`', REPLACE(p_target_table, '`', '``'), '`');
    DECLARE v_source VARCHAR(255)
        DEFAULT CONCAT('`', REPLACE(p_source_db, '`', '``'), '`.`',
                       REPLACE(p_source_table, '`', '``'), '`');
    DECLARE v_key VARCHAR(255)
        DEFAULT CONCAT('`', REPLACE(p_key_col, '`', '``'), '`');
    -- A condition is only honoured when it is longer than 3 characters.
    DECLARE v_has_condition BOOLEAN
        DEFAULT (COALESCE(CHAR_LENGTH(p_condition), 0) > 3);
    DECLARE v_old_gc_len BIGINT UNSIGNED DEFAULT @@SESSION.group_concat_max_len;

    -- One UPDATE statement per column shared by target and source. The key
    -- column is skipped (it is equal by the join condition) and so is
    -- sync_flag, which is owned by this procedure.
    DECLARE cur_update CURSOR FOR
        SELECT CONCAT(
                   'update ', v_target, ' as a join ', v_source, ' as b',
                   ' on a.', v_key, '=b.', v_key,
                   ' set a.', c.quoted_col, '=b.', c.quoted_col, ',a.sync_flag=2',
                   ' where not (a.', c.quoted_col, ' <=> b.', c.quoted_col, ')'
               ) AS sq
        FROM (
            SELECT CONCAT('`', REPLACE(t.column_name, '`', '``'), '`') AS quoted_col
            FROM information_schema.`COLUMNS` AS t
            INNER JOIN information_schema.`COLUMNS` AS s
                ON s.column_name = t.column_name
            WHERE t.table_schema = DATABASE()
              AND t.table_name = p_target_table
              AND s.table_schema = p_source_db
              AND s.table_name = p_source_table
              AND t.column_name <> p_key_col
              AND t.column_name <> 'sync_flag'
        ) AS c;

    -- Ends the fetch loop once the cursor is exhausted.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    -- Nothing to do without a source table. The message below means
    -- "source data does not exist" and is returned as a result set.
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.`TABLES`
        WHERE table_schema = p_source_db
          AND table_name = p_source_table
    ) THEN
        SELECT '源数据不存在';
        LEAVE top;
    END IF;

    -- Fail with a readable error instead of an obscure dynamic-SQL syntax
    -- error when a mandatory identifier is missing.
    IF p_target_table IS NULL OR p_target_table = ''
       OR p_key_col IS NULL OR p_key_col = '' THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'p_target_table and p_key_col must be non-empty';
    END IF;

    -- Create the target and do the initial full load when it is missing.
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.`TABLES`
        WHERE table_schema = DATABASE()
          AND table_name = p_target_table
    ) THEN
        SET @p_sql = CONCAT('create table ', v_target, ' like ', v_source);
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        -- SELECT * is safe here: the target was just created LIKE the source,
        -- so both column lists are identical.
        SET @p_sql = CONCAT('insert into ', v_target, ' select * from ', v_source);
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        SET p_is_new_table = 1;
    END IF;

    -- Add the marker column and its index once.
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.`COLUMNS`
        WHERE table_schema = DATABASE()
          AND table_name = p_target_table
          AND column_name = 'sync_flag'
    ) THEN
        SET @p_sql = CONCAT('alter table ', v_target, ' add column sync_flag tinyint');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        SET @p_sql = CONCAT('create index idx_sync_flag on ', v_target, '(sync_flag)');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END IF;

    -- Purge target rows that fall outside the synchronisation window.
    IF v_has_condition THEN
        SET @p_sql = CONCAT('delete from ', v_target, ' where not (', p_condition, ')');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END IF;

    -- Clear the markers of the previous run.
    SET @p_sql = CONCAT('update ', v_target, ' set sync_flag=null where sync_flag is not null');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- Updates: run the generated statement for every shared column.
    SET done = FALSE;
    OPEN cur_update;
    read_loop: LOOP
        FETCH cur_update INTO var_sql;
        IF done THEN
            LEAVE read_loop;
        END IF;

        SET @p_sql = var_sql;
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END LOOP;
    CLOSE cur_update;

    -- Deletes: mark target rows whose key has no match in the source (within
    -- the condition window when one is given).
    SET @p_sql = CONCAT(
        'update ', v_target, ' b left join ', v_source, ' a',
        ' on b.', v_key, '=a.', v_key, ' ',
        CASE
            WHEN v_has_condition THEN
                CONCAT('and a.', v_key, ' in (select ', v_key, ' from ', v_source,
                       ' where ', p_condition, ') ')
            ELSE ''
        END,
        ' set b.sync_flag=3 where a.', v_key, ' is null'
    );
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- Inserts: build the shared column list. group_concat_max_len is raised
    -- temporarily because its default (1024) would silently truncate the list
    -- for wide tables and produce a malformed INSERT.
    SET SESSION group_concat_max_len = GREATEST(v_old_gc_len, 1048576);

    SELECT GROUP_CONCAT(
               CONCAT('`', REPLACE(t.column_name, '`', '``'), '`')
               ORDER BY t.ordinal_position
               SEPARATOR ','
           )
    INTO v_cols
    FROM information_schema.`COLUMNS` AS t
    INNER JOIN information_schema.`COLUMNS` AS s
        ON s.column_name = t.column_name
    WHERE t.table_schema = DATABASE()
      AND t.table_name = p_target_table
      AND s.table_schema = p_source_db
      AND s.table_name = p_source_table
      AND t.column_name <> 'sync_flag';  -- appended explicitly below

    SET SESSION group_concat_max_len = v_old_gc_len;

    SET @p_sql = CONCAT(
        'insert into ', v_target, '(', v_cols, ',sync_flag)',
        ' select ', v_cols, ',1 as sync_flag from ', v_source, ' a where ',
        CASE
            WHEN v_has_condition THEN
                CONCAT('a.', v_key, ' in (select ', v_key, ' from ', v_source,
                       ' where ', p_condition, ') and ')
            ELSE ''
        END,
        ' not exists(select 1 from ', v_target, ' b where b.', v_key, '=a.', v_key, ')'
    );
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END top $$

DELIMITER ;
-- 转载地址:http://rlfmi.baihongyu.com/