??????????value????mysqlinst????????????????????????????????????????

 
???????? ??????
?????????? 2020-06-10 15:25
???????? user-zh
?????? ?????? ????flinksql between????
????????????????flink1.10.0 ?????????????????????????????????? ???????????? 
????????????????????????
tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
    
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
  
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
    tnv.registerFunction("ip_to_num",IPtoNum)
 
 ?????????? ????????
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too 
many fields referenced from an atomic type.
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
 
 
 
 
 
------------------ ???????? ------------------
??????: "Leonard Xu"<xbjt...@gmail.com>;
????????: 2020??6??10??(??????) ????1:16
??????: "user-zh"<user-zh@flink.apache.org>;
 
????: Re: ????flinksql between????
 
 
 
Hi,
 
????????????????????????source(????)??&nbsp; ??????????mysql 
??????join??????????????????????sql????????????????????????????regular join, 
????join??????[1]??
 
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
????JDBC connector????????LookupSource?????????????????????????????????? 
connector.lookup.cache.ttl 
??????????????cache????????????????????????????????????
 
Best,
Leonard Xu
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins>
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector>
 
&gt; ?? 2020??6??10????10:43???????? <932460...@qq.com&gt; ??????
&gt; 
&gt; hi,???????? ?????????????????????????? ??????join????????mysql???? 
????????????source????????mysql????????????????????????????
&gt; 
&gt; 
&gt; 
&gt; 
> ------------------ ???????? ------------------
> ??????: "wangweigu...@stevegame.cn"<wangweigu...@stevegame.cn>;
> ????????: 2020??6??9??(??????) ????6:35
> ??????: "user-zh"<user-zh@flink.apache.org>;
> 
> ????: ????: ?????? ????flinksql between????
&gt; 
&gt; 
&gt; 
&gt; 
&gt; &amp;nbsp; ????1.10???? 
useBlinkPlanner????????????useOldPlanner????????????
&gt; &amp;nbsp; 
&gt; ??????????????????
>   Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
> 
> LogicalProject(num=[$0])
>   LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
>     FlinkLogicalDataStreamScan(id=[1], fields=[num])
>     FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])
&gt; 
&gt; This exception indicates that the query uses an unsupported SQL feature.
&gt; 
&gt; 
&gt; 
&gt; 
&gt; &amp;nbsp;
&gt; ???????? ??????
&gt; ?????????? 2020-06-09 17:41
&gt; ???????? user-zh
&gt; ?????? ?????? ????flinksql between????
&gt; hi????????????&amp;amp;nbsp;
&gt; 1 flink1.9.0
&gt; 2 oldplanner
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-scala_2.11</artifactId>
> <version>1.9.0</version>
> </dependency>
> 
> 
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_2.11</artifactId>
> <version>1.9.0</version>
> </dependency>
&gt; &amp;nbsp;
&gt; 3 streaming mode
&gt; 4. ????????????
>     val sqlStream = env.createInput(jdbcInput)
>     tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
>     tnv.registerDataStream("OMstream",value,'ip)
> //    val table = tnv.sqlQuery("select * from  OMstream as  a left join sqlStream as  b on a.ip >b.start_ip and a.ip<b.end_ip")
>     val table = tnv.sqlQuery("select b.netstruct_id from  OMstream as  a left join sqlStream as b on a.ip > b.start_ip and a.ip <b.end_ip ")
>     val resRow = table.toRetractStream[Row]
&gt; &amp;nbsp;
&gt; 5 ????????????
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
> 
> 
> LogicalProject(netstruct_id=[$1])
>   LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left])
>     FlinkLogicalDataStreamScan(id=[1], fields=[ip])
>     FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; This exception indicates that the query uses an unsupported SQL feature.
&gt; Please check the documentation for the set of currently supported SQL 
features.
&gt; at 
org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
&gt; at 
org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
&gt; at 
org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
&gt; at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
&gt; at 
org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
&gt; at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891)
&gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
&gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
&gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&gt; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
&gt; at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
&gt; at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
&gt; at 
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
&gt; at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
&gt; at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; 6 ??????????????&amp;amp;nbsp;
&gt; select b.netstruct_id from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left 
join sqlStream as b on a.ip &amp;amp;gt; b.start_ip
&gt; ??????????????????????????????&amp;amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; ??????
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; ------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------
&gt; ??????:&amp;amp;nbsp;"Benchao Li"<libenc...@apache.org&amp;amp;gt;;
&gt; ????????:&amp;amp;nbsp;2020??6??9??(??????) ????4:37
&gt; ??????:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;amp;gt;;
&gt; &amp;nbsp;
&gt; ????:&amp;amp;nbsp;Re: ????flinksql between????
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; &amp;nbsp;
&gt; ????????????????????????
&gt; 1. ????????Flink????????
&gt; 2. ??????planner????blink planner????old planner??
&gt; 3. ??????streaming mode????batch mode??
&gt; 4. ??????????????????????
&gt; &amp;nbsp;
&gt; ?????? <932460...@qq.com&amp;amp;gt; ??2020??6??9?????? ????4:26??????
&gt; &amp;nbsp;
> > hi??????flinksql?????? select * from a join b on a.ip <b.startip and a.ip >b.endip ???????????? ???????????? ??????????????????between??????????????

回复