本文将为您介绍如何使用Connecter将实时计算Flink结果表、源表、维表实时写入Hologres。

在操作之前请注意以下几个事项:

  1. 建议使用Hologres商业化版本实例来操作。
  2. Flink结果表、源表、维表实时写入Hologres需要在Flink中引用jar包,请提工单/找相关技术人员提供。

前提条件

  1. 开通Hologres,并连接开发工具,详见开始使用
  2. 已有一个可以提交任务的Flink集群。

Flink Sink结果表语法示例

在Flink中创建Hologres sink结果表的语法示例如下:

create table mysource(   name varchar,   age BIGINT,   birthday BIGINT ) with (   'connector.type'='hologres',   'connector.database'='Hologres的数据库名',   'connector.table'='Hologres接收数据的表名',   'connector.username'='当前账号的Access ID',   'connector.password'='当前账号的Access key',   'connector.endpoint'='<ip>:<port>'//Hologres的网络地址 );
参数 参数说明 是否必填 备注
type 源表类型 固定为hologres
database Hologres的数据库名
table Hologres接收数据的表名
username 当前账号的Access ID 用户信息页面查看
password 当前账号的Access key 用户信息页面查看
endpoint Hologres的网络地址 Hologres管控台查看
bulkload 以bulkload语义写入,见下面“bulkload语义”一节 默认值:false
field_delimiter 数据导入时一行数据之间使用的分隔符。间隔符不可存在于数据中。与bulkload语义一同使用 默认值:”u0002″
upsert_type 流式写入语义,见下面“流式语义”一节 默认值:insertorignore
partition_router 分区表写入,见下面“流式语义”一节 默认值:false

当前sink结果表支持以streaming或bulkload语义两种语义将数据写入Hologres:

阿里云2000元代金券免费领,最新优惠1折抢购,2核4G云服务器仅799元/3年,新老用户同享,立即抢购>>>

  • Streaming sink

    Streaming sink适用于持续不断往Hologres写入数据的场景,写入的数据立即可查。

    Streaming sink推荐在Flink流作业中使用,实时将数据写入Hologres。

    根据sink的配置和Hologres表属性,sink可以提供exactly-once或at-least-once保证。

    • 当Hologres表设有primary key时,Hologres sink可通过幂等性(idempotent)提供exactly-once语义。

      在有主键(primary key)的情况下,出现同pk数据出现多次的情况时,Hologres sink支持以下流式语:

      • insertorignore:默认语义,保留首次出现的数据,忽略之后的所有数据。
      • insertorreplace:后出现的数据整行替换已有数据。
      • insertorupdate:部分替换已有数据。

        例如一张表有a,b,c,d四个字段,a是pk,然后写入的时候只写入a,b两个字段,在pk重复的情况下,会只更新b字段,c,d原有的值不变。

    • 其他情况下为at-least-once语义。

    默认情况下streaming sink只能往一张具体的Hologres表导入数据,如果是一张分区父表,虽然导入成功,但在Hologres中查询不出数据。

    可以通过设置参数`partitionRouter` = 'true', 开启自动将数据路由到对应分区子表的功能,示例如下:

    create table mysource(   name varchar,   age BIGINT,   birthday BIGINT ) with (   'connector.type'='hologres',   'connector.database'='Hologres的数据库名',   'connector.table'='Hologres接收数据的分区父表名',   'connector.username'='当前账号的Access ID',   'connector.password'='当前账号的Access key',   'connector.endpoint'='<ip>:<port>',//Hologres的网络地址   'connector.partitionRouter'='true', );
    说明

    • tableName参数填写的是父表表名。
    • 数据写入过程中不会自动创建分区子表,需要提前在Hologres中创建接收数据的分区子表

  • Bulkload sink

    Bulkload sink 语义适用于一次性批量、高速导入大量数据。数据全部导入后才可见。写入的所有数据在一个事务(transaction)中,即所有数据都加载成功或者都不成功,并保证exactly-once。

    Bulkload sink针对高吞吐进行了优化。

    Bulkload sink推荐在Flink批作业中使用,适用于数据迁移和回填。

    说明 有分区表时,Bulkload sink只能导入数据到子分区。

Flink Source源表语法示例

Hologres源表目前支持bulk read。

Bulkread source读取当前Hologres表snapshot的所有数据。读取的数据在一个transaction中。如果失败,会重新读取新时间下的Hologres表snapshot。

Bulkread source推荐在Flink batch作业中使用。

说明

  • 源表支持projection pushdown,可以只读某些column。
  • Flink作业的每个并发可以读取一个或多个Hologres shard,建议配置Flink并发数小于等于Hologres 的shard数。

在Flink中创建Hologres源表的语法示例如下:

create table mysource(   name varchar,   age BIGINT,   birthday BIGINT ) with (   'connector.type'='hologres',   'connector.database'='Hologres的数据库名',   'connector.table'='Hologres接收数据的表名',   'connector.username'='当前账号的Access ID',   'connector.password'='当前账号的Access Key',   'connector.endpoint'='<ip>:<port>'//Hologres的网络地址 );
参数 参数说明 是否必填 备注
type 源表类型 固定值hologres
database Hologres的数据库
table Hologres接收数据的表名
username 当前账号的Access ID 在用户信息页面查看
password 当前账号的Access Key 在用户信息页面查看
endpoint Hologres的网络地址 Hologres管控台查看
field_delimiter 数据导出时一行数据之间使用的分隔符。间隔符不可存在于数据中 默认值:”u0002″

Flink Lookup Table维表语法示例

典型业务场景是将Flink处理的数据与Hologres表中已有数据做join。Hologres connector提供temporal table和temporal function作为Flink维表。

  1. 使用说明
    • Hologres维表当前暂不支持async模式。
    • Hologres维表当前暂不支持cache。
    • 若有其他需求,请联系Flink或Hologres技术支持。
  2. 在Flink中创建Hologres维表的语法示例如下:
    create table mysource(   ... ) with (   'connector.type'='hologres',   'connector.database'='Hologres的数据库名',   'connector.table'='Hologres接收数据的表名',   'connector.username'='当前账号的Access ID',   'connector.password'='当前账号的Access Key',   'connector.endpoint'='<ip>:<port>'//Hologres的网络地址 );
  3. 使用示例
    // register hologreslookup as udf select * from source, LATERAL TABLE(hologreslookup(a, b))  // register hologreslookup as lookup table source select x, a, b, c from src      JOIN hologreslookup FOR SYSTEM_TIME AS OF src.proc as h ON src.x = h.a AND src.y = h.b

附录:Flink和Hologres数据类型映射

当前Flink和Hologres的数据类型映射如下表:

Flink数据类型 Hologres数据类型
int int
array<int> int[]
bigint bigint
array<bigint> bigint[]
float real
array<float> real[]
double double precision
array<double> double precision[]
boolean boolean
array<boolean> boolean[]
varchar text
array<varchar> text[]
decimal numeric
date date
timestamp timestamp with timezone
说明 若有数据类型不在以上表中,说明当前Hologres暂不支持转换。

Hologres当前支持的数据类型请参见数据类型