[jira] [Created] (FLINK-15399) Join with a LookupableTableSource:java.lang.RuntimeException: while converting XXXX Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type
Rockey Cui created FLINK-15399:
-----------------------------------

             Summary: Join with a LookupableTableSource: java.lang.RuntimeException: while converting Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type
                 Key: FLINK-15399
                 URL: https://issues.apache.org/jira/browse/FLINK-15399
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.9.1
         Environment: jdk1.8.0_211
            Reporter: Rockey Cui
         Attachments: JoinTest-1.0-SNAPSHOT.jar

{code:java}
// code placeholder
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.setParallelism(1);

    DataStreamSource<String> stringDataStreamSource1 = env.fromElements("HA");
    String[] fields1 = new String[]{"ORD_ID", "PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT" // key
            , "PS_INT", "PS_LONG"
            , "PS_DOUBLE8", "PS_DOUBLE14", "PS_DOUBLE15"
            , "PS_NUMBER1", "PS_NUMBER2", "PS_NUMBER3", "PS_NUMBER4"
            , "PS_DATE", "PS_TIMESTAMP", "PS_DATE_EVENT", "PS_TIMESTAMP_EVENT"};
    TypeInformation[] types1 = new TypeInformation[]{Types.STRING, Types.INT, Types.LONG, Types.LONG, Types.DOUBLE, Types.STRING // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE, Types.DOUBLE
            , Types.LONG, Types.LONG, Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation1 = new RowTypeInfo(types1, fields1);
    DataStream<Row> stream1 = stringDataStreamSource1.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673356L;
        @Override
        public Row map(String s) {
            return new Row(typeInformation1.getArity());
        }
    }).returns(typeInformation1);
    tableEnv.registerDataStream("FUN_1", stream1, String.join(",", typeInformation1.getFieldNames()) + ",PROCTIME.proctime");

    DataStreamSource<String> stringDataStreamSource2 = env.fromElements("HA");
    String[] fields2 = new String[]{"C_NAME", "C_ADDRESS", "C_NATIONKEY" // key
            , "C_INT", "C_LONG"
            , "C_DOUBLE8", "C_DOUBLE14"
            , "C_DATE_EVENT", "C_TIMESTAMP_EVENT"};
    TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation2 = new RowTypeInfo(types2, fields2);
    DataStream<Row> stream2 = stringDataStreamSource2.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673349L;
        @Override
        public Row map(String s) {
            return new Row(typeInformation2.getArity());
        }
    }).returns(typeInformation2);
    tableEnv.registerDataStream("FUN_2", stream2, String.join(",", typeInformation2.getFieldNames()) + ",PROCTIME.proctime");

    MyLookupTableSource tableSource = MyLookupTableSource.newBuilder()
            .withFieldNames(new String[]{"S_NAME", "S_ADDRESS", "S_PHONE"
                    , "S_ACCTBAL", "S_COMMENT" // key
                    , "S_INT", "S_LONG"
                    , "S_DOUBLE8", "S_DOUBLE14"
                    , "S_DOUBLE15", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"})
            .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING
                    , Types.DOUBLE, Types.STRING // key
                    , Types.INT, Types.LONG
                    , Types.DOUBLE, Types.DOUBLE
                    , Types.DOUBLE, Types.SQL_DATE, Types.SQL_TIMESTAMP})
            .build();
    tableEnv.registerTableSource("INFO", tableSource);

    String sql = "SELECT LN(F.PS_INT),LOG(F2.C_INT,1)\n" +
            " FROM (SELECT *\n" +
            "         FROM FUN_1 F1\n" +
            "         JOIN INFO FOR SYSTEM_TIME AS OF F1.PROCTIME D1\n" +
            "           ON F1.PS_INT = D1.S_INT AND F1.PS_LONG - 570 = D1.S_LONG \n" +
            ") F\n" +
            "JOIN FUN_2 F2 ON F.PS_INT = F2.C_INT AND F.PS_LONG - 150 = F2.C_LONG\n" +
            " WHERE 1=1\n" +
            " AND F.PS_INT BETWEEN 1000 AND 5000\n" +
            " AND F.S_LONG < 2147792600\n" + // I find this causes the exception
            " AND F.PS_COMMENT LIKE '%FILY%'\n" +
            " AND F2.C_INT IS NOT NULL\n" +
            " AND LN(F.PS_INT)<8";
    Table table = tableEnv.sqlQuery(sql);
    DataStream re
[jira] [Created] (FLINK-15212) PROCTIME attribute causes problems with timestamp times before 1900 ?
Rockey Cui created FLINK-15212:
-----------------------------------

             Summary: PROCTIME attribute causes problems with timestamp times before 1900 ?
                 Key: FLINK-15212
                 URL: https://issues.apache.org/jira/browse/FLINK-15212
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.9.1
         Environment: flink 1.9.1 jdk1.8.0_211 idea2019.3
            Reporter: Rockey Cui

A simple DataStreamSource with a timestamp field, registered as a table.

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = env.fromElements(
        "1001,1002,adc0,1900-01-01 00:00:00.0",
        "1002,1003,adc1,1910-01-01 00:00:00.0",
        "1003,1004,adc2,1920-01-01 00:00:00.0",
        "1004,1005,adc3,1930-01-01 00:00:00.0",
        "1005,1006,adc4,1970-01-01 00:00:00.0",
        ",,adc5,1971-01-01 00:00:00.0"
);
TypeInformation[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP};
String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> {
    String[] split = s.split(",");
    Row row = new Row(split.length);
    for (int i = 0; i < split.length; i++) {
        Object value = null;
        if (fieldTypes[i].equals(Types.STRING)) {
            value = split[i];
        }
        if (fieldTypes[i].equals(Types.LONG)) {
            value = Long.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.INT)) {
            value = Integer.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.DOUBLE)) {
            value = Double.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
            value = Timestamp.valueOf(split[i]);
        }
        row.setField(i, value);
    }
    //System.out.println(row.toString());
    return row;
}).returns(rowTypeInfo);
tableEnv.registerDataStream("user_click_info", stream, String.join(",", fieldNames) + ",www.proctime");
String sql = "select * from user_click_info";
Table table = tableEnv.sqlQuery(sql);
DataStream result = tableEnv.toAppendStream(table, Row.class);
result.print();
table.printSchema();
tableEnv.execute("Test");
{code}

result ==>

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)
 |-- www: TIMESTAMP(3) *PROCTIME*

1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 03:37:18.036
1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
,,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196

Without the PROCTIME attribute it is OK ==>

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)

1001,1002,adc0,1900-01-01 00:00:00.0
1002,1003,adc1,1910-01-01 00:00:00.0
1003,1004,adc2,1920-01-01 00:00:00.0
1004,1005,adc3,1930-01-01 00:00:00.0
1005,1006,adc4,1970-01-01 00:00:00.0
,,adc5,1971-01-01 00:00:00.0

-- This message was sent by Atlassian Jira (v8.3.4#803005)
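A plausible explanation (my assumption, not confirmed in this thread): the 5m43s drift matches the local-mean-time rules that tzdata applies before a zone adopted standard time. Asia/Shanghai, for example, uses UTC+08:05:43 before 1901, so round-tripping a pre-1900 local timestamp through epoch millis with the modern +08:00 offset shifts it by exactly 5 minutes 43 seconds. A minimal sketch using only java.time:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class LmtOffsetDemo {
    // Returns the UTC offset the zone's rules assign to a given local date-time.
    static ZoneOffset offsetAt(String zone, LocalDateTime t) {
        return ZoneId.of(zone).getRules().getOffset(t);
    }

    public static void main(String[] args) {
        // Pre-1901 timestamps fall under the zone's local-mean-time rule
        // (+08:05:43 for Asia/Shanghai in tzdata), while post-standardization
        // timestamps use +08:00. The difference is the 5m43s drift that turns
        // 1900-01-01 00:00:00.0 into 1899-12-31 23:54:17.0 in the result above.
        System.out.println(offsetAt("Asia/Shanghai", LocalDateTime.of(1900, 1, 1, 0, 0)));
        System.out.println(offsetAt("Asia/Shanghai", LocalDateTime.of(1970, 1, 1, 0, 0)));
    }
}
```

If this is the cause, the bug is in whichever conversion materializes the timestamp through epoch millis with one set of rules and reads it back with another.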
[jira] [Created] (FLINK-15155) Join with a LookupableTableSource: the defined order lookup keys are inconsistent
Rockey Cui created FLINK-15155:
-----------------------------------

             Summary: Join with a LookupableTableSource: the defined order lookup keys are inconsistent
                 Key: FLINK-15155
                 URL: https://issues.apache.org/jira/browse/FLINK-15155
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.9.1
         Environment: win10_x64 idea2019.2.4_x64 jdk1.8.0_211_x64
            Reporter: Rockey Cui
         Attachments: getAsyncLookupFunction.png

My DQL ==>

{code:sql}
SELECT D1.S_INT
     , D1.S_LONG
     , D1.S_NUMBER1
     , D1.S_ADDRESS
     , D1.S_DATE_EVENT
     , D1.S_TIMESTAMP_EVENT
  FROM CIRROSTREAM_DIM_GHL_SOURCE_13_CSL AS F1
  LEFT JOIN CIRROSTREAM_DIM_GHL_DIM_13 FOR SYSTEM_TIME AS OF F1.PROCTIME AS D1
    ON F1.S_INT = D1.S_INT
   AND F1.S_LONG = D1.S_LONG
   AND F1.S_NUMBER1 = D1.S_NUMBER1
   AND F1.S_ADDRESS = D1.S_ADDRESS
 WHERE F1.S_INT BETWEEN 1501 AND 2000
{code}

My LookupableTableSource.getAsyncLookupFunction received lookupKeys ==>

0 = "S_ADDRESS"
1 = "S_INT"
2 = "S_LONG"
3 = "S_NUMBER1"
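The keys arrive in alphabetical order rather than in the order the ON clause (or the table schema) declares them. Until the ordering contract is clarified, one defensive workaround is to not rely on the delivered order at all and re-sort the keys by their position in the table's declared fields. A sketch (`toDeclaredOrder` is a hypothetical helper, not Flink API):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LookupKeyOrder {
    // Reorders the lookup keys the planner passes to getLookupFunction /
    // getAsyncLookupFunction into the dimension table's declared field order.
    static String[] toDeclaredOrder(String[] keys, String[] declaredFields) {
        List<String> declared = Arrays.asList(declaredFields);
        String[] out = keys.clone();
        Arrays.sort(out, Comparator.comparingInt(declared::indexOf));
        return out;
    }

    public static void main(String[] args) {
        String[] declared = {"S_INT", "S_LONG", "S_NUMBER1", "S_ADDRESS", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"};
        // Keys as received in this report: alphabetical, not declaration order.
        String[] received = {"S_ADDRESS", "S_INT", "S_LONG", "S_NUMBER1"};
        System.out.println(Arrays.toString(toDeclaredOrder(received, declared)));
    }
}
```

With this in place the eval parameters can be bound by declared position regardless of what order the planner happens to emit.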
[jira] [Created] (FLINK-14985) FLINK YARN per-job with ehcache Exception
Rockey Cui created FLINK-14985:
-----------------------------------

             Summary: FLINK YARN per-job with ehcache Exception
                 Key: FLINK-14985
                 URL: https://issues.apache.org/jira/browse/FLINK-14985
             Project: Flink
          Issue Type: Bug
          Components: Deployment / YARN
    Affects Versions: 1.9.1
         Environment: openjdk-1.8.0.102-4.b14.el7.x86_64 flink1.9.1 Hadoop 3.0.0-cdh6.2.0
            Reporter: Rockey Cui

I use ehcache in my project, and I get the following exception in per-job mode:

{code:java}
java.lang.IllegalStateException: UserManagedCacheBuilder failed to build.
	at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:182)
	at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:404)
	at lookupabletable.cache.AllCache.init(AllCache.java:46)
	at lookupabletable.function.AbstractTableFunction.initCache(AbstractTableFunction.java:83)
	at lookupabletable.function.AbstractTableFunction.open(AbstractTableFunction.java:65)
	at LookupFunction$7.open(Unknown Source)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.open(LookupJoinRunner.java:68)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: org.ehcache.core.spi.ServiceLocator$DependencyException: Failed to find provider with satisfied dependency set for interface org.ehcache.core.spi.store.Store$Provider [candidates [org.ehcache.impl.internal.store.heap.OnHeapStoreProviderFactory@5a58dd22, org.ehcache.impl.internal.store.offheap.OffHeapStoreProviderFactory@4d766540, org.ehcache.impl.internal.store.disk.OffHeapDiskStoreProviderFactory@784d016d, org.ehcache.impl.internal.store.tiering.TieredStoreProviderFactory@25b63026, org.ehcache.impl.internal.store.loaderwriter.LoaderWriterStoreProviderFactory@776ed22b]]
	at org.ehcache.core.spi.ServiceLocator$DependencySet.build(ServiceLocator.java:350)
	at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:179)
	... 15 more
Caused by: org.ehcache.core.spi.ServiceLocator$DependencyException: Failed to find provider with satisfied dependency set for interface org.ehcache.core.spi.store.Store$Provider [candidates [org.ehcache.impl.internal.store.heap.OnHeapStoreProviderFactory@5a58dd22, org.ehcache.impl.internal.store.offheap.OffHeapStoreProviderFactory@4d766540, org.ehcache.impl.internal.store.disk.OffHeapDiskStoreProviderFactory@784d016d, org.ehcache.impl.internal.store.tiering.TieredStoreProviderFactory@25b63026, org.ehcache.impl.internal.store.loaderwriter.LoaderWriterStoreProviderFactory@776ed22b]]
	at org.ehcache.core.spi.ServiceLocator$DependencySet.lookupService(ServiceLocator.java:401)
	at org.ehcache.core.spi.ServiceLocator$DependencySet.build(ServiceLocator.java:322)
	... 16 more
{code}

There is no problem in standalone mode. Any help?
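A frequent cause of "works standalone, fails on YARN per-job" provider-discovery errors (an assumption here, not a confirmed diagnosis): libraries that discover implementations via `java.util.ServiceLoader` read the thread context classloader, and inside Flink's user-code classloader on YARN that can differ from the loader that actually holds the provider classes in the job jar. A common workaround is to pin the context classloader around the cache construction, e.g. in `AbstractTableFunction.open`. A minimal sketch of the pinning pattern (the `UserManagedCacheBuilder` call in the comment is illustrative):

```java
import java.util.function.Supplier;

public class TcclPinning {
    // Runs an action with the thread context classloader temporarily set to the
    // classloader that loaded this class (i.e. the user-code classloader),
    // restoring the previous one afterwards even if the action throws.
    static <T> T withOwnClassLoader(Supplier<T> action) {
        ClassLoader prev = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(TcclPinning.class.getClassLoader());
            return action.get();
        } finally {
            Thread.currentThread().setContextClassLoader(prev);
        }
    }

    public static void main(String[] args) {
        // In the reporter's code this would wrap the build, e.g.:
        //   cache = withOwnClassLoader(() ->
        //       UserManagedCacheBuilder.newUserManagedCacheBuilder(Long.class, String.class).build(true));
        ClassLoader seen = withOwnClassLoader(() -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == TcclPinning.class.getClassLoader());
    }
}
```

If pinning fixes it, the underlying issue is classloader visibility of the ehcache service providers under the per-job deployment, not ehcache itself.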
[jira] [Created] (FLINK-14961) Join with a LookupableTableSource: Expected results are inconsistent when using SQL expressions or UDF
Rockey Cui created FLINK-14961:
-----------------------------------

             Summary: Join with a LookupableTableSource: Expected results are inconsistent when using SQL expressions or UDF
                 Key: FLINK-14961
                 URL: https://issues.apache.org/jira/browse/FLINK-14961
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.9.1
         Environment: jdk1.8.0_211 windows7 IDEA 2019.2.4
            Reporter: Rockey Cui
         Attachments: 1574826850(1).jpg, 企业微信截图_15748270288763.png, 企业微信截图_157482739422.png

I implemented a LookupableTableSource. When I use the join syntax, the result is not consistent with the SQL. Looking at the execution plan, I found the following problem:

Stream Schema -->

root
 |-- id: BIGINT
 |-- cityId: INT
 |-- url: STRING
 |-- cDate: DATE
 |-- cTimeStamp: TIMESTAMP(3)
 |-- proctime: TIMESTAMP(3) *PROCTIME*

DATA -->

"1001,1002,adc0,2019-11-11,2019-11-11 00:00:00.00101",
"1002,1003,adc1,2019-11-11,2019-11-11 00:00:00.00202",
"1003,1004,adc2,2019-11-11,2019-11-11 00:00:00.00303",
"1004,1005,adc3,2019-11-11,2019-11-11 00:00:00.00404",
"1005,1006,adc4,2019-11-11,2019-11-11 00:00:00.00505",
",,adc5,2019-11-11,2019-11-11 00:00:00.00606"

LookupableTableSource Schema -->

root
 |-- USERID: BIGINT
 |-- CITYID: INT
 |-- AGE: DOUBLE
 |-- USERNAME: STRING
 |-- USERSEX: STRING
 |-- CREATEDATE: TIMESTAMP(3)
 |-- CREATETIMESTAMP: TIMESTAMP(3)

DATA -->

1001,1002,1001.00,赵俊峰,男,2019-11-26,2019-11-26 20:49:20.000527
1002,1003,1002.01,华宏言,男,2019-11-26,2019-11-26 20:49:20.000527
1003,1004,1003.02,姜艺,女,2019-11-26,2019-11-26 20:49:20.000527
1004,1005,1004.03,唐鸣,男,2019-11-26,2019-11-26 20:49:20.000527
1005,1006,1005.04,苗斌,男,2019-11-26,2019-11-26 20:49:20.000527

SQL -->

{code:sql}
select a.*, i.*
  from user_click_info a
  left join info FOR SYSTEM_TIME AS OF a.proctime i
    on i.CITYID = a.cityId
   and i.USERID - 1 = a.id
{code}

FlinkTaskInfo -->

{code}
Source: Collection Source -> Map -> SourceConversion(table=[Unregistered_DataStream_2], fields=[id, cityId, url, cDate, cTimeStamp, proctime]) -> LookupJoin(table=[MyLookupableTableSource(USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP)], joinType=[LeftOuterJoin], async=[false], on=[cityId=CITYID, id=$f7], where=[], select=[id, cityId, url, cDate, cTimeStamp, proctime, USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP, $f7]) -> Calc(select=[id, cityId, url, cDate, cTimeStamp, PROCTIME_MATERIALIZE(proctime) AS proctime, USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP]) -> SinkConversionToRow -> Sink: Print to Std. Out (1/1)
{code}

Result ->

{color:#FF0000}1001,1002{color},adc0,2019-11-11,2019-11-11T00:00:00.001,2019-11-27T03:52:18.156,{color:#FF0000}1001,1002{color},1001.0,赵俊峰,男,2019-11-26T00:00,2019-11-26T20:49:20
{color:#FF0000}1002,1003{color},adc1,2019-11-11,2019-11-11T00:00:00.002,2019-11-27T03:52:18.263,{color:#FF0000}1002,1003{color},1002.01,华宏言,男,2019-11-26T00:00,2019-11-26T20:49:20
1003,1004,adc2,2019-11-11,2019-11-11T00:00:00.003,2019-11-27T03:52:18.351,1003,1004,1003.02,姜艺,女,2019-11-26T00:00,2019-11-26T20:49:20
1004,1005,adc3,2019-11-11,2019-11-11T00:00:00.004,2019-11-27T03:52:18.433,1004,1005,1004.03,唐鸣,男,2019-11-26T00:00,2019-11-26T20:49:20
1005,1006,adc4,2019-11-11,2019-11-11T00:00:00.005,2019-11-27T03:52:18.534,1005,1006,1005.04,苗斌,男,2019-11-26T00:00,2019-11-26T20:49:20
,,adc5,2019-11-11,2019-11-11T00:00:00.006,2019-11-27T03:52:18.636,null,null,null,null,null,null,null

It looks like the expression does not work: TableFunction getLookupFunction(String[] strings) only receives CITYID, and public void eval(Object... params) only receives the cityId value.
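In other words, only the plain equi-key (CITYID) reaches the lookup function, and the arithmetic condition `i.USERID - 1 = a.id` is apparently dropped instead of being applied as a post-join filter, so rows match that the SQL says should not. Until the planner handles this, one workaround (a sketch under my own assumptions, not Flink API) is to re-apply the dropped condition inside the lookup function itself before emitting rows:

```java
import java.util.ArrayList;
import java.util.List;

public class LookupPostFilter {
    // Hypothetical dim-table rows {USERID, CITYID}, mirroring the data above.
    static final long[][] DIM = {{1001, 1002}, {1002, 1003}, {1003, 1004}, {1004, 1005}, {1005, 1006}};

    // Sketch of an eval-style lookup that receives the probe row's id and
    // cityId, matches on CITYID, and re-applies USERID - 1 = id itself.
    static List<long[]> lookup(long id, long cityId) {
        List<long[]> out = new ArrayList<>();
        for (long[] row : DIM) {
            if (row[1] == cityId && row[0] - 1 == id) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Probe row id=1001, cityId=1002: dim row {1001,1002} matches on CITYID,
        // but 1001 - 1 != 1001, so per the SQL it must NOT join; the faulty
        // plan above joined it anyway.
        System.out.println(lookup(1001, 1002).size());
        // A pair satisfying both conditions, e.g. id=1001 against {1002,1003}
        // with cityId=1003, does join (1002 - 1 == 1001).
        System.out.println(lookup(1001, 1003).size());
    }
}
```

This only restores correctness, not key pruning; the lookup still has to fetch by CITYID alone and filter afterwards.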