二百七十、Kettle——ClickHouse中增量导入清洗数据错误表

一、目的

比如原始数据100条,清洗后,90条正确数据在DWD层清洗表,10条错误数据在DWD层清洗数据错误表,所以清洗数据错误表任务一定要放在清洗表任务之后。

二百七十、Kettle——ClickHouse中增量导入清洗数据错误表

更关键的是,Hive中原本的SQL语句,放在ClickHouse需要大改,头大!而且Kettle任务要想定时增量导入,既与清洗数据错误表最新时间相关,又与DWD层清洗表最新时间相关,搞了大半天才搞定!

二、Hive中原有代码

2.1 表结构

--21、静态排队错误数据表——动态分区  dwd_queue_error
create  table  if not exists  hurys_db.dwd_queue_error(
    id                  string          comment '唯一ID',
    device_no           string          comment '设备编号',
    source_device_type  string          comment '设备类型',
    sn                  string          comment '设备序列号 ',
    model               string          comment '设备型号',
    create_time         string       comment '创建时间',
    lane_no             int             comment '车道编号',
    lane_type           int             comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    queue_count         int             comment '排队车辆数',
    queue_len           float           comment '排队长度(m)',
    queue_head          float           comment '排队头车距停止线距离(m)',
    queue_tail          float           comment '排队尾车距停止线距离(m)'
)
comment '静态排队错误数据表——动态分区'
partitioned by (day string)
stored as orc
;

2.2 SQL代码

--动态插入数据
insert  overwrite  table  hurys_db.dwd_queue_error  partition(day)
select
UUID()  as  id,
t2.device_no, t2.source_device_type, t2.sn, t2.model, t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, t2.day
from hurys_db.ods_queue as t2
left join hurys_db.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
where t3.device_no is null and t3.create_time is null and t3.lane_no is null and t2.day='2024-09-10'
;

原有Hive代码很简单,然后把代码变成脚本,放在海豚定时调度即可,都很简单!

三、ClickHouse中现有代码

3.1 表结构

--21 静态排队数据错误表(长期存储)
create  table  if not exists  hurys_jw.dwd_queue_error(
    id                  String                       comment '唯一ID',
    device_no           String             comment '设备编号',
    source_device_type  Nullable(String)             comment '设备类型',
    sn                  Nullable(String)             comment '设备序列号 ',
    model               Nullable(String)             comment '设备型号',
    create_time         DateTime                     comment '创建时间',
    lane_no             Int32              comment '车道编号',
    lane_type           Nullable(Int32)              comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    queue_count         Int32              comment '排队车辆数',
    queue_len           Decimal(10, 2)     comment '排队长度(m)',
    queue_head          Decimal(10, 2)     comment '排队头车距停止线距离(m)',
    queue_tail          Decimal(10, 2)     comment '排队尾车距停止线距离(m)',
    day                 Date                         comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;

注意:由于后面数据清洗记录表需要,因此部分清洗规则的字段不能用Nullable,这也是后面的一大坑!

3.2 SQL代码

select
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (select
        t2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,
        t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) day
      from hurys_jw.ods_queue as t2
      ANTI join hurys_jw.dwd_queue as t3
      on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
     )
--where  create_time > ?
;

注意:1 生成uuid字段,Hive中是UUID() as id,而ClickHouse中是generateUUIDv4() as id

           2 ClickHouse中with语句好像不是支持,不知道是不是版本问题

           3 ClickHouse中有ANTI join函数

           4 Kettle里需要把Date字段的day变成cast(day as String) day

3.3 Kettle任务

3.3.1 newtime

获取目标表dwd_queue_error的最新时间create_time

3.3.2 替换NULL值

3.3.3 clickhouse输入

select 
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, 
lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (
select t2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) day
from hurys_jw.ods_queue as t2
ANTI join hurys_jw.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
)
where  create_time > ?
;

3.3.4 字段选择

3.3.5 newtime3

获取清洗表dwd_queue的最新时间create_time3

3.3.6 替换NULL值3

3.3.7 记录关联 (笛卡尔输出)

注意:清洗表dwd_queue的最新时间create_time3要大于等于目标表dwd_queue_error的最新时间create_time

3.3.8 clickhouse输出

3.3.9 保存后先执行清洗表dwd_queue任务,再执行dwd_queue_error任务

3.3.10 配置海豚调度任务

搞定!!!

版权声明:如无特殊标注,文章均来自网络,本站编辑整理,转载时请以链接形式注明文章出处,请自行分辨。

本文链接:https://www.shbk5.com/dnsj/76445.html