Hi Peihui,
確認下你想調用的方法是不是 connect?因為看起來 stream1.collect(stream2) 不像是 DataStream 支援的 API
如果是的話,想請問你 ConfigSource() 有沒有配置 WatermarkStrategy?connect 後的算子是透過上游兩個算子的
watermark 取最小作為輸出。
因此,如果只定義其中一邊的 WatermarkStrategy 會導致這個算子的 watermark 無法推進。
詳細可以參考這個章節
https://nightlies.apache.org/flink/flink-docs-mast
" dmo_index_code string, " +
> " index_value string" +
> ") with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root
Hi xiaoyue,
請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatc
Hi,
有考慮升級 1.14 嗎?Flink 1.14 支持了 FLIP-147,讓 Flink 在 task 為 finished 狀態時仍能觸發
checkpoint [1, 2]。
[1]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finish
Hi,
如果兩次 left join 的話是否滿足你的需求呢?
然後在取 temporal table 的字段時,用 IF 去判斷取值。參考 SQL 如下
SELECT
c.mer_cust_id,
*IF(k.mer_cust_id IS NOT NULL AND a.mercust_id IS NOT NULL AND
k.mer_cust_id <> '', k.update_time, NULL) AS update_time*
FROM charge_log as c
LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF c.pro
AST_VALUE(num) as num, LAST_VALUE(flag) as flag
> from tmpTable,
> group by user, ord
> ) t1
> group by user
best regards,
Tony Wei 於 2021年11月25日 週四 上午11:01寫道:
> Hi,
>
> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>
> +--+---+
>
Hi,
對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
+--+---+
| user | num |
+--+---+
| b | 20|
+--+---+
因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
或許可以考慮把 sql 寫法改為這樣試試?
select user, sum(num) as num
> from (
> select use
Hi Pinjie,
如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation
[1]。例如文件中提供的如下範例,計算當前 row 往前一小時內的加總結果。
> SELECT order_id, order_time, amount,
> SUM(amount) OVER (
> PARTITION BY product
> ORDER BY order_time
> RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
> ) A
Hi yidan,
你可以試試 SQL Hints [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/
yidan zhao 於 2021年11月2日 週二 下午1:03寫道:
> 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
> 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比
Hi
從代碼上來看是使用了 regular join 關聯了 kafka source 和 hbase source,hbase connector
目前是不支持流式數據源的
你可以從任務儀表板上確認下提交的任務,hbase source 的部分應該在執行一段時間後狀態會變更為 FINISHED,目前 flink
checkpoint 還不支持在 FINISHED task 上執行
你可以考慮改寫 sql 使用 processing time temporal join [1] 的方式來關聯 hbase table,從 kafka
消費的數據會實時的去查 hbase table 的當前數
Hi Chenyu,
這確實是目前尚未解決的一個問題,相關的 jira issue 可以看這 [1]。
jira issue 底下的討論串有提到一個替代方案是:使用 -D\$internal.pipeline.job-id=$(cat
/proc/sys/kernel/random/uuid|tr -d "-") 主動為 application 模式的任務產生隨機的 jobid。
但由於此配置參數屬於 flink 內部參數,可能不保證未來任何改動後的向後兼容性,請謹慎考慮後再使用。
[1] https://issues.apache.org/jira/browse/FLINK-19358
Hi Caizhi,
我測試了 sink.rolling-policy.rollover-interval 這個配置,並且改使用 csv hive table 作為
sink table,結果是符合預期的。再次謝謝你的幫忙。
Tony Wei 於 2021年8月5日 週四 上午10:40寫道:
> Hi,
>
> 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
> 單純因為這個配置比較好觀察結果。
> 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。
>
Hi,
感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
單純因為這個配置比較好觀察結果。
我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。
Caizhi Weng 於 2021年8月5日 週四 上午10:36寫道:
> Hi!
>
> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。
>
> Tony Wei 于2021年8月5日周四 上午10:21写道:
>
> > Hi Experts,
> >
Hi Experts,
我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
同時我也確認了 `table.dynamic-table-options.enabled
補充一下,從代碼上來看, last_value 原本的實現似乎就是獲取 last "non-null" value 了。
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueAggFunction.java
Tony Wei 於 2021年8月2日 週一 下午5:33寫道:
> 你好,
>
>
你好,
如果我沒有理解錯你的應用場景的話,你想達成的結果應該是類似這篇討論 [1] 裡提到的問題對吧?
從最新的 flink 文檔 [2] 中來看應該無法透過你期望的 on duplicate key 語句來實現,
或許可以嘗試在 SELECT 語句上達成,舉例來說你可以在原有的 select 語句之外多添加 group by,如下:
insert into t select a, last_value(b ignore nulls) as b, last_value(c
> ignore nulls) as c from $(original_select_statement) gro
Hi,
1、是否是这种方式增加label
>
是的,MetricGroup#addGroup(key, value) 的設計其中一個目的就是為了支援 prometheus 的 label 。
> 2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'
可以透過配置 filterLabelValueCharacters: false 來關閉過濾功能 [1],但使用者需要自行確保不會有非法字元混入
label value 之中。詳細可參考文檔說明。
[1]
https://ci.apache.org/projects/flink/
會是 merge function 去把三者合併。
Best
Tony Wei
Px New <15701181132mr@gmail.com> 於 2019年11月10日 週日 上午10:58寫道:
> [image: image.png]建议深入解下 keyWindow,NoKeyWindow 与Assigner TimeWindow
> And WindowsFunction 😃
>
> Yuan,Youjun 于2019年11月9日周六 下午7:46写道:
>
>> 1, 是
>> 2,没有标准答案,
kafka operator 。先謝謝各位的幫助。
Best,
Tony Wei
Hi,
非常感謝你的回覆。但我認為子查詢可能沒有辦法滿足我想要的功能,原因是 sql optimizar 會
嘗試做 where condition pushdown,其結果會跟原本的 sql 相似。
我在 user mailing list 上問了相同的問題,並得到了一個可行的解法 [1],給你參考看看。
Best,
Tony Wei
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-it-possible-to-decide-the-order-of-where
UDF_NEED_TO_QUERY_DB(user)
謝謝大家的幫忙。
Best regards,
Tony Wei
Hi Caizhi,
非常感謝你提供的資訊和講解,這對我幫助非常大。我會試著把這些知識應用到我們的案例中。
Best Regards,
Tony Wei
Caizhi Weng 於 2019年7月12日 週五 下午12:31寫道:
> Hi Tony!
>
> 這些資料都不需要在記憶體中實體化,所以不會受限記憶體的擴展性,只有那些需要被放入 Top-N 的資料會被讀取出來存放在一個 in-memory
> > 的堆中做為加速運算的優化。
>
>
> 前两种情况下,由于不需要从老数据中捞记录回 Top-N,state 里其实也只要放
,雖然沒有很有把握但或許可以根據你的想法實現一個專門針對我們應用情境的優化。
撇開上述特殊的情況,我另外好奇的是第一點中維護的 map state 要記錄精確的排名這件事的細
節,想知道如果更新是循序變化的,如果添加了一筆新的紀錄,可能會導致多個紀錄的排名需要
加一或是減一,這部分是不是也需要遍歷整個 map 去判斷是否有增減,針對變動的部分通知下
游?
Best Regards,
Tony Wei
Caizhi Weng 於 2019年7月12日 週五 上午11:36寫道:
> Hi Tony!
>
> 其实 Flink 对 Top-N 问题并没有很
略掉 D,因為他不在 top-3 的範圍內。但是當下一個如果收到
的是一個更新 A 使用者的 score 為 0 的紀錄的話,這個時候理論上我們知道 top-3 會改為 B, C,
D,但是在維護 top-3 的堆中我們無力找回被忽略的使用者 D。這樣的優化在 batch mode 是沒有
問題的,因為最新的 score 在有限的數據中會是固定的不動的。
不過當處理流數據,我目前只想到這種應用最終可能需要退回成存放全部使用者 scores 才有辦
法處理,才能隨時計算出正確的 top-k。所以我想請教各位大牛有沒有什麼樣的優化方式可以處
理這樣的問題,讓狀態不需要存到全部資料?當然這個
24 matches
Mail list logo