[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2022-05-07 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17533221#comment-17533221
 ] 

jinfeng commented on FLINK-27519:
-

[~paul8263]  Yes, there are already duplicate column names before the 
LOGICAL_CONVERTERS rule is applied. I think the easiest way is to modify 
FlinkLogicalOverAggregate if it cannot currently be fixed in Calcite. 
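For example, the over-aggregate output field names could be made unique before the output row type is built. The sketch below only illustrates that idea; the class name and the suffixing scheme are assumptions, not the actual planner code:

{code:java}
// Hypothetical sketch: deduplicate the output field names of an over aggregate
// (e.g. two windows both producing "w0$o0") by suffixing later duplicates,
// so that RowType validation no longer fails. Not the actual Flink planner code.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OverAggregateFieldNameDeduplicator {

    public static List<String> uniquify(List<String> fieldNames) {
        Set<String> seen = new HashSet<>();
        List<String> unique = new ArrayList<>();
        for (String name : fieldNames) {
            String candidate = name;
            int suffix = 0;
            // Keep appending "_0", "_1", ... until the name no longer collides.
            while (!seen.add(candidate)) {
                candidate = name + "_" + suffix++;
            }
            unique.add(candidate);
        }
        return unique;
    }

    public static void main(String[] args) {
        // [a, b, c, w0$o0, w0$o0] -> [a, b, c, w0$o0, w0$o0_0]
        System.out.println(uniquify(List.of("a", "b", "c", "w0$o0", "w0$o0")));
    }
}
{code}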

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: jinfeng
>Priority: Major
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]. It can be 
> reproduced by adding this unit test: 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message:
> {code:java}
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2022-05-05 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532615#comment-17532615
 ] 

jinfeng commented on FLINK-27519:
-

[~jark]  [~TsReaper]  Can you help confirm this issue? 

If it is a problem, I can help fix it. 

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: jinfeng
>Priority: Major
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]. It can be 
> reproduced by adding this unit test: 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message:
> {code:java}
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2022-05-05 Thread jinfeng (Jira)
jinfeng created FLINK-27519:
---

 Summary: Fix duplicates names when there are multiple levels of 
over window aggregate
 Key: FLINK-27519
 URL: https://issues.apache.org/jira/browse/FLINK-27519
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: jinfeng


A similar issue to 
[FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121].

It can be reproduced by adding this unit test: 

org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
{code:java}
@Test
def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
  val sql =
"""
  |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
  |  SELECT *, count(distinct(c)) over (partition by a order by b desc) AS 
uv
  |  FROM (
  |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
  |FROM MyTable
  |  )
  |)
  |""".stripMargin
  util.verifyExecPlan(sql)
} {code}
The error message:
{code:java}
org.apache.flink.table.api.ValidationException: Field names must be unique. 
Found duplicates: [w0$o0]    at 
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
    at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
    at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
    at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
    at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
 {code}
 

I think we can add some logic in FlinkLogicalOverAggregate to avoid 
duplicate names in the output rowType. 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition not contain all partitioned fields

2022-04-05 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17517778#comment-17517778
 ] 

jinfeng commented on FLINK-20951:
-

[~jark] [~lirui]  I encountered the same problem in version 1.12.2. After I 
applied FLINK-22202, the problem did not appear again, so I think FLINK-22202 
can fix this problem.

> IllegalArgumentException when reading Hive parquet table if condition not 
> contain all partitioned fields
> 
>
> Key: FLINK-20951
> URL: https://issues.apache.org/jira/browse/FLINK-20951
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
> Environment: flink 1.12.0release-12
> sql-cli
>Reporter: YUJIANBO
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> auto-deprioritized-minor, auto-unassigned
>
> The production Hive table is partitioned by two fields: datekey and event.
> I have run these tests in the Flink SQL CLI (they all work in Spark SQL):
> (1) First:
> SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid
> = 'aa';(OK)
> SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';
> (Error)
> (2)Second:
> SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid
> = 'bb';(OK)
> SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';
> (Error)
> The exception is:
> {code}
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at
> org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
> at
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
> at
> 

[jira] [Commented] (FLINK-26854) Cannot write to the database when the request is redirected

2022-03-24 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17512158#comment-17512158
 ] 

jinfeng commented on FLINK-26854:
-

[~MingHao Liu] I don't think this is a problem with Flink. When writing to Doris 
using Stream Load, you should use httpclient version 4.5.3, as in 
[https://github.com/apache/incubator-doris/blob/master/samples/stream_load/java/DorisStreamLoad.java]:
{code:xml}
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
{code}
You can depend on this version directly in your code and shade/relocate it, or 
you can just use the Doris connector: 
https://github.com/apache/incubator-doris-flink-connector

Also, please describe your problem in English.

> Cannot write to the database when the request is redirected
> --
>
> Key: FLINK-26854
> URL: https://issues.apache.org/jira/browse/FLINK-26854
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet
>Affects Versions: 1.14.4
> Environment: jdk1.8
> flink1.14
> flink-scala_2.11
> flink-streaming-scala_2.11
>  
>Reporter: liuminghao
>Priority: Major
>
> I'm glad to be using the latest Flink 1.14. While it is convenient, I also ran 
> into some problems. When I use HTTP to save data into the database (Doris), my 
> database performs a redirect, which is not supported in Flink 1.14: the request 
> header generated by the redirect is treated as an error and the job is 
> terminated. (Flink 1.14 does not support custom request headers, so for now I 
> can only remove the header after the redirect.) Code:
> PushContent2Doris.pushContent( , headers = null)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns

2022-03-23 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511301#comment-17511301
 ] 

jinfeng commented on FLINK-26712:
-

[~twalthr] Thanks for your reply; maybe I was overcomplicating it. Looking 
forward to this feature. :D

> Metadata keys should not conflict with physical columns
> ---
>
> Key: FLINK-26712
> URL: https://issues.apache.org/jira/browse/FLINK-26712
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> If you have a field called timestamp and in addition want to read the 
> timestamp from the metadata:
> {code}
> CREATE TABLE animal_sightings_with_metadata (
>   `timestamp` TIMESTAMP(3),
>   `name` STRING,
>   `country` STRING,
>   `number` INT,
>   `append_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `timestamp-type` STRING METADATA,
>   `leader-epoch` INT METADATA,
>   `topic` STRING METADATA
> )
> {code}
> This gives:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [timestamp]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz

2022-03-23 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511288#comment-17511288
 ] 

jinfeng commented on FLINK-26810:
-

[~martijnvisser]   I think this is a problem with the Elasticsearch connector, 
because the conversion of TimestampData to String is implemented inside the 
connector, not in the Table runtime layer, as shown in the code below:
{code:java}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
    return (value, dateTimeFormatter) -> {
        TimestampData indexField = (TimestampData) value;
        return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
    };
{code}

When formatting the TimestampData as a String, the session's local time zone is not used.
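One possible direction is to format with the configured session time zone instead of hard-coded UTC. The sketch below only illustrates the idea; the class and the way the ZoneId is obtained (e.g. from table.local-time-zone) are assumptions, not the actual connector code:

{code:java}
// Sketch: format an instant with the session-local time zone instead of UTC.
// The ZoneId would come from the table config (e.g. table.local-time-zone);
// everything below is an illustrative assumption, not the connector's API.
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DynamicIndexTimeFormatter {

    private final ZoneId sessionZoneId;
    private final DateTimeFormatter dateTimeFormatter;

    public DynamicIndexTimeFormatter(ZoneId sessionZoneId, String pattern) {
        this.sessionZoneId = sessionZoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(pattern);
    }

    public String format(Instant instant) {
        // Previously: instant.atZone(ZoneOffset.UTC).format(dateTimeFormatter)
        return instant.atZone(sessionZoneId).format(dateTimeFormatter);
    }

    public static void main(String[] args) {
        DynamicIndexTimeFormatter formatter =
                new DynamicIndexTimeFormatter(ZoneId.of("Asia/Shanghai"), "yyyy-MM-dd");
        // 2022-03-22T17:30:00Z is already 2022-03-23 in UTC+8, so the index
        // suffix differs from the one computed with hard-coded UTC.
        System.out.println("myusers_" + formatter.format(Instant.parse("2022-03-22T17:30:00Z")));
    }
}
{code}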

> The local time zone does not take effect when the dynamic index uses a field 
> of type timestamp_ltz
> --
>
> Key: FLINK-26810
> URL: https://issues.apache.org/jira/browse/FLINK-26810
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Table SQL / Planner, Table 
> SQL / Runtime
>Reporter: jinfeng
>Priority: Major
> Attachments: 截屏2022-03-23 上午12.48.02.png
>
>
> When using a TIMESTAMP_WITH_LOCAL_TIME_ZONE field to generate a dynamic index, 
> it will always use the UTC time zone.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz

2022-03-22 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510764#comment-17510764
 ] 

jinfeng commented on FLINK-26810:
-

[~fpaul]  Can you help confirm this issue?

> The local time zone does not take effect when the dynamic index uses a field 
> of type timestamp_ltz
> --
>
> Key: FLINK-26810
> URL: https://issues.apache.org/jira/browse/FLINK-26810
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Priority: Major
> Attachments: 截屏2022-03-23 上午12.48.02.png
>
>
> When using a TIMESTAMP_WITH_LOCAL_TIME_ZONE field to generate a dynamic index, 
> it will always use the UTC time zone.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26810) The local time zone does not take effect when the dynamic index uses a field of type timestamp_ltz

2022-03-22 Thread jinfeng (Jira)
jinfeng created FLINK-26810:
---

 Summary: The local time zone does not take effect when the dynamic 
index uses a field of type timestamp_ltz
 Key: FLINK-26810
 URL: https://issues.apache.org/jira/browse/FLINK-26810
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Reporter: jinfeng
 Attachments: 截屏2022-03-23 上午12.48.02.png

When using a TIMESTAMP_WITH_LOCAL_TIME_ZONE field to generate a dynamic index, 
it will always use the UTC time zone.

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns

2022-03-21 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509787#comment-17509787
 ] 

jinfeng commented on FLINK-26712:
-

[~twalthr]  Currently, when there is a metadata column, the planner scans which 
metadata columns are used and passes these keys to the source; the source then 
generates the corresponding columns according to the metadata keys, and a 
project node is generated to map each metadata column to the actual field.

This means that even if multiple fields reference the same metadata key, the 
source only needs to produce one metadata column. If we change this, we would 
need to generate multiple metadata columns on the source side and then reference 
those fields from the project node.

Will this affect the compatibility of SQL plan generation?

> Metadata keys should not conflict with physical columns
> ---
>
> Key: FLINK-26712
> URL: https://issues.apache.org/jira/browse/FLINK-26712
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> If you have a field called timestamp and in addition want to read the 
> timestamp from the metadata:
> {code}
> CREATE TABLE animal_sightings_with_metadata (
>   `timestamp` TIMESTAMP(3),
>   `name` STRING,
>   `country` STRING,
>   `number` INT,
>   `append_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `timestamp-type` STRING METADATA,
>   `leader-epoch` INT METADATA,
>   `topic` STRING METADATA
> )
> {code}
> This gives:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [timestamp]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26712) Metadata keys should not conflict with physical columns

2022-03-17 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508213#comment-17508213
 ] 

jinfeng commented on FLINK-26712:
-

[~twalthr]  I am interested in this; please assign this ticket to me.

> Metadata keys should not conflict with physical columns
> ---
>
> Key: FLINK-26712
> URL: https://issues.apache.org/jira/browse/FLINK-26712
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>
> If you have a field called timestamp and in addition want to read the 
> timestamp from the metadata:
> {code}
> CREATE TABLE animal_sightings_with_metadata (
>   `timestamp` TIMESTAMP(3),
>   `name` STRING,
>   `country` STRING,
>   `number` INT,
>   `append_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   `headers` MAP METADATA,
>   `timestamp-type` STRING METADATA,
>   `leader-epoch` INT METADATA,
>   `topic` STRING METADATA
> )
> {code}
> This gives:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [timestamp]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-15 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492949#comment-17492949
 ] 

jinfeng commented on FLINK-26016:
-

[~luoyuxia]  I have submitted a PR; please take some time to review the 
code when you can. Thanks.

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> ---
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.3
>Reporter: jinfeng
>Priority: Major
>  Labels: pull-request-available
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>   // create the hive table with columnar storage.
> tableEnv.executeSql(
> String.format(
> "create table columnar_table (x string) STORED AS 
> PARQUET "
> + "tblproperties ('%s'='5min')",
> HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
> // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
> is 2048, we should
> // write as least 2048 records to the test table.
> List testData = new ArrayList<>(4096);
> for (int i = 0; i < 4096; i++) {
> testData.add(Row.of(String.valueOf(i)));
> }
> // constructs test data using values table
> TableEnvironment batchEnv = 
> HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
> batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
> batchEnv.useCatalog(hiveCatalog.getName());
> String dataId = TestValuesTableFactory.registerData(testData);
> batchEnv.executeSql(
> String.format(
> "create table value_source(x string, p as proctime()) 
> with ("
> + "'connector' = 'values', 'data-id' = '%s', 
> 'bounded'='true')",
> dataId));
> batchEnv.executeSql("insert overwrite columnar_table select x from 
> value_source").await();
> TableImpl flinkTable =
> (TableImpl)
> tableEnv.sqlQuery(
> "select t.x as x1, c.x as x2 from 
> value_source t "
> + "left join columnar_table for 
> system_time as of t.p c "
> + "on t.x = c.x where c.x is null");
> List results = 
> CollectionUtil.iteratorToList(flinkTable.execute().collect());
> assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>count++;
>RowData key = extractLookupKey(row);
>List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>rows.add(serializer.copy(row));
> }
> {code}
>  
> It can be fixed with the following modification
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-14 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492374#comment-17492374
 ] 

jinfeng commented on FLINK-26016:
-

[~luoyuxia]  Thanks for your reply. I think the root cause is the 
ColumnarRowData::getString() method:
{code:java}

@Override
public StringData getString(int pos) {
    Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
    return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
}
{code}

When reading StringData, the data is not copied, and the VectorizedColumnBatch is 
reused across batches. In the lookup join function, the bytes backing a key that 
was read earlier and cached will be overwritten by the bytes of a key that is 
read later.
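To make the effect concrete, here is a small standalone demo of the same aliasing pattern in plain Java (the classes below are only illustrative and are not the actual Flink reader or cache code): a cached key that wraps a reused buffer is silently rewritten when the buffer is refilled, so the earlier cache entry can no longer be found.

{code:java}
// Demo of the aliasing problem: a cache key that wraps a reused buffer changes
// content when the buffer is overwritten by the "next batch", so the entry
// stored for the first key is effectively lost. Copying at insert time fixes it.
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusedBufferCacheDemo {
    public static void main(String[] args) {
        char[] reusedBuffer = "key-1".toCharArray();
        Map<CharSequence, List<String>> cache = new HashMap<>();

        // Non-copying view over the reused buffer (analogous to getString()
        // wrapping the vectorized batch's bytes without a copy).
        CharBuffer view = CharBuffer.wrap(reusedBuffer);
        cache.computeIfAbsent(view, k -> new ArrayList<>()).add("row for key-1");

        // The next batch overwrites the reused buffer in place.
        "key-2".getChars(0, 5, reusedBuffer, 0);

        // Lookup for "key-1" now misses, because the cached key's backing
        // characters have changed underneath it.
        System.out.println(cache.containsKey(CharBuffer.wrap("key-1".toCharArray()))); // false
        // Copying the key's data before caching (serializer.copy in the issue's
        // proposed fix) keeps the cached key stable.
    }
}
{code}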

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> ---
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.3
>Reporter: jinfeng
>Priority: Major
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>   // create the hive table with columnar storage.
> tableEnv.executeSql(
> String.format(
> "create table columnar_table (x string) STORED AS 
> PARQUET "
> + "tblproperties ('%s'='5min')",
> HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
> // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
> is 2048, we should
> // write as least 2048 records to the test table.
> List testData = new ArrayList<>(4096);
> for (int i = 0; i < 4096; i++) {
> testData.add(Row.of(String.valueOf(i)));
> }
> // constructs test data using values table
> TableEnvironment batchEnv = 
> HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
> batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
> batchEnv.useCatalog(hiveCatalog.getName());
> String dataId = TestValuesTableFactory.registerData(testData);
> batchEnv.executeSql(
> String.format(
> "create table value_source(x string, p as proctime()) 
> with ("
> + "'connector' = 'values', 'data-id' = '%s', 
> 'bounded'='true')",
> dataId));
> batchEnv.executeSql("insert overwrite columnar_table select x from 
> value_source").await();
> TableImpl flinkTable =
> (TableImpl)
> tableEnv.sqlQuery(
> "select t.x as x1, c.x as x2 from 
> value_source t "
> + "left join columnar_table for 
> system_time as of t.p c "
> + "on t.x = c.x where c.x is null");
> List results = 
> CollectionUtil.iteratorToList(flinkTable.execute().collect());
> assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>count++;
>RowData key = extractLookupKey(row);
>List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>rows.add(serializer.copy(row));
> }
> {code}
>  
> It can be fixed with the following modification
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-14 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492326#comment-17492326
 ] 

jinfeng edited comment on FLINK-26016 at 2/15/22, 2:41 AM:
---

[~MartijnVisser]   [~luoyuxia]  Can you help take a look?    If it's a bug, I 
can help fix it


was (Author: hackergin):
[~MartijnVisser]   Can you help take a look?

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> ---
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.3
>Reporter: jinfeng
>Priority: Major
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>   // create the hive table with columnar storage.
> tableEnv.executeSql(
> String.format(
> "create table columnar_table (x string) STORED AS 
> PARQUET "
> + "tblproperties ('%s'='5min')",
> HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
> // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
> is 2048, we should
> // write as least 2048 records to the test table.
> List testData = new ArrayList<>(4096);
> for (int i = 0; i < 4096; i++) {
> testData.add(Row.of(String.valueOf(i)));
> }
> // constructs test data using values table
> TableEnvironment batchEnv = 
> HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
> batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
> batchEnv.useCatalog(hiveCatalog.getName());
> String dataId = TestValuesTableFactory.registerData(testData);
> batchEnv.executeSql(
> String.format(
> "create table value_source(x string, p as proctime()) 
> with ("
> + "'connector' = 'values', 'data-id' = '%s', 
> 'bounded'='true')",
> dataId));
> batchEnv.executeSql("insert overwrite columnar_table select x from 
> value_source").await();
> TableImpl flinkTable =
> (TableImpl)
> tableEnv.sqlQuery(
> "select t.x as x1, c.x as x2 from 
> value_source t "
> + "left join columnar_table for 
> system_time as of t.p c "
> + "on t.x = c.x where c.x is null");
> List results = 
> CollectionUtil.iteratorToList(flinkTable.execute().collect());
> assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>count++;
>RowData key = extractLookupKey(row);
>List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>rows.add(serializer.copy(row));
> }
> {code}
>  
> It can be fixed with the following modification
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-14 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492326#comment-17492326
 ] 

jinfeng commented on FLINK-26016:
-

[~MartijnVisser]   Can you help take a look?

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> ---
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.3
>Reporter: jinfeng
>Priority: Major
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>   // create the hive table with columnar storage.
> tableEnv.executeSql(
> String.format(
> "create table columnar_table (x string) STORED AS 
> PARQUET "
> + "tblproperties ('%s'='5min')",
> HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
> // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
> is 2048, we should
> // write as least 2048 records to the test table.
> List testData = new ArrayList<>(4096);
> for (int i = 0; i < 4096; i++) {
> testData.add(Row.of(String.valueOf(i)));
> }
> // constructs test data using values table
> TableEnvironment batchEnv = 
> HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
> batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
> batchEnv.useCatalog(hiveCatalog.getName());
> String dataId = TestValuesTableFactory.registerData(testData);
> batchEnv.executeSql(
> String.format(
> "create table value_source(x string, p as proctime()) 
> with ("
> + "'connector' = 'values', 'data-id' = '%s', 
> 'bounded'='true')",
> dataId));
> batchEnv.executeSql("insert overwrite columnar_table select x from 
> value_source").await();
> TableImpl flinkTable =
> (TableImpl)
> tableEnv.sqlQuery(
> "select t.x as x1, c.x as x2 from 
> value_source t "
> + "left join columnar_table for 
> system_time as of t.p c "
> + "on t.x = c.x where c.x is null");
> List results = 
> CollectionUtil.iteratorToList(flinkTable.execute().collect());
> assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>count++;
>RowData key = extractLookupKey(row);
>List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>rows.add(serializer.copy(row));
> }
> {code}
>  
> It can be fixed with the following modification
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-08 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17488974#comment-17488974
 ] 

jinfeng commented on FLINK-26016:
-

 cc [~lirui]  

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> ---
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.3
>Reporter: jinfeng
>Priority: Major
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>   // create the hive table with columnar storage.
> tableEnv.executeSql(
> String.format(
> "create table columnar_table (x string) STORED AS 
> PARQUET "
> + "tblproperties ('%s'='5min')",
> HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
> // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
> is 2048, we should
> // write as least 2048 records to the test table.
> List testData = new ArrayList<>(4096);
> for (int i = 0; i < 4096; i++) {
> testData.add(Row.of(String.valueOf(i)));
> }
> // constructs test data using values table
> TableEnvironment batchEnv = 
> HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
> batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
> batchEnv.useCatalog(hiveCatalog.getName());
> String dataId = TestValuesTableFactory.registerData(testData);
> batchEnv.executeSql(
> String.format(
> "create table value_source(x string, p as proctime()) 
> with ("
> + "'connector' = 'values', 'data-id' = '%s', 
> 'bounded'='true')",
> dataId));
> batchEnv.executeSql("insert overwrite columnar_table select x from 
> value_source").await();
> TableImpl flinkTable =
> (TableImpl)
> tableEnv.sqlQuery(
> "select t.x as x1, c.x as x2 from 
> value_source t "
> + "left join columnar_table for 
> system_time as of t.p c "
> + "on t.x = c.x where c.x is null");
> List results = 
> CollectionUtil.iteratorToList(flinkTable.execute().collect());
> assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>count++;
>RowData key = extractLookupKey(row);
>List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>rows.add(serializer.copy(row));
> }
> {code}
>  
> It can be fixed with the following modification
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-08 Thread jinfeng (Jira)
jinfeng created FLINK-26016:
---

 Summary: FileSystemLookupFunction does not produce correct results 
when hive table uses columnar storage
 Key: FLINK-26016
 URL: https://issues.apache.org/jira/browse/FLINK-26016
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.14.3
Reporter: jinfeng


When I use a Parquet Hive table as the lookup table, some records cannot be 
joined. This can be reproduced by adding the following unit test to 
HiveLookupJoinITCase.

{code:java}
// create the hive table with columnar storage.
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch is 2048,
    // we should write at least 2048 records to the test table.
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // constructs test data using values table
    TableEnvironment batchEnv =
            HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results =
            CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
{code}

The problem may be caused by the following code. 

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}

 
It can be fixed with the following modification
{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2022-02-06 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17487828#comment-17487828
 ] 

jinfeng edited comment on FLINK-24571 at 2/7/22, 2:17 AM:
--

I'll update the PR this week to make sure the proctime dynamic index only works 
on an append-only stream. 


was (Author: hackergin):
I'll update the PR this week to make sure the dynamic index with proctime only 
works on an append-only stream. 

>  elasticsearch dynamic index support the use of proctime
> 
>
> Key: FLINK-24571
> URL: https://issues.apache.org/jira/browse/FLINK-24571
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
>
> When using the Elasticsearch dynamic index, we must specify a field. If there is 
> no time-attribute field, we can't use a dynamic index, so supporting a dynamic 
> index based on proctime may be a good choice.  
> Before implementing this feature, we should discuss the proctime index pattern. 
>   
> The current dynamic index usage is: 
> {code:sql}
> Create table esSink (
>   f1 timestamp(3)
> ) with (
>   'index' = 'myusers_{f1|yyyy-MM-dd}'
> );
> {code}
> When using proctime for the dynamic index, it would look like this: 
> {code:sql}
> Create table esSink (
>f1 varchar
> ) with (
>   'index' = 'myusers_{|yyyy-MM-dd}'
> );
> {code}
> When field name is not specified, we use proctime to generate the dynamic 
> index. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2022-02-06 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17487828#comment-17487828
 ] 

jinfeng commented on FLINK-24571:
-

I'll update the PR this week to make sure the dynamic index with proctime only 
works on an append-only stream. 

>  elasticsearch dynamic index support the use of proctime
> 
>
> Key: FLINK-24571
> URL: https://issues.apache.org/jira/browse/FLINK-24571
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
>
> When use eleasticsearch dynamic index, we must  special  a field. If there is 
> no  time-attribute field,  we can't  use dynamic index.  so support dynamic 
> index using proctime may be a good choice.  
> Before implement this feature, we should discuss the  proctime index pattern. 
>   
> The current dynamic index is : 
> {code:sql}
> Create table esSink (
>   f1 timestamp(3)
> ) with (
>   'index' = 'myusers_{f1|-MM-dd}'
> );
> {code}
> When using proctime as dynamic index, It would be like this. 
> {code:sql}
> Create table esSink (
>f1 varchar
> ) with (
>   'index' = 'myusers_{|-MM-dd}'
> );
> {code}
> When field name is not specified, we use proctime to generate the dynamic 
> index. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2022-02-06 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-24571:

Labels: pull-request-available  (was: pull-request-available stale-assigned)

>  elasticsearch dynamic index support the use of proctime
> 
>
> Key: FLINK-24571
> URL: https://issues.apache.org/jira/browse/FLINK-24571
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
>
> When use eleasticsearch dynamic index, we must  special  a field. If there is 
> no  time-attribute field,  we can't  use dynamic index.  so support dynamic 
> index using proctime may be a good choice.  
> Before implement this feature, we should discuss the  proctime index pattern. 
>   
> The current dynamic index is : 
> {code:sql}
> Create table esSink (
>   f1 timestamp(3)
> ) with (
>   'index' = 'myusers_{f1|-MM-dd}'
> );
> {code}
> When using proctime as dynamic index, It would be like this. 
> {code:sql}
> Create table esSink (
>f1 varchar
> ) with (
>   'index' = 'myusers_{|-MM-dd}'
> );
> {code}
> When field name is not specified, we use proctime to generate the dynamic 
> index. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter

2022-01-16 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476927#comment-17476927
 ] 

jinfeng commented on FLINK-23545:
-

[~airblader] [~twalthr] This feature seems to be supported in 1.15. Is there 
any ticket tracking the removal of the legacy connectors? If not, I can help 
create a new one. 

> Use new schema in SqlCreateTableConverter
> -
>
> Key: FLINK-23545
> URL: https://issues.apache.org/jira/browse/FLINK-23545
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> In order to support column comments in the SQL CREATE TABLE DDL, we should use 
> the new schema in SqlCreateTableConverter. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25419) Support the metadata column to generate dynamic index

2021-12-22 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463762#comment-17463762
 ] 

jinfeng commented on FLINK-25419:
-

cc  [~fpaul]  [~matriv]  

> Support the metadata column to generate dynamic index 
> --
>
> Key: FLINK-25419
> URL: https://issues.apache.org/jira/browse/FLINK-25419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Priority: Major
> Fix For: 1.15.0
>
>
> As mentioned in [https://github.com/apache/flink/pull/18058], we can 
> implement a metadata column to increase the flexibility of using dynamic 
> indexes.  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25419) Support the metadata column to generate dynamic index

2021-12-22 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-25419:

Description: 
As mentioned in [https://github.com/apache/flink/pull/18058]   We can implement 
metadata column to increase the flexibility of using dynamic indexes .  

 

  was:
As mentioned in [https://github.com/apache/flink/pull/18058 
|https://github.com/apache/flink/pull/18058,] We can implement metadata column 
to increase the flexibility of using dynamic indexes .  

 


> Support the metadata column to generate dynamic index 
> --
>
> Key: FLINK-25419
> URL: https://issues.apache.org/jira/browse/FLINK-25419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Priority: Major
> Fix For: 1.15.0
>
>
> As mentioned in [https://github.com/apache/flink/pull/18058]   We can 
> implement metadata column to increase the flexibility of using dynamic 
> indexes .  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25419) Support the metadata column to generate dynamic index

2021-12-22 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-25419:

Description: 
As mentioned in [https://github.com/apache/flink/pull/18058 
|https://github.com/apache/flink/pull/18058,] We can implement metadata column 
to increase the flexibility of using dynamic indexes .  

 

  was:
As mentioned in [https://github.com/apache/flink/pull/18058,]   We can 
implement metadata column to increase the flexibility of using dynamic indexes 
.  

 


> Support the metadata column to generate dynamic index 
> --
>
> Key: FLINK-25419
> URL: https://issues.apache.org/jira/browse/FLINK-25419
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Priority: Major
> Fix For: 1.15.0
>
>
> As mentioned in [https://github.com/apache/flink/pull/18058 
> |https://github.com/apache/flink/pull/18058,] We can implement metadata 
> column to increase the flexibility of using dynamic indexes .  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25419) Support the metadata column to generate dynamic index

2021-12-22 Thread jinfeng (Jira)
jinfeng created FLINK-25419:
---

 Summary: Support the metadata column to generate dynamic index 
 Key: FLINK-25419
 URL: https://issues.apache.org/jira/browse/FLINK-25419
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: jinfeng
 Fix For: 1.15.0


As mentioned in [https://github.com/apache/flink/pull/18058], we can 
implement a metadata column to increase the flexibility of using dynamic 
indexes.  

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-21068) Add new timeout options for Elasticsearch connector

2021-12-13 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458356#comment-17458356
 ] 

jinfeng commented on FLINK-21068:
-

[~fpaul]   I submitted the pull request  
[https://github.com/apache/flink/pull/18097] 

> Add new timeout options for Elasticsearch connector
> ---
>
> Key: FLINK-21068
> URL: https://issues.apache.org/jira/browse/FLINK-21068
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.12.1
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> Currently, the connection.max-retry-timeout option does not seem to work with 
> the new Elasticsearch connector; the Elasticsearch community has removed 
> setMaxRetryTimeoutMillis from RestClientBuilder. We can set the timeout options 
> when creating the RestHighLevelClient in 
> Elasticsearch7ApiCallBridge, like: 
> {code:java}
> @Override
> public RestHighLevelClient createClient(Map clientConfig) 
> throws IOException {
>RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new 
> HttpHost[httpHosts.size()]));
>builder.setRequestConfigCallback(new 
> RestClientBuilder.RequestConfigCallback() {
>   @Override
>   public RequestConfig.Builder 
> customizeRequestConfig(RequestConfig.Builder builder) {
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_TIMEOUT)) {
> 
> builder.setConnectTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_TIMEOUT)));
>  }
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)) {
> 
> builder.setSocketTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)));
>  }
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)) 
> {
> 
> builder.setConnectionRequestTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)));
>  }
>  return builder;
>   }
>});
> {code}
>  
> So, we can add three table options to configure the Elasticsearch timeouts:
> connection.timeout
> connection.socket-timeout
> connection.request-timeout
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-21068) Add new timeout options for Elasticsearch connector

2021-10-21 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17432453#comment-17432453
 ] 

jinfeng commented on FLINK-21068:
-

[~karmagyz] Sure I can take this issue. 

> Add new timeout options for Elasticsearch connector
> ---
>
> Key: FLINK-21068
> URL: https://issues.apache.org/jira/browse/FLINK-21068
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.12.1
>Reporter: jinfeng
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, the connection.max-retry-timeout option does not seem to work with 
> the new Elasticsearch connector; the Elasticsearch community has removed 
> setMaxRetryTimeoutMillis from RestClientBuilder. We can set the timeout options 
> when creating the RestHighLevelClient in 
> Elasticsearch7ApiCallBridge, like: 
> {code:java}
> @Override
> public RestHighLevelClient createClient(Map clientConfig) 
> throws IOException {
>RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new 
> HttpHost[httpHosts.size()]));
>builder.setRequestConfigCallback(new 
> RestClientBuilder.RequestConfigCallback() {
>   @Override
>   public RequestConfig.Builder 
> customizeRequestConfig(RequestConfig.Builder builder) {
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_TIMEOUT)) {
> 
> builder.setConnectTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_TIMEOUT)));
>  }
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)) {
> 
> builder.setSocketTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)));
>  }
>  if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)) 
> {
> 
> builder.setConnectionRequestTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)));
>  }
>  return builder;
>   }
>});
> {code}
>  
> So, we can add three table options to configure the Elasticsearch timeouts:
> connection.timeout
> connection.socket-timeout
> connection.request-timeout
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-10-20 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17431115#comment-17431115
 ] 

jinfeng commented on FLINK-23169:
-

[~xtsong] 

Thank you for your patience; I totally agree with you. There may indeed be 
many other ways of grouping staging directories.

If more people run into this problem in the future, we can consider adding this 
feature. Currently we can use 'flink run 
-Dyarn.staging-directory=/flink/staging/user1' to meet our requirements.

> Support user-level app staging directory when yarn.staging-directory is 
> specified
> -
>
> Key: FLINK-23169
> URL: https://issues.apache.org/jira/browse/FLINK-23169
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: jinfeng
>Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same 
> directory as the staging directory. That may not be friendly for a job platform 
> that submits jobs for different users. I propose to use a user-level directory 
> by default when yarn.staging-directory is specified. We only need to make a 
> small change to the `getStagingDir` function in 
> YarnClusterDescriptor. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-10-20 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17431022#comment-17431022
 ] 

jinfeng commented on FLINK-23169:
-

Hi [~xtsong] 

1.  Putting them together doesn't seem to be a big problem, but I think it is 
more convenient to manage the quota size and permissions of the directory 
separately per user.

2.  Yes, we can configure different staging directories when submitting the 
job, just like configuring the checkpoint directory.

 

For a single user, it may not be a big problem, but a Flink job platform needs 
to manage the configuration for different users, so I think this may be a 
general requirement. Here is the related Spark Jira: 
https://issues.apache.org/jira/browse/SPARK-26877

> Support user-level app staging directory when yarn.staging-directory is 
> specified
> -
>
> Key: FLINK-23169
> URL: https://issues.apache.org/jira/browse/FLINK-23169
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: jinfeng
>Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same 
> directory as the staging directory. That is not friendly for a job platform 
> that submits jobs for different users. I propose to use a user-level directory 
> by default when yarn.staging-directory is specified. We only need to make a 
> small change to the `getStagingDir` function in 
> YarnClusterDescriptor. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2021-10-18 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17429872#comment-17429872
 ] 

jinfeng commented on FLINK-24571:
-

[~MartijnVisser]  Thanks for the reminder, I will look into the implementation of the 
latest ES sink connector. 

>  elasticsearch dynamic index support the use of proctime
> 
>
> Key: FLINK-24571
> URL: https://issues.apache.org/jira/browse/FLINK-24571
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>
> When using the Elasticsearch dynamic index, we must specify a field. If there is 
> no time-attribute field, we can't use a dynamic index, so supporting a dynamic 
> index based on proctime may be a good choice.  
> Before implementing this feature, we should discuss the proctime index pattern. 
>   
> The current dynamic index is: 
> {code:sql}
> Create table esSink (
>   f1 timestamp(3)
> ) with (
>   'index' = 'myusers_{f1|yyyy-MM-dd}'
> );
> {code}
> When using proctime as the dynamic index, it would look like this: 
> {code:sql}
> Create table esSink (
>   f1 varchar
> ) with (
>   'index' = 'myusers_{|yyyy-MM-dd}'
> );
> {code}
> When the field name is not specified, we use proctime to generate the dynamic 
> index. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2021-10-17 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17429810#comment-17429810
 ] 

jinfeng commented on FLINK-24571:
-

[~jark]  Thanks for the great suggestion, I think using a pattern is much 
better. I can help to implement this feature. 

>  elasticsearch dynamic index support the use of proctime
> 
>
> Key: FLINK-24571
> URL: https://issues.apache.org/jira/browse/FLINK-24571
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: jinfeng
>Priority: Major
>
> When using the Elasticsearch dynamic index, we must specify a field. If there is 
> no time-attribute field, we can't use a dynamic index, so supporting a dynamic 
> index based on proctime may be a good choice.  
> Before implementing this feature, we should discuss the proctime index pattern. 
>   
> The current dynamic index is: 
> {code:sql}
> Create table esSink (
>   f1 timestamp(3)
> ) with (
>   'index' = 'myusers_{f1|yyyy-MM-dd}'
> );
> {code}
> When using proctime as the dynamic index, it would look like this: 
> {code:sql}
> Create table esSink (
>   f1 varchar
> ) with (
>   'index' = 'myusers_{|yyyy-MM-dd}'
> );
> {code}
> When the field name is not specified, we use proctime to generate the dynamic 
> index. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24571) elasticsearch dynamic index support the use of proctime

2021-10-17 Thread jinfeng (Jira)
jinfeng created FLINK-24571:
---

 Summary:  elasticsearch dynamic index support the use of proctime
 Key: FLINK-24571
 URL: https://issues.apache.org/jira/browse/FLINK-24571
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: jinfeng


When using the Elasticsearch dynamic index, we must specify a field. If there is 
no time-attribute field, we can't use a dynamic index, so supporting a dynamic 
index based on proctime may be a good choice.  

Before implementing this feature, we should discuss the proctime index pattern.   

The current dynamic index is: 

{code:sql}
Create table esSink (
  f1 timestamp(3)
) with (
  'index' = 'myusers_{f1|yyyy-MM-dd}'
);
{code}

When using proctime as the dynamic index, it would look like this: 

{code:sql}
Create table esSink (
  f1 varchar
) with (
  'index' = 'myusers_{|yyyy-MM-dd}'
);
{code}

When the field name is not specified, we use proctime to generate the dynamic 
index. 
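
For illustration, a rough sketch of how a proctime-based index could be resolved per record (the method and its names are made up for this sketch; the connector's actual index generator differs):
{code:java}
// Format the current processing time with the pattern taken from the index option,
// e.g. indexFor("myusers_", "yyyy-MM-dd") would yield something like "myusers_2021-10-17".
String indexFor(String prefix, String pattern) {
    return prefix + java.time.format.DateTimeFormatter.ofPattern(pattern)
            .withZone(java.time.ZoneId.systemDefault())
            .format(java.time.Instant.now());
}
{code}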





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24072) Add support for setting default headers in elasticsearch connector

2021-08-31 Thread jinfeng (Jira)
jinfeng created FLINK-24072:
---

 Summary: Add support for setting default headers in elasticsearch 
connector
 Key: FLINK-24072
 URL: https://issues.apache.org/jira/browse/FLINK-24072
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: jinfeng


If we add support for setting default headers, we can expose them as options in 
the SQL WITH clause. 

The DDL would look like this: 
{code:sql}
create table `es-sink` (
  a varchar,
  b varchar
) with (
  'connector' = 'elasticsearch-7',
  'connection.default-headers' = 'Authorization:xxx'
);
{code}
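
For reference, one possible way the connector could apply such an option (a minimal sketch only; the option name and the semicolon-separated format are part of this proposal, not an existing feature):
{code:java}
// Header and BasicHeader come from org.apache.http; RestClientBuilder#setDefaultHeaders
// is the Elasticsearch client hook that accepts them.
// Parse "Name:Value;Name2:Value2" into an array of headers.
Header[] parseDefaultHeaders(String optionValue) {
    return java.util.Arrays.stream(optionValue.split(";"))
            .map(kv -> kv.split(":", 2))
            .map(kv -> (Header) new BasicHeader(kv[0].trim(), kv[1].trim()))
            .toArray(Header[]::new);
}

// later, when the REST client is created:
// builder.setDefaultHeaders(parseDefaultHeaders(config.get("connection.default-headers")));
{code}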




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter

2021-08-07 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395179#comment-17395179
 ] 

jinfeng commented on FLINK-23545:
-

[~twalthr] I found [FLINK-23513 
|https://issues.apache.org/jira/browse/FLINK-23513] has removed many legacy 
connectors, but there are still many left. When will we completely 
remove the legacy connectors? Maybe I can help with the remaining work. 

> Use new schema in SqlCreateTableConverter
> -
>
> Key: FLINK-23545
> URL: https://issues.apache.org/jira/browse/FLINK-23545
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In order to support column comments in the SQL CREATE TABLE DDL, we should use 
> the new Schema in SqlCreateTableConverter. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter

2021-08-05 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393969#comment-17393969
 ] 

jinfeng commented on FLINK-23545:
-

[~jark] Thanks for your reply, maybe I didn't express it clearly. After using 
DefaultCatalogTable instead of CatalogTableImpl, legacy connectors will not 
work, because the catalogTable is a DefaultCatalogTable (an unresolved table). 

When executing 
{code:java}
TableFactoryUtil.findAndCreateTableSource(
    schemaTable.getCatalog(),
    schemaTable.getTableIdentifier(),
    catalogTable,
    new Configuration(),
    schemaTable.isTemporary());
{code}

it will throw the exception `Only a resolved catalog table can be serialized into a 
map of string properties`;


The "COLLECTION" table may be another problem: TestCollectionTableFactory is 
now a legacy connector, but it uses the 'new style' table options.

> Use new schema in SqlCreateTableConverter
> -
>
> Key: FLINK-23545
> URL: https://issues.apache.org/jira/browse/FLINK-23545
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In order to support column comments in the SQL CREATE TABLE DDL, we should use 
> the new Schema in SqlCreateTableConverter. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23545) Use new schema in SqlCreateTableConverter

2021-08-05 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393678#comment-17393678
 ] 

jinfeng commented on FLINK-23545:
-

When I implemented the code, I found a few problems.

After constructing the schema and some other attributes, I use the `CatalogTable.of` 
method to create the CatalogTable, which
is a DefaultCatalogTable.
{code:java}
CatalogTable.of(mergedSchema, tableComment, partitionKeys, mergedOptions);
{code}

But when parsing a DDL like this
{code:sql}
CREATE TABLE testCollection (
  field1 VARCHAR
) with (
  'connector' = 'COLLECTION'
)
{code}

COLLECTION is a legacy table, so when generating the rel node:
First, it executes the `isLegacySourceOptions(CatalogTable catalogTable, 
CatalogSchemaTable schemaTable)` method, 
which throws the exception `Only a resolved catalog table can be serialized 
into a map of string properties.`
Next it creates a CatalogSourceTable to construct the dynamic table.
As COLLECTION is a legacy table, the job will throw `Unable to create a 
source for reading table`.

Also, I found the new TableDescriptor has the same problem when it is used to 
construct a legacy table with new style table options.

> Use new schema in SqlCreateTableConverter
> -
>
> Key: FLINK-23545
> URL: https://issues.apache.org/jira/browse/FLINK-23545
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: jinfeng
>Assignee: jinfeng
>Priority: Major
> Fix For: 1.14.0
>
>
> In order to support column comments in the SQL CREATE TABLE DDL, we should use 
> the new Schema in SqlCreateTableConverter. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18958) Lose column comment when create table

2021-07-29 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17389879#comment-17389879
 ] 

jinfeng commented on FLINK-18958:
-

[~jark] I created https://issues.apache.org/jira/browse/FLINK-23545 to 
replace TableSchema with the new Schema first. 

> Lose column comment when create table
> -
>
> Key: FLINK-18958
> URL: https://issues.apache.org/jira/browse/FLINK-18958
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Shengkai Fang
>Assignee: jinfeng
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.14.0
>
>
> Currently, table columns do not store column comments and users can't see 
> column comments when using the {{describe table}} statement.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23545) Use new schema in SqlCreateTableConverter

2021-07-29 Thread jinfeng (Jira)
jinfeng created FLINK-23545:
---

 Summary: Use new schema in SqlCreateTableConverter
 Key: FLINK-23545
 URL: https://issues.apache.org/jira/browse/FLINK-23545
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: jinfeng
 Fix For: 1.14.0


In order to support column comments in the SQL CREATE TABLE DDL, we should use the 
new Schema in SqlCreateTableConverter. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18958) Lose column comment when create table

2021-07-28 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17388584#comment-17388584
 ] 

jinfeng commented on FLINK-18958:
-

[~jark]  Sure, please assign this ticket to me. 

> Lose column comment when create table
> -
>
> Key: FLINK-18958
> URL: https://issues.apache.org/jira/browse/FLINK-18958
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Shengkai Fang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.14.0
>
>
> Currently, table columns do not store column comments and users can't see 
> column comments when using the {{describe table}} statement.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22936) Support column comment in Schema and ResolvedSchema

2021-07-25 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17386868#comment-17386868
 ] 

jinfeng commented on FLINK-22936:
-

[~jark]  introducing a {{Column withComment(comment)}} method on the {{Column}} class 
is much better. I have submitted the PR; please take time to review the 
code when you are free.

> Support column comment in Schema and ResolvedSchema
> ---
>
> Key: FLINK-22936
> URL: https://issues.apache.org/jira/browse/FLINK-22936
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Assignee: jinfeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> In order to support column comments in the catalog (FLINK-18958), we should first 
> support column comments in Schema and ResolvedSchema. 
> The API is up for discussion. Currently, we already have 10 methods for adding a 
> column in {{Schema}}. If we want to support a column comment for each kind of 
> column, the number of column methods may double. That is not easy to maintain in 
> the long term, and it makes the API complex. 
> Another alternative is adding a new method {{comment(String)}} which will 
> apply the comment to the previous column. This is not a good builder style, but 
> it can make the building concise. 
> For example, 
> {code}
> Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3)).comment("log timestamp")
> .columnByExpression("proctime", "PROCTIME()").comment("processing time")
> .watermark("ts", "ts - INTERVAL '5' SECOND")
> .build()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22936) Support column comment in Schema and ResolvedSchema

2021-07-23 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385988#comment-17385988
 ] 

jinfeng commented on FLINK-22936:
-

[~jark] [~twalthr] So the final solution is to add a withComment method which will 
apply the comment to the previous column? And we need to add the setComment 
method to the Column class. I can help implement this. 
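
For illustration, a minimal sketch of what a comment-carrying, immutable column class could look like (purely illustrative; the actual org.apache.flink.table.catalog.Column API may differ):
{code:java}
// Illustrative only. Imports assumed: java.util.Optional, org.apache.flink.table.types.DataType.
public final class ColumnSketch {
    private final String name;
    private final DataType dataType;
    private final String comment; // may be null when no comment is attached

    private ColumnSketch(String name, DataType dataType, String comment) {
        this.name = name;
        this.dataType = dataType;
        this.comment = comment;
    }

    public static ColumnSketch physical(String name, DataType dataType) {
        return new ColumnSketch(name, dataType, null);
    }

    /** Returns a copy of this column carrying the given comment. */
    public ColumnSketch withComment(String comment) {
        return new ColumnSketch(name, dataType, comment);
    }

    public Optional<String> getComment() {
        return Optional.ofNullable(comment);
    }
}
{code}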

> Support column comment in Schema and ResolvedSchema
> ---
>
> Key: FLINK-22936
> URL: https://issues.apache.org/jira/browse/FLINK-22936
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.14.0
>
>
> In order to support column comments in the catalog (FLINK-18958), we should first 
> support column comments in Schema and ResolvedSchema. 
> The API is up for discussion. Currently, we already have 10 methods for adding a 
> column in {{Schema}}. If we want to support a column comment for each kind of 
> column, the number of column methods may double. That is not easy to maintain in 
> the long term, and it makes the API complex. 
> Another alternative is adding a new method {{comment(String)}} which will 
> apply the comment to the previous column. This is not a good builder style, but 
> it can make the building concise. 
> For example, 
> {code}
> Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3)).comment("log timestamp")
> .columnByExpression("proctime", "PROCTIME()").comment("processing time")
> .watermark("ts", "ts - INTERVAL '5' SECOND")
> .build()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-07-08 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377350#comment-17377350
 ] 

jinfeng edited comment on FLINK-23169 at 7/8/21, 12:22 PM:
---

[~fly_in_gis]  thanks for your reply.

When we use a Flink job management platform to submit jobs from different 
users, we configure 'yarn.staging-directory', for example /flink/staging/.

1. If user1 submits a job, user1 will create 
/flink/staging/.flink/application_ (user1 supergroup drwx--x--x).
 2. If user2 submits a job, user2 can't use /flink/staging/.flink, because the 
directory `.flink` belongs to user1.
 3. If we create /flink/staging/.flink first, there is still another problem: 
the application staging directories of user1, user2, user3, ... will all be 
created in the same directory.

The reason why we don't use the default home directory is that we want to 
provide a common directory to different users. Otherwise, every new user needs to 
create their own home directory and apply for quota.
 I notice that Spark also provides a config to specify the YARN staging 
directory, and it is split into per-user directories by default.


was (Author: hackergin):
[~fly_in_gis]  thank for  you reply.

When we use a Flink job management platform to submit jobs from different 
users, we config the 'yarn.staging-directory' ,for example. /flink/staging/

1. if user1 submit the job, and user1 will create the 
/flink/staging/.flink/application_ user1 supergroup drwx--x--x
2. if user2 submit the job, and user2 can't use /flink/staging/.flink , becase 
directory `.flink` belongs to user1.
3. if we create /flink/staging/.flink first, there is also another problem, 
user1, user2, user3.. 's application staging directory will be created in the 
same directory .

The reason why we don't use the default home directory is that we want to 
provided a common directory to different users. If not, every new user need to 
create their own home directory and apply quota.
I notice that the spark also provided a config to specific the yarn staging 
directory, and it will be divided by user directory by default.

> Support user-level app staging directory when yarn.staging-directory is 
> specified
> -
>
> Key: FLINK-23169
> URL: https://issues.apache.org/jira/browse/FLINK-23169
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: jinfeng
>Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same 
> directory as the staging directory. That is not friendly for a job platform 
> that submits jobs for different users. I propose to use a user-level directory 
> by default when yarn.staging-directory is specified. We only need to make a 
> small change to the `getStagingDir` function in 
> YarnClusterDescriptor. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-07-08 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377350#comment-17377350
 ] 

jinfeng commented on FLINK-23169:
-

[~fly_in_gis]  thanks for your reply.

When we use a Flink job management platform to submit jobs from different 
users, we configure 'yarn.staging-directory', for example /flink/staging/.

1. If user1 submits a job, user1 will create 
/flink/staging/.flink/application_ (user1 supergroup drwx--x--x).
2. If user2 submits a job, user2 can't use /flink/staging/.flink, because the 
directory `.flink` belongs to user1.
3. If we create /flink/staging/.flink first, there is still another problem: 
the application staging directories of user1, user2, user3, ... will all be 
created in the same directory.

The reason why we don't use the default home directory is that we want to 
provide a common directory to different users. Otherwise, every new user needs to 
create their own home directory and apply for quota.
I notice that Spark also provides a config to specify the YARN staging 
directory, and it is split into per-user directories by default.

> Support user-level app staging directory when yarn.staging-directory is 
> specified
> -
>
> Key: FLINK-23169
> URL: https://issues.apache.org/jira/browse/FLINK-23169
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: jinfeng
>Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same 
> directory as the staging directory. That is not friendly for a job platform 
> that submits jobs for different users. I propose to use a user-level directory 
> by default when yarn.staging-directory is specified. We only need to make a 
> small change to the `getStagingDir` function in 
> YarnClusterDescriptor. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-06-30 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371886#comment-17371886
 ] 

jinfeng commented on FLINK-23169:
-

[~fly_in_gis]   please have a look at this ticket

> Support user-level app staging directory when yarn.staging-directory is 
> specified
> -
>
> Key: FLINK-23169
> URL: https://issues.apache.org/jira/browse/FLINK-23169
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: jinfeng
>Priority: Major
>
> When yarn.staging-directory is specified, different users will use the same 
> directory as the staging directory. That is not friendly for a job platform 
> that submits jobs for different users. I propose to use a user-level directory 
> by default when yarn.staging-directory is specified. We only need to make a 
> small change to the `getStagingDir` function in 
> YarnClusterDescriptor. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23169) Support user-level app staging directory when yarn.staging-directory is specified

2021-06-28 Thread jinfeng (Jira)
jinfeng created FLINK-23169:
---

 Summary: Support user-level app staging directory when 
yarn.staging-directory is specified
 Key: FLINK-23169
 URL: https://issues.apache.org/jira/browse/FLINK-23169
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Reporter: jinfeng


When yarn.staging-directory is specified, different users will use the same 
directory as the staging directory. That is not friendly for a job platform that 
submits jobs for different users. I propose to use a user-level directory by 
default when yarn.staging-directory is specified. We only need to make a small 
change to the `getStagingDir` function in 

YarnClusterDescriptor. 
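
For illustration, a minimal sketch of the proposed user-level staging directory (the method shape, option handling, and helper calls are assumptions for this sketch, not the actual YarnClusterDescriptor code):
{code:java}
// Hypothetical sketch: when yarn.staging-directory is configured, append the
// submitting user's name so /flink/staging becomes /flink/staging/<user>.
private Path getStagingDir(FileSystem fileSystem) throws IOException {
    String configuredStagingDir =
            flinkConfiguration.getString(YarnConfigOptions.STAGING_DIRECTORY);
    if (configuredStagingDir == null) {
        // keep today's behavior: fall back to the submitting user's home directory
        return fileSystem.getHomeDirectory();
    }
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    return new Path(configuredStagingDir, user);
}
{code}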



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22428) Translate timezone page to Chinese

2021-04-23 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330438#comment-17330438
 ] 

jinfeng commented on FLINK-22428:
-

[~Leonard Xu] I can help to translate. 

> Translate timezone page to Chinese
> --
>
> Key: FLINK-22428
> URL: https://issues.apache.org/jira/browse/FLINK-22428
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table SQL / API
>Reporter: Leonard Xu
>Priority: Major
>
> the file path is docs/content.zh/docs/dev/table/concepts/timezone.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22178) Support ignore-first-line option in new csv format

2021-04-11 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17318678#comment-17318678
 ] 

jinfeng commented on FLINK-22178:
-

[~ykt836]  I think the ignore-first-line option is needed only when we use the 
filesystem connector to read a CSV file, so we only need to modify 
AbstractCsvInputFormat and skip the first line in the open method when 
splitStart == 0, instead of modifying the CsvRowDataDeserializationSchema. 
I don't know if my understanding is correct. 
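
For illustration, a minimal sketch of that idea (the `ignoreFirstLine` flag, `splitStart` field, and `skipFirstLine()` helper are assumptions for the sketch; the actual AbstractCsvInputFormat code differs):
{code:java}
// Only the split that starts at byte 0 contains the header line, so skip one
// line there; all other splits are left untouched.
@Override
public void open(FileInputSplit split) throws IOException {
    super.open(split);
    if (ignoreFirstLine && this.splitStart == 0) {
        skipFirstLine(); // hypothetical helper that consumes bytes up to the first line delimiter
    }
}
{code}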

> Support ignore-first-line option in new csv format
> --
>
> Key: FLINK-22178
> URL: https://issues.apache.org/jira/browse/FLINK-22178
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.0
>Reporter: Kurt Young
>Assignee: jinfeng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22178) Support ignore-first-line option in new csv format

2021-04-09 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17317954#comment-17317954
 ] 

jinfeng commented on FLINK-22178:
-

[~ykt836] I can help to implement this, please assign this to me. 

> Support ignore-first-line option in new csv format
> --
>
> Key: FLINK-22178
> URL: https://issues.apache.org/jira/browse/FLINK-22178
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.13.0
>Reporter: Kurt Young
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22064) Don't submit statement set when no insert is added in the sql client

2021-03-31 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312280#comment-17312280
 ] 

jinfeng commented on FLINK-22064:
-

[~jark]  I can help to fix this. 

> Don't submit statement set when no insert  is added in the sql client
> -
>
> Key: FLINK-22064
> URL: https://issues.apache.org/jira/browse/FLINK-22064
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Priority: Major
> Fix For: 1.13.0
>
>
> !https://static.dingtalk.com/media/lALPD4Bhs9KtV3jM4s0F2A_1496_226.png_720x720g.jpg?renderWidth=1496=226=1=0=im!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table

2021-03-31 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312144#comment-17312144
 ] 

jinfeng commented on FLINK-22043:
-

Thanks [~yunta] 

> Introduce thrift format to (de)serialize  thrift message only with table 
> -
>
> Key: FLINK-22043
> URL: https://issues.apache.org/jira/browse/FLINK-22043
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jinfeng
>Priority: Major
>
> We have implemented a thrift format in our company. I found that there was a mailing 
> list discussion
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html
> , but no one seems to have time to implement it.
> [~yuyang08] has done most of the work, I can help implement it based on 
> https://github.com/apache/flink/pull/8067  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table

2021-03-31 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312115#comment-17312115
 ] 

jinfeng commented on FLINK-22043:
-

[~yunta]   I agree that it should not be too complicated, and I can help to 
implement both.  

> Introduce thrift format to (de)serialize  thrift message only with table 
> -
>
> Key: FLINK-22043
> URL: https://issues.apache.org/jira/browse/FLINK-22043
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jinfeng
>Priority: Major
>
> We have implemented a thrift format in our company. I found that there was a mailing 
> list discussion
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html
> , but no one seems to have time to implement it.
> [~yuyang08] has done most of the work, I can help implement it based on 
> https://github.com/apache/flink/pull/8067  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table

2021-03-30 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311674#comment-17311674
 ] 

jinfeng commented on FLINK-22043:
-

It may be simpler if we only consider the table use case; maybe we can open another 
JIRA to define a general thrift serializer. 

> Introduce thrift format to (de)serialize  thrift message only with table 
> -
>
> Key: FLINK-22043
> URL: https://issues.apache.org/jira/browse/FLINK-22043
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jinfeng
>Priority: Major
>
> We have implemented a thrift format in our company. I found that there was a mailing 
> list discussion
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html
> , but no one seems to have time to implement it.
> [~yuyang08] has done most of the work, I can help implement it based on 
> https://github.com/apache/flink/pull/8067  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table

2021-03-30 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311522#comment-17311522
 ] 

jinfeng commented on FLINK-22043:
-

[~yunta]  I mean only supporting SQL, so we can use the thrift format in Flink 
SQL DDL. 

> Introduce thrift format to (de)serialize  thrift message only with table 
> -
>
> Key: FLINK-22043
> URL: https://issues.apache.org/jira/browse/FLINK-22043
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jinfeng
>Priority: Major
>
> We have implemented a thrift format in our company. I found that there was a mailing 
> list discussion
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html
> , but no one seems to have time to implement it.
> [~yuyang08] has done most of the work, I can help implement it based on 
> https://github.com/apache/flink/pull/8067  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22043) Introduce thrift format to (de)serialize thrift message only with table

2021-03-30 Thread jinfeng (Jira)
jinfeng created FLINK-22043:
---

 Summary: Introduce thrift format to (de)serialize  thrift message 
only with table 
 Key: FLINK-22043
 URL: https://issues.apache.org/jira/browse/FLINK-22043
 Project: Flink
  Issue Type: New Feature
Reporter: jinfeng


We have implemented a thrift format in our company. I found that there was a mailing 
list discussion
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/thrift-support-td43257.html
, but no one seems to have time to implement it.
[~yuyang08] has done most of the work, I can help implement it based on 
https://github.com/apache/flink/pull/8067  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21741) Support SHOW JARS statement in SQL Client

2021-03-29 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17310690#comment-17310690
 ] 

jinfeng commented on FLINK-21741:
-

[~jark]  I can help to implement this feature. 

> Support SHOW JARS statement in SQL Client
> -
>
> Key: FLINK-21741
> URL: https://issues.apache.org/jira/browse/FLINK-21741
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21906) Support computed column syntax for Hive DDL dialect

2021-03-28 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17310411#comment-17310411
 ] 

jinfeng commented on FLINK-21906:
-

I studied the code of FLIP-152; what we need to do is use antlr in 
flink-hive-connector to implement the watermark syntax and convert the AST to a 
CreateTableOperation.
Maybe I can give it a try. This feature does not target the 1.13 release, which means 
I have more time to complete it, so that's fine.

> Support computed column syntax for Hive DDL dialect
> ---
>
> Key: FLINK-21906
> URL: https://issues.apache.org/jira/browse/FLINK-21906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21978) Disable cast conversion between Numeric type and TIMESTAMP_LTZ type

2021-03-26 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309850#comment-17309850
 ] 

jinfeng commented on FLINK-21978:
-

[~ykt836] milliseconds

> Disable cast conversion between Numeric type and TIMESTAMP_LTZ type
> ---
>
> Key: FLINK-21978
> URL: https://issues.apache.org/jira/browse/FLINK-21978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
>
> Currently we have supported the cast conversion between Numeric type and 
> TIMESTAMP_LTZ type; we treat the numeric value, e.g. `Long type 1000L`, as 
> epoch seconds and then cast it to TIMESTAMP_LTZ, but java.lang.Long is a 
> conversion class of `LocalZonedTimestampType` and is treated as milliseconds.
> To avoid the inconsistency, we should disable it and encourage users to use the 
> `TO_TIMESTAMP_LTZ(numeric, precision)` function.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21906) Support computed column syntax for Hive DDL dialect

2021-03-26 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309749#comment-17309749
 ] 

jinfeng commented on FLINK-21906:
-

[~jark]  I'm interested in this, can you assign this to me ? 

> Support computed column syntax for Hive DDL dialect
> ---
>
> Key: FLINK-21906
> URL: https://issues.apache.org/jira/browse/FLINK-21906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21907) Support watermark syntax for Hive DDL dialect

2021-03-26 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309748#comment-17309748
 ] 

jinfeng commented on FLINK-21907:
-

[~jark]  I'm interested in this, can you assign this to me ? 

> Support watermark syntax for Hive DDL dialect
> -
>
> Key: FLINK-21907
> URL: https://issues.apache.org/jira/browse/FLINK-21907
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21978) Disable cast conversion between Numeric type and TIMESTAMP_LTZ type

2021-03-26 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309471#comment-17309471
 ] 

jinfeng commented on FLINK-21978:
-

+1 for introducing a built-in function for timestamp casting. On our company 
platform, we support a common UDF to convert TIMESTAMP or BIGINT which is widely 
used.
It would be much better if it could be made into a built-in function.

> Disable cast conversion between Numeric type and TIMESTAMP_LTZ type
> ---
>
> Key: FLINK-21978
> URL: https://issues.apache.org/jira/browse/FLINK-21978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
>
> Currently we have supported the cast conversion between Numeric type and 
> TIMESTAMP_LTZ type; we treat the numeric value, e.g. `Long type 1000L`, as 
> epoch seconds and then cast it to TIMESTAMP_LTZ, but java.lang.Long is a 
> conversion class of `LocalZonedTimestampType` and is treated as milliseconds.
> To avoid the inconsistency, we should disable it and encourage users to use the 
> `TO_TIMESTAMP_LTZ(numeric, precision)` function.
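
For reference, a small usage example of the recommended built-in function (behavior as documented for Flink 1.13+, where the precision argument selects seconds vs. milliseconds):
{code:sql}
SELECT
  TO_TIMESTAMP_LTZ(1000, 0),  -- interprets 1000 as epoch seconds
  TO_TIMESTAMP_LTZ(1000, 3);  -- interprets 1000 as epoch milliseconds
{code}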



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21892) Add documentation for the DESC statement syntax

2021-03-21 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305681#comment-17305681
 ] 

jinfeng commented on FLINK-21892:
-

[~jark] Sure, please assign this to me. 

> Add documentation for the DESC statement syntax
> ---
>
> Key: FLINK-21892
> URL: https://issues.apache.org/jira/browse/FLINK-21892
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / API
>Reporter: Jark Wu
>Priority: Major
>
> We supported {{DESC}} as an abbreviation of the DESCRIBE statement in 
> FLINK-21847. We should update the documentation page: 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/describe/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21847) Introduce DESC grammar in sql parser

2021-03-17 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303823#comment-17303823
 ] 

jinfeng commented on FLINK-21847:
-

[~fsk119]  Can you assign this to me ?  

> Introduce DESC grammar in sql parser
> 
>
> Key: FLINK-21847
> URL: https://issues.apache.org/jira/browse/FLINK-21847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Priority: Major
>
> DESC is an abbreviation of the DESCRIBE statement. DESC is currently only 
> supported in the SQL client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21068) Add new timeout options for Elasticsearch connector

2021-01-21 Thread jinfeng (Jira)
jinfeng created FLINK-21068:
---

 Summary: Add new timeout options for Elasticsearch connector
 Key: FLINK-21068
 URL: https://issues.apache.org/jira/browse/FLINK-21068
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.12.1
Reporter: jinfeng


Currently, the connection.max-retry-timeout option does not seem to work with the new 
Elasticsearch connector. The Elasticsearch community has removed 
setMaxRetryTimeoutMillis from RestClientBuilder. We can set the timeout options 
when creating the RestHighLevelClient in 
Elasticsearch7ApiCallBridge, like: 
{code:java}
@Override
public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
    RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
    builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_TIMEOUT)) {
                builder.setConnectTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_TIMEOUT)));
            }
            if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)) {
                builder.setSocketTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_SOCKET_TIMEOUT)));
            }
            if (clientConfig.containsKey(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)) {
                builder.setConnectionRequestTimeout(Integer.valueOf(clientConfig.get(CONFIG_KEY_CONNECTION_REQUEST_TIMEOUT)));
            }
            return builder;
        }
    });
    // build the client from the configured builder
    return new RestHighLevelClient(builder);
}
{code}
 

So, we can add three table options to configure the Elasticsearch timeouts:

connection.timeout

connection.socket-timeout

connection.request-timeout
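
For illustration, a sketch of a table definition using these proposed options (the three connection.* option names and the millisecond values are part of this proposal, not existing connector options):
{code:sql}
CREATE TABLE esSink (
  a VARCHAR,
  b VARCHAR
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'my_index',
  'connection.timeout' = '5000',
  'connection.socket-timeout' = '30000',
  'connection.request-timeout' = '10000'
);
{code}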

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20739) Ban `if` from HiveModule

2020-12-23 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254008#comment-17254008
 ] 

jinfeng commented on FLINK-20739:
-

I see; if we can't ban it directly, maybe we can add an interface to the Hive module 
to configure some other functions easily. [~lirui]

> Ban  `if`  from HiveModule
> --
>
> Key: FLINK-20739
> URL: https://issues.apache.org/jira/browse/FLINK-20739
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> When using the HiveModule, the if function is treated as a normal function.
>  If I have a SQL statement like this: 
>   
> {code:sql}
>  insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, 
> '-')[2], 'error') from Source
> {code}
>  
>  It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because 
> size(split(`test`, '-')) > 1, split(`test`, '-')[2] and 'error' will all be 
> evaluated first, and only then will the if function be evaluated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20739) Ban `if` from HiveModule

2020-12-22 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-20739:

Description: 
When using the HiveModule, the if function is treated as a normal function.
 If I have a SQL statement like this: 
  
{code:sql}
 insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, 
'-')[2], 'error') from Source
{code}
 
 It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because 
size(split(`test`, '-')) > 1, split(`test`, '-')[2] and 'error' will all be 
evaluated first, and only then will the if function be evaluated.

  was:
When using hiveModule, the if function is treated as a normal function.
If I have a SQL like this: 
 
{code:java}
 insert into Sink select  if(size(split(`test`, '-')) > 1, split(`test`, 
'-')[10], 'error') from Source   {code}
 
It will throw arrayIndexOutOfBoundsException in Flink1.10,  becase 
size(split(`test`, '-')) > 1 , split(`test`, '-')[10], ‘error’   will be 
calculated first, and then if function will be  calculated


> Ban  `if`  from HiveModule
> --
>
> Key: FLINK-20739
> URL: https://issues.apache.org/jira/browse/FLINK-20739
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> When using the HiveModule, the if function is treated as a normal function.
>  If I have a SQL statement like this: 
>   
> {code:sql}
>  insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, 
> '-')[2], 'error') from Source
> {code}
>  
>  It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because 
> size(split(`test`, '-')) > 1, split(`test`, '-')[2] and 'error' will all be 
> evaluated first, and only then will the if function be evaluated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20739) Ban `if` from HiveModule

2020-12-22 Thread jinfeng (Jira)
jinfeng created FLINK-20739:
---

 Summary: Ban  `if`  from HiveModule
 Key: FLINK-20739
 URL: https://issues.apache.org/jira/browse/FLINK-20739
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: jinfeng


When using the HiveModule, the if function is treated as a normal function.
If I have a SQL statement like this: 
 
{code:sql}
 insert into Sink select if(size(split(`test`, '-')) > 1, split(`test`, 
'-')[10], 'error') from Source
{code}
 
It will throw an ArrayIndexOutOfBoundsException in Flink 1.10, because 
size(split(`test`, '-')) > 1, split(`test`, '-')[10] and 'error' will all be 
evaluated first, and only then will the if function be evaluated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19937) Support sink parallelism option for all connectors

2020-11-10 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229106#comment-17229106
 ] 

jinfeng commented on FLINK-19937:
-

[~lzljs3620320] Got it , thanks for your reply. 

> Support sink parallelism option for all connectors
> --
>
> Key: FLINK-19937
> URL: https://issues.apache.org/jira/browse/FLINK-19937
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem
>Reporter: Lsw_aka_laplace
>Priority: Major
>  Labels: pull-request-available
>
> Since https://issues.apache.org/jira/browse/FLINK-19727 has been done.
> SINK_PARALLELISM option and `ParallelismProvider` should be applied for all 
> existing `DynamicTableSink` of connectors in order to give users access to 
> setting their own sink parallelism.
>  
>  
> Update:
> Anybody who works on this issue should refer to FLINK-19727~
> `ParallelismProvider` should work with `SinkRuntimeProvider`; actually 
> `SinkFunctionProvider` and `OutputFormatProvider` have implemented 
> `ParallelismProvider`. And `SINK_PARALLELISM` is already defined in 
> `FactoryUtil`, please reuse it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19937) Support sink parallelism option for all connectors

2020-11-09 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228944#comment-17228944
 ] 

jinfeng commented on FLINK-19937:
-

After setting the parallelism, the order of the data cannot be guaranteed, 
because the default physical partitioning is rebalance. Should we add an option 
so that we can configure the physical partitioning? 

> Support sink parallelism option for all connectors
> --
>
> Key: FLINK-19937
> URL: https://issues.apache.org/jira/browse/FLINK-19937
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem
>Reporter: Lsw_aka_laplace
>Priority: Major
>  Labels: pull-request-available
>
> Since https://issues.apache.org/jira/browse/FLINK-19727 has been done.
> SINK_PARALLELISM option and `ParallelismProvider` should be applied for all 
> existing `DynamicTableSink` of connectors in order to give users access to 
> setting their own sink parallelism.
>  
>  
> Update:
> Anybody who works on this issue should refer to FLINK-19727~
> `ParallelismProvider` should work with `SinkRuntimeProvider`; actually 
> `SinkFunctionProvider` and `OutputFormatProvider` have implemented 
> `ParallelismProvider`. And `SINK_PARALLELISM` is already defined in 
> `FactoryUtil`, please reuse it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree 

2020-04-14 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083380#comment-17083380
 ] 

jinfeng commented on FLINK-17139:
-

[~wenlong.lwl]  That's true. On master, 
StreamTableEnvironmentImpl#isEagerOperationTranslation returns false.   

> The sub-plan reuse optimizer can't reuse sub-plan that from different sql 
> node tree 
> 
>
> Key: FLINK-17139
> URL: https://issues.apache.org/jira/browse/FLINK-17139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL 
> node trees.
> {code:sql}
> create table SourceTable ...;
> create table SinkTable1 ...;
> create table SinkTable2 ...;
> insert into SinkTable1 select * from SourceTable;
> insert into SinkTable2 select * from SourceTable;
> {code}
> The SourceTable will not be reused when I execute the SQL statements above;
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree 

2020-04-14 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083367#comment-17083367
 ] 

jinfeng commented on FLINK-17139:
-

[~ykt836], I execute the SQL by using sqlUpdate(). 

> The sub-plan reuse optimizer can't reuse sub-plan that from different sql 
> node tree 
> 
>
> Key: FLINK-17139
> URL: https://issues.apache.org/jira/browse/FLINK-17139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL 
> node trees.
> {code:sql}
> create table SourceTable ...;
> create table SinkTable1 ...;
> create table SinkTable2 ...;
> insert into SinkTable1 select * from SourceTable;
> insert into SinkTable2 select * from SourceTable;
> {code}
> The SourceTable will not be reused when I execute the SQL statements above;
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree 

2020-04-14 Thread jinfeng (Jira)
jinfeng created FLINK-17139:
---

 Summary: The sub-plan reuse optimizer can't reuse sub-plan that 
from different sql node tree 
 Key: FLINK-17139
 URL: https://issues.apache.org/jira/browse/FLINK-17139
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: jinfeng


The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL node 
trees.
{code:sql}
create table SourceTable ...;
create table SinkTable1 ...;
create table SinkTable2 ...;

insert into SinkTable1 select * from SourceTable;
insert into SinkTable2 select * from SourceTable;
{code}
The SourceTable will not be reused when I execute the SQL statements above;

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16451) listagg with distinct for over window codegen error

2020-03-05 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-16451:

Description: 
When I use listagg with distinct and an over window,
{code:java}
"select listagg(distinct product, '|') over(partition by user order by proctime 
rows between 200 preceding and current row) as product, user from " + testTable
{code}
I got the following exception
{code:java}

Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 
3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
java.util.ArrayList.get(ArrayList.java:433) at 
java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
{code}
But it worked with 
{code:java}
"select listagg(distinct product) over(partition by user order by proctime rows 
between 200 preceding and current row) as product, user from " + testTable
{code}
 

The exception is thrown at the code below. 
{code:java}
private def generateKeyExpression(
    ctx: CodeGeneratorContext,
    generator: ExprCodeGenerator): GeneratedExpression = {
  val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
    ctx,
    generator.input1Type,
    generator.input1Term,
    _,
    nullableInput = false,
    deepCopy = inputFieldCopy))
{code}
 

The distinctInfo.argIndexes are [1, 3]. But index 3 is a logical index; it 
will be replaced by '|', so we should not generate an input access for index 3.

  was:
When I use lisgagg with distinct and over window.
{code:java}
//代码占位符
"select listagg(distinct product, '|') over(partition by user order by proctime 
rows between 200 preceding and current row) as product, user from " + testTable
{code}

[jira] [Updated] (FLINK-16451) listagg with distinct for over window codegen error

2020-03-05 Thread jinfeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinfeng updated FLINK-16451:

Summary: listagg with distinct for over window  codegen error  (was: 
listagg with distinct for over window )

> listagg with distinct for over window  codegen error
> 
>
> Key: FLINK-16451
> URL: https://issues.apache.org/jira/browse/FLINK-16451
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.2, 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> When I use listagg with distinct in an over window:
> {code:java}
> // code placeholder
> "select listagg(distinct product, '|') over(partition by user order by 
> proctime rows between 200 preceding and current row) as product, user from " 
> + testTable
> {code}
> I got the following exception:
> {code:java}
> // code placeholder
> Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, 
> Size: 3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
> java.util.ArrayList.get(ArrayList.java:433) at 
> java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
> org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
>  at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
>  at 
> org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
>  at 
> org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> {code}
> But it works with:
> {code:java}
> // code placeholder
> select listagg(distinct product) over(partition by user order by proctime 
> rows between 200 preceding and current row) as product, user from " + 
> testTable
> {code}
>  
> {code:java}
> // code placeholder
> private def generateKeyExpression(
> ctx: CodeGeneratorContext,
> generator: ExprCodeGenerator): 

[jira] [Created] (FLINK-16451) listagg with distinct for over window

2020-03-05 Thread jinfeng (Jira)
jinfeng created FLINK-16451:
---

 Summary: listagg with distinct for over window 
 Key: FLINK-16451
 URL: https://issues.apache.org/jira/browse/FLINK-16451
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0, 1.9.2
Reporter: jinfeng


When I use listagg with distinct in an over window:
{code:java}
// code placeholder
"select listagg(distinct product, '|') over(partition by user order by proctime 
rows between 200 preceding and current row) as product, user from " + testTable
{code}
I got the following exception:
{code:java}
// code placeholder

Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 
3 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at 
java.util.ArrayList.get(ArrayList.java:433) at 
java.util.Collections$UnmodifiableList.get(Collections.java:1311) at 
org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174) at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
 at 
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
 at 
org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
 at 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
{code}
But it works with:
{code:java}
// code placeholder
select listagg(distinct product) over(partition by user order by proctime rows 
between 200 preceding and current row) as product, user from " + testTable
{code}
 
{code:java}
// code placeholder
private def generateKeyExpression(
    ctx: CodeGeneratorContext,
    generator: ExprCodeGenerator): GeneratedExpression = {
  val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
    ctx,
    generator.input1Type,
    generator.input1Term,
    _,
    nullableInput = false,
    deepCopy = inputFieldCopy))
{code}
The exception is thrown in the code shown above.

The distinctInfo.argIndexes is [1, 3], but index 3 does not point at a physical input 
field: it corresponds to the constant delimiter argument '|'. Input access should 
therefore not be generated for index 3.
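
A minimal, self-contained sketch of that idea (the names DistinctKeySketch, 
FieldAccess, LiteralRef and splitArgs are illustrative only, not Flink's actual API 
or the eventual fix): only indexes that fall inside the physical input row should go 
through generateInputAccess, while indexes that refer to constant arguments such as 
the '|' delimiter have to be handled separately.
{code:java}
// Hypothetical sketch: separate real input-field indexes from constant arguments
// before generating the distinct key expression. Not the actual Flink fix.
object DistinctKeySketch {
  final case class FieldAccess(index: Int) // stands in for generateInputAccess(...)
  final case class LiteralRef(index: Int)  // argument that is a constant, e.g. the '|' delimiter

  def splitArgs(argIndexes: Seq[Int], inputArity: Int): (Seq[FieldAccess], Seq[LiteralRef]) = {
    // Only indexes smaller than the input arity point at physical input fields.
    val (fields, literals) = argIndexes.partition(_ < inputArity)
    (fields.map(FieldAccess), literals.map(LiteralRef))
  }

  def main(args: Array[String]): Unit = {
    // Input row has 3 fields (indexes 0..2); the plan reports argIndexes = [1, 3].
    val (fields, literals) = splitArgs(Seq(1, 3), inputArity = 3)
    println(fields)   // List(FieldAccess(1)) -> safe to access the input row
    println(literals) // List(LiteralRef(3))  -> must not be turned into an input access
  }
}
{code}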



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14356) Introduce "single-field" format to (de)serialize message to a single field

2019-10-10 Thread jinfeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948788#comment-16948788
 ] 

jinfeng commented on FLINK-14356:
-

I am very happy to contribute this. It would be simple to implement a raw format 
that only supports VARBINARY.
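
For reference, a rough sketch of what such a pair of schemas could look like (the 
class names are made up for illustration and this is not a proposal for the final 
API); it simply moves the raw message bytes in and out of a single-field Row:
{code:java}
// Hypothetical sketch (Scala): wrap the whole Kafka message into one byte[] field of a Row.
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

class SingleFieldRowDeserializationSchema extends DeserializationSchema[Row] {
  // No parsing involved: the message payload becomes field 0 of the Row.
  override def deserialize(message: Array[Byte]): Row =
    Row.of(message.asInstanceOf[AnyRef])

  override def isEndOfStream(nextElement: Row): Boolean = false

  override def getProducedType: TypeInformation[Row] =
    new RowTypeInfo(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
}

class SingleFieldRowSerializationSchema extends SerializationSchema[Row] {
  // Emit the single byte[] field of the Row as the message payload.
  override def serialize(element: Row): Array[Byte] =
    element.getField(0).asInstanceOf[Array[Byte]]
}
{code}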

> Introduce "single-field" format to (de)serialize message to a single field
> --
>
> Key: FLINK-14356
> URL: https://issues.apache.org/jira/browse/FLINK-14356
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>Reporter: jinfeng
>Priority: Major
>
> I want to use Flink SQL to write Kafka messages directly to HDFS, without any 
> serialization or deserialization of the message payload in between. The message 
> bytes should be converted directly into the first field of the Row. However, the 
> current RowSerializationSchema does not support converting bytes to VARBINARY. 
> Can we add some special RowSerializationSchema and RowDeserializationSchema 
> implementations? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14356) Support some special RowDeserializationSchema and RowSerializationSchema

2019-10-09 Thread jinfeng (Jira)
jinfeng created FLINK-14356:
---

 Summary: Support some special RowDeserializationSchema and 
RowSerializationSchema 
 Key: FLINK-14356
 URL: https://issues.apache.org/jira/browse/FLINK-14356
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / API
Reporter: jinfeng


I want to use Flink SQL to write Kafka messages directly to HDFS, without any 
serialization or deserialization of the message payload in between. The message 
bytes should be converted directly into the first field of the Row. However, the 
current RowSerializationSchema does not support converting bytes to VARBINARY. 
Can we add some special RowSerializationSchema and RowDeserializationSchema 
implementations? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)