8288分类目录 8288分类目录 8288分类目录
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink intervalJoin 使用与原理分析

来源:本站原创 浏览:25次 时间:2023-05-14

在上一篇的分析【Flink DataStream中CoGroup实现原理与三种 join 实现】中基于DataStream的join只能实现在同一个窗口的两个数据流之间进行join, 但是在实际中常常是会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内join。flink 基于KeyedStream提供了一种interval join 机制,intervaljoin 连接两个keyedStream, 按照相同的key在一个相对数据时间的时间段内进行连接。

先看一个假设的案例:用户购买商品过程中填写收货地址然后下单,在这个过程中产生两个数据流,一个是订单数据流包含用户id、商品id、订单时间、订单金额、收货id等,另一个是收货信息数据流包含收货id、收货人、收货人联系方式、收货人地址等,系统在处理过程中,先发送订单数据,在之后的1到5秒内会发送收货数据,现在要求实时统计按照不同区域维度的订单金额的top100地区。在这个案例中两个数据流:订单流orderStream先,收货信息流addressStream后,需要将这两个数据流按照收货id join之后计算top100订单金额的地区,由于orderStream比addressStream早1到5秒,那么就有这样一个关系:
orderStream.time+1<=addressStream.time<=orderStream.time+5 或者是
addressStream.time-5<=orderStream.time<=addressStream.time-1
看下join 部分代码实现:


  1. case class Order(orderId:String, userId:String, gdsId:String, amount:Double, addrId:String, time:Long)

  2. case class Address(addrId:String, userId:String, address:String, time:Long)

  3. case class RsInfo(orderId:String, userId:String, gdsId:String, amount:Double, addrId:String, address:String)

  4. objectIntervalJoinDemo{

  5. def main(args:Array[String]):Unit={

  6.    val env =StreamExecutionEnvironment.getExecutionEnvironment

  7.    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  8.    env.getConfig.setAutoWatermarkInterval(5000L)

  9.    env.setParallelism(1)


  10.    val kafkaConfig =newProperties()

  11.    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

  12.    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")


  13.    val orderConsumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema, kafkaConfig)

  14.    val addressConsumer =newFlinkKafkaConsumer011[String]("topic2",newSimpleStringSchema, kafkaConfig)


  15.    val orderStream = env.addSource(orderConsumer)

  16. .map(x =>{

  17.        val a = x.split(",")

  18. newOrder(a(0), a(1), a(2), a(3).toDouble, a(4), a(5).toLong)

  19. }).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(10)){

  20. overridedef extractTimestamp(element:Order):Long= element.time

  21. })

  22. .keyBy(_.addrId)


  23.    val addressStream = env.addSource(addressConsumer)

  24. .map(x =>{

  25.        val a = x.split(",")

  26. newAddress(a(0), a(1), a(2), a(3).toLong)

  27. }).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[Address](Time.seconds(10)){

  28. overridedef extractTimestamp(element:Address):Long= element.time

  29. })

  30. .keyBy(_.addrId)


  31.    orderStream.intervalJoin(addressStream)

  32. .between(Time.seconds(1),Time.seconds(5))

  33. .process(newProcessJoinFunction[Order,Address,RsInfo]{

  34. overridedef processElement(left:Order, right:Address, ctx:ProcessJoinFunction[Order,Address,RsInfo]#Context,out:Collector[RsInfo]):Unit={

  35.          println("==在这里得到相同key的两条数据===")

  36.          println("left:"+ left)

  37.          println("right:"+ right)

  38. }

  39. })

  40.    env.execute()

  41. }

  42. }

topic1生产数据:
order01,userId01,gds01,100,addrId01,1573054200000
topic2生产数据:
addrId01,userId01,beijing,1573054203000
由于满足时间范围的条件,得到结果:
left:Order(order01,userId01,gds01,100.0,addrId01,1573054200000)
right:Address(addrId01,userId01,beijing,1573054203000)
但是如果topic2接着在生产数据:
addrId01,userId01,beijing,1573054206000
此时addressStream.time+5>orderStream.time ,没有结果输出。

从源码角度理解intervaljoin实现:

  1. intervaljoin首先会将两个KeyedStream 进行connect操作得到一个ConnectedStreams, ConnectedStreams表示的是连接两个数据流,并且这两个数据流之前可以实现状态共享, 对于intervaljoin 来说就是两个流相同key的数据可以相互访问

  2. 在ConnectedStreams之上进行IntervalJoinOperator算子操作,该算子是intervaljoin 的核心,接下来分析一下其实现
    a. 定义了两个MapState<Long, List<BufferEntry<T1>>>类型的状态对象,分别用来存储两个流的数据,其中Long对应数据的时间戳,List<BufferEntry<T1>>对应相同时间戳的数据
    b. 包含processElement1、processElement2两个方法,这两个方法都会调用processElement方法,真正数据处理的地方

      

  • 判断延时,数据时间小于当前的watermark值认为数据延时,则不处理

  • 将数据添加到对应的MapState<Long, List<BufferEntry<T1>>>缓存状态中,key为数据的时间

  • 循环遍历另外一个状态,如果满足ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound , 则将数据输出给ProcessJoinFunction调用,ourTimestamp表示流入的数据时间,timestamp表示对应join的数据时间

  • 注册一个数据清理时间方法,会调用onEventTime方法清理对应状态数据。对于例子中orderStream比addressStream早到1到5秒,那么orderStream的数据清理时间就是5秒之后,也就是orderStream.time+5,当watermark大于该时间就需要清理,对于addressStream是晚来的数据不需要等待,当watermark大于数据时间就可以清理掉。

整个处理逻辑都是基于数据时间的,也就是intervaljoin 必须基于EventTime语义,在between 中有做TimeCharacteristic是否为EventTime校验, 如果不是则抛出异常。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net