8288分类目录 8288分类目录 8288分类目录
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink自定义metric监控流入量

来源:本站原创 浏览:18次 时间:2023-05-14

flink任务本身提供了各种类型的指标监控,细化到了每一个Operator的流入/流出量、速率、Watermark值等,通常在实际应用中需要对接入数据做格式化例如转json,符合要求的数据会向下流动,不符合要求或者格式化异常称为脏数据会被过滤掉,现在目标实现一个通用化方式能够对正常数据与脏数据进行指标统计。
实现思路:

  1. flink metric类型分为Counter、Gauge、Histogram、Meter,需要统计的是一个累加值因此选取Counter类型的metirc

  2. 由于是对任务的流入监控,因此需要在Source端进行处理,通常对接的数据源是kafka, 而flink本身已经提供了kakfa connector,并且开放了数据反序列化的接口DeserializationSchema与抽象类AbstractDeserializationSchema,实现该接口或者继承抽象类可以完成数据的反序列化与格式化,由于每一条数据都需要进过反序列化处理,那么可以在反序列化的同时进行指标统计

  3. 在flink中自定义Metric入口是RuntimeContext, 但是在反序列化抽象类中并没有提供访问RuntimeContext的接口,一般是在RichFunction中,与其相关只有FlinkKafkaConsumer,那么就可以在FlinkKafkaConsumer中将获取到的RuntimeContext传给AbstractDeserializationSchema

实现步骤:

  1. 自定义一个继承AbstractDeserializationSchema的抽象类AbsDeserialization,里面包含RuntimeContext与两个统计的Counter,并且包含一个初始化Counter的方法initMetric

  2. 自定义一个继承FlinkKafkaConsumer010的抽象类,里面包含AbsDeserialization属性、构造化方法,并且重写run方法,在run方法里面给AbsDeserialization设置RuntimeContex对象并且调用其initMetric, 最后调用父类run方法

代码如下:

  1. public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {


  2.    private RuntimeContext runtimeContext;

  3.    private String DIRTY_DATA_NAME="dirtyDataNum";

  4.    private String NORMAL_DATA_NAME="normalDataNum";


  5.    protected transient Counter dirtyDataNum;


  6.    protected transient Counter normalDataNum;


  7.    public RuntimeContext getRuntimeContext() {

  8.        return runtimeContext;

  9.    }


  10.    public void setRuntimeContext(RuntimeContext runtimeContext) {

  11.        this.runtimeContext = runtimeContext;

  12.    }


  13.    public void initMetric()

  14.    {

  15.        dirtyDataNum=runtimeContext.getMetricGroup().counter(DIRTY_DATA_NAME);

  16.        normalDataNum=runtimeContext.getMetricGroup().counter(NORMAL_DATA_NAME);

  17.    }


  18. }

  1. public class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {


  2.    private AbsDeserialization<T> valueDeserializer;


  3.    public CustomerKafkaConsumer(String topic, AbsDeserialization<T> valueDeserializer, Properties props) {

  4.        super(topic, valueDeserializer, props);

  5.        this.valueDeserializer=valueDeserializer;

  6.    }


  7.    @Override public void run(SourceContext<T> sourceContext) throws Exception {

  8.        valueDeserializer.setRuntimeContext(getRuntimeContext());

  9.        valueDeserializer.initMetric();

  10.        super.run(sourceContext);

  11.    }

  12. }

使用案例,只要定义一个继承AbsDeserialization类即可,

  1. class ParseDeserialization extends AbsDeserialization[RawData] {


  2.  override def deserialize(message: Array[Byte]): RawData = {


  3.    try {

  4.      val msg = new String(message)

  5.      val rawData = JSON.parseObject(msg, classOf[RawData])

  6.      normalDataNum.inc() //正常数据指标

  7.      rawData

  8.    } catch {

  9.      case e:Exception=>{

  10.        dirtyDataNum.inc()   //脏数据指标

  11.        null

  12.      }

  13.    }

  14.  }


  15. }

source使用方式:

val consumer: CustomerKafkaConsumer[RawData] = new CustomerKafkaConsumer[RawData](topic, new ParseDeserialization, kafkaPro)

那么在任务运行中,可以在flink web的监控界面查看到normalDataNum 、dirtyDataNum 两个指标值,另外在AbsDeserialization里面也可以定义一些流入速率等监控。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net