熟女丰满少妇精品一区二区,国产精品第一页综合在线,亚洲人成色777777精品,午夜性色福利免费视频在线播放

<span id="7ugte"><input id="7ugte"></input></span><td id="7ugte"><s id="7ugte"></s></td>
          <center id="7ugte"></center>

          甘肅信息港

          Flink教程:DataStream上的Join操作

          分享到:
           2020-03-28 07:11:25 來源: 閱讀:-G0

          批處理經(jīng)常要解決的問題是將兩個(gè)數(shù)據(jù)源做關(guān)聯(lián)Join操作。比如,很多手機(jī)APP都有一個(gè)用戶數(shù)據(jù)源User,同時(shí)APP會(huì)記錄用戶的行為,我們稱之為Behavior,兩個(gè)表按照userId來進(jìn)行Join。在流處理場景下,F(xiàn)link也支持了Join,只不過Flink是在一個(gè)時(shí)間窗口上來進(jìn)行兩個(gè)表的Join。

          Join示例圖

          目前,F(xiàn)link支持了兩種Join:Window Join(窗口連接)和Interval Join(時(shí)間間隔連接。

          Window Join

          從名字中能猜到,Window Join主要在Flink的窗口上進(jìn)行操作,它將兩個(gè)流中落在相同窗口的元素按照某個(gè)Key進(jìn)行Join。一個(gè)Window Join的大致骨架結(jié)構(gòu)為:

          input1.join(input2)    .where(&lt;KeySelector&gt;)      &lt;- input1使用哪個(gè)字段作為Key    .equalTo(&lt;KeySelector&gt;)    &lt;- input2使用哪個(gè)字段作為Key    .window(&lt;WindowAssigner&gt;)  &lt;- 指定WindowAssigner    [.trigger(&lt;Trigger&gt;)]      &lt;- 指定Trigger(可選)    [.evictor(&lt;Evictor&gt;)]      &lt;- 指定Evictor(可選)    .apply(&lt;JoinFunction&gt;)     &lt;- 指定JoinFunction

          下圖展示了Join的大致過程。兩個(gè)輸入數(shù)據(jù)流先分別按Key進(jìn)行分組,然后將元素劃分到窗口中。窗口的劃分需要使用WindowAssigner來定義,這里可以使用Flink提供的滾動(dòng)窗口、滑動(dòng)窗口或會(huì)話窗口等默認(rèn)的WindowAssigner。隨后兩個(gè)數(shù)據(jù)流中的元素會(huì)被分配到各個(gè)窗口上,也就是說一個(gè)窗口會(huì)包含來自兩個(gè)數(shù)據(jù)流的元素。相同窗口內(nèi)的數(shù)據(jù)會(huì)以INNER JOIN的語義來相互關(guān)聯(lián),形成一個(gè)數(shù)據(jù)對(duì)。當(dāng)窗口的時(shí)間結(jié)束,F(xiàn)link會(huì)調(diào)用JoinFunction來對(duì)窗口內(nèi)的數(shù)據(jù)對(duì)進(jìn)行處理。當(dāng)然,我們也可以使用Trigger或Evictor做一些自定義優(yōu)化,他們的使用方法和普通窗口的使用方法一樣。

          Join的大致流程

          接下來我們重點(diǎn)分析一下兩個(gè)數(shù)據(jù)流是如何INNER JOIN的:

          窗口內(nèi)數(shù)據(jù)INNER JOIN示意圖

          一般滴,INNER JOIN只對(duì)兩個(gè)數(shù)據(jù)源都出現(xiàn)的元素做Join,形成一個(gè)數(shù)據(jù)對(duì),即數(shù)據(jù)源input1中的某個(gè)元素與數(shù)據(jù)源input2中的所有元素逐個(gè)配對(duì)。當(dāng)數(shù)據(jù)源某個(gè)窗口內(nèi)沒數(shù)據(jù)時(shí),比如圖中的第三個(gè)窗口,Join的結(jié)果也是空的。

          class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {  override def join(input1: (String, Int), input2: (String, Int)): String = {    &#34;input 1 :&#34; + input1._2 + &#34;, input 2 :&#34; + input2._2  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2)      .where(i1 =&gt; i1._1)      .equalTo(i2 =&gt; i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyJoinFunction)

          上面的代碼自定義了JoinFunction,并將Join結(jié)果打印出來。無論代碼中演示的滾動(dòng)窗口,還是滑動(dòng)窗口或會(huì)話窗口,其原理都是一樣的。除了JoinFunction,F(xiàn)link還提供了FlatJoinFunction,其功能是輸出零到多個(gè)結(jié)果。

          如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在調(diào)用時(shí),要寫成input1.coGroup(input2).where(&lt;KeySelector&gt;).equalTo(&lt;KeySelecotr&gt;)。

          class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {  // 這里的類型是Java的Iterable,需要引用 collection.JavaConverters._ 并轉(zhuǎn)成Scala  override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {    input1.asScala.foreach(element =&gt; out.collect(&#34;input1 :&#34; + element.toString()))    input2.asScala.foreach(element =&gt; out.collect(&#34;input2 :&#34; + element.toString()))  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2)      .where(i1 =&gt; i1._1)      .equalTo(i2 =&gt; i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyCoGroupFunction)

          Interval Join

          與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據(jù)一個(gè)時(shí)間間隔(Interval)界定時(shí)間。Interval需要一個(gè)時(shí)間下界(lower bound)和上界(upper bound),如果我們將input1和input2進(jìn)行Interval Join,input1中的某個(gè)元素為input1.element1,時(shí)間戳為input1.element1.ts,那么一個(gè)Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個(gè)時(shí)間段內(nèi)的元素將會(huì)和input1.element1組成一個(gè)數(shù)據(jù)對(duì)。用數(shù)學(xué)公式表達(dá)為,凡是符合下面公式input1.element1.ts + lower bound &lt;= input2.elementx.ts &lt;=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數(shù)也可以是負(fù)數(shù)。

          注意,目前Flink(1.9)的Interval Join只支持Event Time語義。

          Interval Join示意圖

          下面的代碼展示了如何對(duì)兩個(gè)數(shù)據(jù)流進(jìn)行Interval Join:

          class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {  override def processElement(input1: (String, Long, Int),                              input2: (String, Long, Int),                              context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,                              out: Collector[String]): Unit = {    out.collect(&#34;input 1: &#34; + input1.toString() + &#34;, input 2: &#34; + input2.toString)  }}// 數(shù)據(jù)流有三個(gè)字段:(key, 時(shí)間戳, 數(shù)值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .process(new MyProcessFunction)

          默認(rèn)的時(shí)間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。

          val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .upperBoundExclusive()      .lowerBoundExclusive()      .process(new MyProcessFunction)

          Interval Join內(nèi)部是用緩存來存儲(chǔ)所有數(shù)據(jù)的,因此需要注意緩存數(shù)據(jù)不能太大,以免對(duì)內(nèi)存造成絕大壓力。

          推薦閱讀:lofree

          文章評(píng)價(jià)COMMENT

          還可以輸入2000個(gè)字

          暫無網(wǎng)友的評(píng)論

          意見反饋

          ×
          J