[jira] [Created] (FLINK-15399) Join with a LookupableTableSource:java.lang.RuntimeException: while converting XXXX Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type

2019-12-26 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-15399:
--

 Summary: Join with a 
LookupableTableSource:java.lang.RuntimeException: while converting   Caused 
by: java.lang.AssertionError: Field ordinal 26 is invalid for  type
 Key: FLINK-15399
 URL: https://issues.apache.org/jira/browse/FLINK-15399
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
 Environment: jdk1.8.0_211
Reporter: Rockey Cui
 Attachments: JoinTest-1.0-SNAPSHOT.jar

 
{code:java}
// code placeholder
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.setParallelism(1);

    DataStreamSource<String> stringDataStreamSource1 = env.fromElements(
            "HA"
    );
    String[] fields1 = new String[]{"ORD_ID", "PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT"
            // key
            , "PS_INT", "PS_LONG"
            , "PS_DOUBLE8", "PS_DOUBLE14", "PS_DOUBLE15"
            , "PS_NUMBER1", "PS_NUMBER2", "PS_NUMBER3", "PS_NUMBER4"
            , "PS_DATE", "PS_TIMESTAMP", "PS_DATE_EVENT", "PS_TIMESTAMP_EVENT"};
    TypeInformation[] types1 = new TypeInformation[]{Types.STRING, Types.INT, Types.LONG, Types.LONG, Types.DOUBLE, Types.STRING
            // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE, Types.DOUBLE
            , Types.LONG, Types.LONG, Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation1 = new RowTypeInfo(types1, fields1);
    DataStream<Row> stream1 = stringDataStreamSource1.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673356L;

        @Override
        public Row map(String s) {
            return new Row(typeInformation1.getArity());
        }
    }).returns(typeInformation1);
    tableEnv.registerDataStream("FUN_1", stream1, String.join(",", typeInformation1.getFieldNames()) + ",PROCTIME.proctime");

    DataStreamSource<String> stringDataStreamSource2 = env.fromElements(
            "HA"
    );
    String[] fields2 = new String[]{"C_NAME", "C_ADDRESS", "C_NATIONKEY"
            // key
            , "C_INT", "C_LONG"
            , "C_DOUBLE8", "C_DOUBLE14"
            , "C_DATE_EVENT", "C_TIMESTAMP_EVENT"};
    TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG
            // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation2 = new RowTypeInfo(types2, fields2);

    DataStream<Row> stream2 = stringDataStreamSource2.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673349L;

        @Override
        public Row map(String s) {
            return new Row(typeInformation2.getArity());
        }
    }).returns(typeInformation2);
    tableEnv.registerDataStream("FUN_2", stream2, String.join(",", typeInformation2.getFieldNames()) + ",PROCTIME.proctime");

    MyLookupTableSource tableSource = MyLookupTableSource.newBuilder()
            .withFieldNames(new String[]{
                    "S_NAME", "S_ADDRESS", "S_PHONE"
                    , "S_ACCTBAL", "S_COMMENT"
                    // key
                    , "S_INT", "S_LONG"
                    , "S_DOUBLE8", "S_DOUBLE14"
                    , "S_DOUBLE15", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"})
            .withFieldTypes(new TypeInformation[]{
                    Types.STRING, Types.STRING, Types.STRING
                    , Types.DOUBLE, Types.STRING
                    // key
                    , Types.INT, Types.LONG
                    , Types.DOUBLE, Types.DOUBLE
                    , Types.DOUBLE, Types.SQL_DATE, Types.SQL_TIMESTAMP})
            .build();

    tableEnv.registerTableSource("INFO", tableSource);

    String sql = "SELECT LN(F.PS_INT),LOG(F2.C_INT,1)\n" +
            "  FROM (SELECT *\n" +
            "  FROM FUN_1 F1\n" +
            "  JOIN INFO FOR SYSTEM_TIME AS OF F1.PROCTIME D1\n" +
            " ON F1.PS_INT = D1.S_INT AND F1.PS_LONG - 570 = D1.S_LONG \n" +
            ") F\n" +
            "JOIN FUN_2 F2 ON F.PS_INT = F2.C_INT AND F.PS_LONG - 150 = F2.C_LONG\n" +
            " WHERE 1=1\n" +
            " AND F.PS_INT BETWEEN 1000 AND 5000\n" +
            " AND F.S_LONG < 2147792600\n" + // I found that this predicate causes the exception
            " AND F.PS_COMMENT LIKE '%FILY%'\n" +
            " AND F2.C_INT IS NOT NULL\n" +
            " AND LN(F.PS_INT)<8";

    Table table = tableEnv.sqlQuery(sql);

    DataStream re
{code}
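
The MyLookupTableSource used above ships only in the attached JoinTest-1.0-SNAPSHOT.jar. For readers without the attachment, here is a minimal sketch of what such a source can look like against the Flink 1.9 interfaces; the field names and types come from the builder calls above, while the no-op lookup function and the builder class are assumptions of this sketch, not the reporter's code.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Minimal sketch of a lookup-only table source for Flink 1.9 (not the attached implementation).
public class MyLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {

    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    private MyLookupTableSource(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    @Override
    public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
        // a real implementation would look rows up by the given key fields
        return new NoOpLookupFunction(new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
        throw new UnsupportedOperationException("synchronous lookup only");
    }

    @Override
    public boolean isAsyncEnabled() {
        return false;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("this source is only used for lookup (temporal) joins");
    }

    // placeholder lookup function for this sketch: emits nothing
    public static class NoOpLookupFunction extends TableFunction<Row> {
        private final RowTypeInfo returnType;

        public NoOpLookupFunction(RowTypeInfo returnType) {
            this.returnType = returnType;
        }

        public void eval(Object... keys) {
            // a real implementation would call collect(row) for each matching dimension row
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return returnType;
        }
    }

    // builder matching the newBuilder()/withFieldNames()/withFieldTypes() calls in the snippet
    public static class Builder {
        private String[] fieldNames;
        private TypeInformation<?>[] fieldTypes;

        public Builder withFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }

        public Builder withFieldTypes(TypeInformation<?>[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        public MyLookupTableSource build() {
            return new MyLookupTableSource(fieldNames, fieldTypes);
        }
    }
}
{code}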

[jira] [Created] (FLINK-15212) PROCTIME attribute causes problems with timestamps before 1900?

2019-12-11 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-15212:
--

 Summary: PROCTIME attribute causes problems with timestamps before 1900?
 Key: FLINK-15212
 URL: https://issues.apache.org/jira/browse/FLINK-15212
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
 Environment: flink 1.9.1

jdk1.8.0_211

idea2019.3
Reporter: Rockey Cui


A simple DataStreamSource with timestamp registered as a table.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = env.fromElements(
        "1001,1002,adc0,1900-01-01 00:00:00.0",
        "1002,1003,adc1,1910-01-01 00:00:00.0",
        "1003,1004,adc2,1920-01-01 00:00:00.0",
        "1004,1005,adc3,1930-01-01 00:00:00.0",
        "1005,1006,adc4,1970-01-01 00:00:00.0",
        ",,adc5,1971-01-01 00:00:00.0"
);
TypeInformation[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP};
String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> {
    String[] split = s.split(",");
    Row row = new Row(split.length);
    for (int i = 0; i < split.length; i++) {
        Object value = null;
        if (fieldTypes[i].equals(Types.STRING)) {
            value = split[i];
        }
        if (fieldTypes[i].equals(Types.LONG)) {
            value = Long.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.INT)) {
            value = Integer.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.DOUBLE)) {
            value = Double.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
            value = Timestamp.valueOf(split[i]);
        }
        row.setField(i, value);
    }
    //System.out.println(row.toString());
    return row;
}).returns(rowTypeInfo);
tableEnv.registerDataStream("user_click_info", stream, String.join(",", fieldNames) + ",www.proctime");
String sql = "select * from user_click_info";
Table table = tableEnv.sqlQuery(sql);
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
result.print();
table.printSchema();
tableEnv.execute("Test");
{code}
result ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)
 |-- www: TIMESTAMP(3) *PROCTIME*

 

1001,1002,adc0,1899-12-31 23:54:17.0,2019-12-12 03:37:18.036
1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
,,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196

Without the PROCTIME attribute, the result is OK ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)

 

1001,1002,adc0,1900-01-01 00:00:00.0
1002,1003,adc1,1910-01-01 00:00:00.0
1003,1004,adc2,1920-01-01 00:00:00.0
1004,1005,adc3,1930-01-01 00:00:00.0
1005,1006,adc4,1970-01-01 00:00:00.0
,,adc5,1971-01-01 00:00:00.0
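
Not part of the report, but a possible lead on where 1899-12-31 23:54:17.0 comes from: the shift is exactly 5 min 43 s and only hits the row before 1901, which is consistent with time zones that use a Local Mean Time offset until 1901 in the tz database (for example Asia/Shanghai, +08:05:43). A minimal JDK-only check, assuming Asia/Shanghai as the reporter's default zone:

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

// Hedged sketch, not from the report: compares a zone's pre-1901 (Local Mean Time) offset
// with its modern raw offset. Asia/Shanghai is only an assumption about the default zone.
public class Pre1900OffsetCheck {
    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai");
        long instantIn1900 = Timestamp.valueOf("1900-01-01 00:00:00.0").getTime();
        int historicalOffset = tz.getOffset(instantIn1900); // +08:05:43 (LMT) in current tz data
        int modernOffset = tz.getRawOffset();               // +08:00:00
        // prints 343, i.e. 5 min 43 s; 1900-01-01 00:00:00 minus 343 s is 1899-12-31 23:54:17
        System.out.println("offset difference in seconds: " + (historicalOffset - modernOffset) / 1000);
    }
}
{code}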





[jira] [Created] (FLINK-15155) Join with a LookupableTableSource: the lookup keys are inconsistent with the defined order

2019-12-09 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-15155:
--

 Summary: Join with a LookupableTableSource: the lookup keys are inconsistent with the defined order
 Key: FLINK-15155
 URL: https://issues.apache.org/jira/browse/FLINK-15155
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.9.1
 Environment: win10_x64

idea2019.2.4_x64

jdk1.8.0_211_x64
Reporter: Rockey Cui
 Attachments: getAsyncLookupFunction.png

My DQL==>

SELECT
 D1.S_INT ,
 D1.S_LONG ,
 D1.S_NUMBER1 ,
 D1.S_ADDRESS ,
 D1.S_DATE_EVENT ,
 D1.S_TIMESTAMP_EVENT
FROM
 CIRROSTREAM_DIM_GHL_SOURCE_13_CSL AS F1
 LEFT JOIN CIRROSTREAM_DIM_GHL_DIM_13 FOR SYSTEM_TIME AS OF F1.PROCTIME AS D1
 ON F1.S_INT = D1.S_INT
 AND F1.S_LONG = D1.S_LONG
 AND F1.S_NUMBER1 = D1.S_NUMBER1
 AND F1.S_ADDRESS = D1.S_ADDRESS
WHERE
 F1.S_INT BETWEEN 1501 AND 2000

My LookupableTableSource.getAsyncLookupFunction received these lookupKeys ==>

0 = "S_ADDRESS"
1 = "S_INT"
2 = "S_LONG"
3 = "S_NUMBER1"
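
Given the order above, a defensive workaround sketch (not from the report) is to resolve the received key names against the declared schema instead of assuming the ON-clause order; the LookupKeyOrder class and the declared field order below are hypothetical:

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: map whatever key order the planner passes to
// getAsyncLookupFunction(String[] lookupKeys) back to positions in the declared schema.
public final class LookupKeyOrder {

    public static int[] keyIndices(String[] declaredFieldNames, String[] lookupKeys) {
        List<String> declared = Arrays.asList(declaredFieldNames);
        int[] indices = new int[lookupKeys.length];
        for (int i = 0; i < lookupKeys.length; i++) {
            indices[i] = declared.indexOf(lookupKeys[i]); // position of each lookup key in the schema
        }
        return indices;
    }

    public static void main(String[] args) {
        // declared order is an assumption; received order is the one reported above
        String[] declared = {"S_INT", "S_LONG", "S_NUMBER1", "S_ADDRESS", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"};
        String[] received = {"S_ADDRESS", "S_INT", "S_LONG", "S_NUMBER1"};
        System.out.println(Arrays.toString(keyIndices(declared, received))); // [3, 0, 1, 2]
    }
}
{code}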

 





[jira] [Created] (FLINK-14985) FLINK YARN per-job with Ehcache exception

2019-11-28 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-14985:
--

 Summary: FLINK YARN per-job with Ehcache exception
 Key: FLINK-14985
 URL: https://issues.apache.org/jira/browse/FLINK-14985
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.1
 Environment: openjdk-1.8.0.102-4.b14.el7.x86_64

flink1.9.1

Hadoop 3.0.0-cdh6.2.0
Reporter: Rockey Cui


I use Ehcache in my project, and I get the following exception when running in YARN per-job mode:

java.lang.IllegalStateException: UserManagedCacheBuilder failed to build.
    at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:182)
    at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:404)
    at lookupabletable.cache.AllCache.init(AllCache.java:46)
    at lookupabletable.function.AbstractTableFunction.initCache(AbstractTableFunction.java:83)
    at lookupabletable.function.AbstractTableFunction.open(AbstractTableFunction.java:65)
    at LookupFunction$7.open(Unknown Source)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.open(LookupJoinRunner.java:68)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: org.ehcache.core.spi.ServiceLocator$DependencyException: Failed to find provider with satisfied dependency set for interface org.ehcache.core.spi.store.Store$Provider [candidates [org.ehcache.impl.internal.store.heap.OnHeapStoreProviderFactory@5a58dd22, org.ehcache.impl.internal.store.offheap.OffHeapStoreProviderFactory@4d766540, org.ehcache.impl.internal.store.disk.OffHeapDiskStoreProviderFactory@784d016d, org.ehcache.impl.internal.store.tiering.TieredStoreProviderFactory@25b63026, org.ehcache.impl.internal.store.loaderwriter.LoaderWriterStoreProviderFactory@776ed22b]]
    at org.ehcache.core.spi.ServiceLocator$DependencySet.build(ServiceLocator.java:350)
    at org.ehcache.config.builders.UserManagedCacheBuilder.build(UserManagedCacheBuilder.java:179)
    ... 15 more
Caused by: org.ehcache.core.spi.ServiceLocator$DependencyException: Failed to find provider with satisfied dependency set for interface org.ehcache.core.spi.store.Store$Provider [candidates [org.ehcache.impl.internal.store.heap.OnHeapStoreProviderFactory@5a58dd22, org.ehcache.impl.internal.store.offheap.OffHeapStoreProviderFactory@4d766540, org.ehcache.impl.internal.store.disk.OffHeapDiskStoreProviderFactory@784d016d, org.ehcache.impl.internal.store.tiering.TieredStoreProviderFactory@25b63026, org.ehcache.impl.internal.store.loaderwriter.LoaderWriterStoreProviderFactory@776ed22b]]
    at org.ehcache.core.spi.ServiceLocator$DependencySet.lookupService(ServiceLocator.java:401)
    at org.ehcache.core.spi.ServiceLocator$DependencySet.build(ServiceLocator.java:322)
    ... 16 more

There is no problem in standalone mode.

Any help?
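
Not part of the report: Ehcache's ServiceLocator discovers its providers through the JDK ServiceLoader mechanism, so one thing worth comparing between the standalone and per-job classpaths is what the user-code classloader actually sees for that SPI. A hedged diagnostic sketch; the SPI resource name assumed here is Ehcache 3's org.ehcache.core.spi.service.ServiceFactory:

{code:java}
import java.net.URL;
import java.util.Enumeration;

// Hedged diagnostic sketch: prints every copy of Ehcache's ServiceFactory SPI descriptor that
// the current classloader can see. Comparing the output in standalone vs. YARN per-job mode
// shows whether the per-job (fat-jar / user-code) classpath lost or shadowed entries.
public class EhcacheSpiCheck {
    public static void main(String[] args) throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Enumeration<URL> descriptors =
                cl.getResources("META-INF/services/org.ehcache.core.spi.service.ServiceFactory");
        while (descriptors.hasMoreElements()) {
            System.out.println(descriptors.nextElement());
        }
    }
}
{code}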





[jira] [Created] (FLINK-14961) Join with a LookupableTableSource: Expected results are inconsistent when using SQL expressions or UDF

2019-11-26 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-14961:
--

 Summary: Join with a LookupableTableSource: Expected results are 
inconsistent when using SQL expressions or UDF
 Key: FLINK-14961
 URL: https://issues.apache.org/jira/browse/FLINK-14961
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1
 Environment: jdk1.8.0_211

windows7

IDEA 2019.2.4
Reporter: Rockey Cui
 Attachments: 1574826850(1).jpg, 企业微信截图_15748270288763.png, 
企业微信截图_157482739422.png

I implemented a LookupableTableSource. When I used the join syntax, I found that the result was not consistent with what the SQL expresses. Looking at the execution plan, I found the following problems:

Stream Schema-->

|-- id: BIGINT
 |-- cityId: INT
 |-- url: STRING
 |-- cDate: DATE
 |-- cTimeStamp: TIMESTAMP(3)
 |-- proctime: TIMESTAMP(3) *PROCTIME*

DATA-->

"1001,1002,adc0,2019-11-11,2019-11-11 00:00:00.00101",
"1002,1003,adc1,2019-11-11,2019-11-11 00:00:00.00202",
"1003,1004,adc2,2019-11-11,2019-11-11 00:00:00.00303",
"1004,1005,adc3,2019-11-11,2019-11-11 00:00:00.00404",
"1005,1006,adc4,2019-11-11,2019-11-11 00:00:00.00505",
",,adc5,2019-11-11,2019-11-11 00:00:00.00606"

LookupableTableSource Schema-->

|-- USERID: BIGINT
 |-- CITYID: INT
 |-- AGE: DOUBLE
 |-- USERNAME: STRING
 |-- USERSEX: STRING
 |-- CREATEDATE: TIMESTAMP(3)
 |-- CREATETIMESTAMP: TIMESTAMP(3)

DATA-->

1001,1002,1001.00,赵俊峰,男,2019-11-26,2019-11-26 20:49:20.000527
1002,1003,1002.01,华宏言,男,2019-11-26,2019-11-26 20:49:20.000527
1003,1004,1003.02,姜艺,女,2019-11-26,2019-11-26 20:49:20.000527
1004,1005,1004.03,唐鸣,男,2019-11-26,2019-11-26 20:49:20.000527
1005,1006,1005.04,苗斌,男,2019-11-26,2019-11-26 20:49:20.000527

SQL -->

select a.*,i.* from user_click_info a left join info FOR SYSTEM_TIME AS OF a.proctime i on i.CITYID = a.cityId and i.USERID - 1 = a.id

FlinkTaskInfo -->

Source: Collection Source -> Map ->
SourceConversion(table=[Unregistered_DataStream_2], fields=[id, cityId, url, cDate, cTimeStamp, proctime]) ->
LookupJoin(table=[MyLookupableTableSource(USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP)], joinType=[LeftOuterJoin], async=[false], on=[cityId=CITYID, id=$f7], where=[], select=[id, cityId, url, cDate, cTimeStamp, proctime, USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP, $f7]) ->
Calc(select=[id, cityId, url, cDate, cTimeStamp, PROCTIME_MATERIALIZE(proctime) AS proctime, USERID, CITYID, AGE, USERNAME, USERSEX, CREATEDATE, CREATETIMESTAMP]) ->
SinkConversionToRow -> Sink: Print to Std. Out (1/1)

Result->

1001,1002,adc0,2019-11-11,2019-11-11T00:00:00.001,2019-11-27T03:52:18.156,1001,1002,1001.0,赵俊峰,男,2019-11-26T00:00,2019-11-26T20:49:20
1002,1003,adc1,2019-11-11,2019-11-11T00:00:00.002,2019-11-27T03:52:18.263,1002,1003,1002.01,华宏言,男,2019-11-26T00:00,2019-11-26T20:49:20
1003,1004,adc2,2019-11-11,2019-11-11T00:00:00.003,2019-11-27T03:52:18.351,1003,1004,1003.02,姜艺,女,2019-11-26T00:00,2019-11-26T20:49:20
1004,1005,adc3,2019-11-11,2019-11-11T00:00:00.004,2019-11-27T03:52:18.433,1004,1005,1004.03,唐鸣,男,2019-11-26T00:00,2019-11-26T20:49:20
1005,1006,adc4,2019-11-11,2019-11-11T00:00:00.005,2019-11-27T03:52:18.534,1005,1006,1005.04,苗斌,男,2019-11-26T00:00,2019-11-26T20:49:20
,,adc5,2019-11-11,2019-11-11T00:00:00.006,2019-11-27T03:52:18.636,null,null,null,null,null,null,null

 

It looks like the expression doesn't work!

TableFunction getLookupFunction(String[] strings) only receives CITYID,

public void eval(Object... params) only receives the cityId value.
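
A hedged instrumentation sketch (not the reporter's class) that makes the mismatch visible by logging what the planner hands to the lookup function; the LoggingLookupFunction class and its constructor are assumptions of this sketch:

{code:java}
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hedged instrumentation sketch: logs the lookup keys selected by the planner and the key
// values passed at runtime. With the query above it only ever sees CITYID / the cityId value,
// which is the behaviour described in this issue.
public class LoggingLookupFunction extends TableFunction<Row> {

    private final String[] lookupKeys;
    private final RowTypeInfo producedType;

    public LoggingLookupFunction(String[] lookupKeys, RowTypeInfo producedType) {
        // lookupKeys is the array handed to LookupableTableSource#getLookupFunction
        this.lookupKeys = lookupKeys;
        this.producedType = producedType;
        System.out.println("planner-selected lookup keys: " + Arrays.toString(lookupKeys));
    }

    public void eval(Object... keyValues) {
        System.out.println("key values received at runtime: " + Arrays.toString(keyValues));
        // a real implementation would fetch the dimension row(s) here and call collect(row)
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return producedType;
    }
}
{code}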

 

 


