[jira] [Created] (FLINK-11094) Restored state in RocksDBStateBackend that has not been accessed in new execution causes NPE on snapshot

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11094:
---

 Summary: Restored state in RocksDBStateBackend that has not been 
accessed in new execution causes NPE on snapshot
 Key: FLINK-11094
 URL: https://issues.apache.org/jira/browse/FLINK-11094
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.7.1


This was caused by changes in FLINK-10679.

The problem is that, with that change, {{RegisteredStateMetaInfoBase}}s in the 
{{RocksDBKeyedStateBackend}} are no longer created eagerly for all restored 
state, but only lazily once the state is accessed again by the user. This 
leaves non-accessed restored state with empty meta info and causes an NPE when 
trying to take a snapshot of it.

The rationale behind FLINK-10679 was that, since 
{{RegisteredStateMetaInfoBase}} already holds serializer instances for state 
access, creating them eagerly at restore time from the restored serializer 
snapshots did not make sense (at that point in time we do not yet have the new 
serializers for state access; the snapshot is only capable of creating the 
previous state serializer).

I propose the following:

Instead of holding final {{TypeSerializer}} instances, 
{{RegisteredStateMetaInfoBase}}s should hold a {{StateSerializerProvider}}.

The {{StateSerializerProvider}} would have the following methods:
{code}
public abstract class StateSerializerProvider<T> {

    public abstract TypeSerializer<T> getCurrentSerializer();

    public abstract TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer);

    public abstract TypeSerializer<T> getPreviousSerializer();
}
{code}

A {{StateSerializerProvider}} can be created either from:
1) A restored serializer snapshot when restoring the state.
2) A fresh, new state's serializer, when registering the state for the first 
time.

For 1), state that has not been accessed after the restore will return the same 
serializer (i.e. the previous serializer) for both {{getPreviousSerializer}} 
and {{getCurrentSerializer}}. Once a restored state is re-accessed, 
{{updateCurrentSerializer(TypeSerializer<T> newSerializer)}} should be used to 
update the serializer that the provider returns from {{getCurrentSerializer}}.
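
To make the intended behavior concrete, here is a rough implementation sketch (the factory method names, internal layout, and the use of {{TypeSerializerSnapshot#restoreSerializer()}} are assumptions, not a finalized API):
{code}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Rough sketch only, not the actual Flink code.
public class StateSerializerProvider<T> {

    private TypeSerializer<T> currentSerializer;
    private final TypeSerializerSnapshot<T> previousSerializerSnapshot;

    private StateSerializerProvider(
            TypeSerializer<T> currentSerializer,
            TypeSerializerSnapshot<T> previousSerializerSnapshot) {
        this.currentSerializer = currentSerializer;
        this.previousSerializerSnapshot = previousSerializerSnapshot;
    }

    // Creation path 1): from a restored serializer snapshot; until the state is
    // re-accessed, the "current" serializer is simply the restored previous serializer.
    public static <T> StateSerializerProvider<T> fromRestoredSerializerSnapshot(
            TypeSerializerSnapshot<T> snapshot) {
        return new StateSerializerProvider<>(snapshot.restoreSerializer(), snapshot);
    }

    // Creation path 2): from a fresh state's serializer; there is no previous snapshot.
    public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(
            TypeSerializer<T> serializer) {
        return new StateSerializerProvider<>(serializer, null);
    }

    public TypeSerializer<T> getCurrentSerializer() {
        return currentSerializer;
    }

    public TypeSerializer<T> getPreviousSerializer() {
        return previousSerializerSnapshot != null
                ? previousSerializerSnapshot.restoreSerializer()
                : currentSerializer;
    }

    // Called once a restored state is re-accessed with the new serializer; the
    // compatibility check against the previous serializer could also live here.
    public TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer) {
        this.currentSerializer = newSerializer;
        return this.currentSerializer;
    }
}
{code}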

We could also use this new abstraction to move some of the new serializer's 
compatibility checks out of the state backend and into 
{{StateSerializerProvider#updateCurrentSerializer}}.

For tests: we apparently lack coverage for restored state that is snapshotted 
again without having been accessed. This should be added as part of the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11093) Migrate flink-table runtime Function classes

2018-12-06 Thread xueyu (JIRA)
xueyu created FLINK-11093:
-

 Summary: Migrate flink-table runtime Function classes
 Key: FLINK-11093
 URL: https://issues.apache.org/jira/browse/FLINK-11093
 Project: Flink
  Issue Type: New Feature
Reporter: xueyu
Assignee: xueyu


As discussed in 
[FLINK-11065|https://issues.apache.org/jira/browse/FLINK-11065], this is a 
subtask that migrates the flink-table 
{{org.apache.flink.table.runtime.\*Function.scala}} classes to Java in the 
flink-table-runtime module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11092) Migrate flink-table runtime Selector and Collector classes

2018-12-06 Thread xueyu (JIRA)
xueyu created FLINK-11092:
-

 Summary: Migrate flink-table runtime Selector and Collector classes
 Key: FLINK-11092
 URL: https://issues.apache.org/jira/browse/FLINK-11092
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: xueyu
Assignee: xueyu


As discussed in 
[FLINK-11065|https://issues.apache.org/jira/browse/FLINK-11065], this is a 
subtask that migrates the flink-table CRowKeySelector, RowKeySelector, 
CRowWrappingCollector, and TableFunctionCollector classes to Java in the 
flink-table-runtime module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Discuss [FLINK-9740] Support group windows over intervals of months

2018-12-06 Thread x1q1j1
Hi Timo Walther,
   I have redesigned and sorted it out. Please find the details in the 
attachment; I have also added it to the JIRA issue:
   https://issues.apache.org/jira/browse/FLINK-9740
Thanks,
qianjin

[jira] [Created] (FLINK-11090) Unused parameter in WindowedStream.aggregate()

2018-12-06 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11090:
---

 Summary: Unused parameter in WindowedStream.aggregate()
 Key: FLINK-11090
 URL: https://issues.apache.org/jira/browse/FLINK-11090
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Hequn Cheng
Assignee: Hequn Cheng


The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} seems 
to be unused. Or have I missed something?

If it is indeed unused, I would prefer to remove the parameter by adding a new 
API and deprecating the current one. We can't remove it directly as it is 
{{PublicEvolving}}.
{code:java}
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType,
        TypeInformation<V> aggregateResultType,
        TypeInformation<R> resultType) {
}
{code}
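
A possible shape for the change, shown only as a sketch (whether the deprecated overload delegates to the new one is an assumption, not a finalized proposal):
{code:java}
// Keep the current overload for compatibility, deprecate it, and ignore the
// unused parameter by delegating to a new overload without aggregateResultType.
@Deprecated
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType,
        TypeInformation<V> aggregateResultType,   // unused, kept only for compatibility
        TypeInformation<R> resultType) {
    return aggregate(aggregateFunction, windowFunction, accumulatorType, resultType);
}
{code}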



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11091) Clear the use of deprecated methods of KeyedStream in table operators

2018-12-06 Thread sunjincheng (JIRA)
sunjincheng created FLINK-11091:
---

 Summary: Clear the use of deprecated methods of KeyedStream in 
table operators
 Key: FLINK-11091
 URL: https://issues.apache.org/jira/browse/FLINK-11091
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.7.0, 1.6.2, 1.5.5
Reporter: sunjincheng


The method `KeyedStream#process(ProcessFunction)` has been deprecated since 
FLINK-8560, so it would be good to use 
`KeyedStream#process(KeyedProcessFunction)` to implement the `DataStreamSort`, 
`DataStreamGroupAggregate` and `DataStreamOverAggregate` operators.
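
For illustration, the change in each operator is roughly the following (a minimal sketch; `keyedStream` and the `Row` types are placeholders, the real operators use their own types and logic):
{code:java}
// Before (deprecated): ProcessFunction on a KeyedStream.
keyedStream.process(new ProcessFunction<Row, Row>() {
    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) {
        out.collect(value);
    }
});

// After: KeyedProcessFunction, which additionally exposes the current key via ctx.getCurrentKey().
keyedStream.process(new KeyedProcessFunction<String, Row, Row>() {
    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) {
        out.collect(value);
    }
});
{code}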

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Apply for flink contributor permission

2018-12-06 Thread Tzu-Li (Gordon) Tai
Hi,

You now have contributor permissions and can assign JIRAs you would like to
work on to yourself.

Welcome to the community!

Cheers,
Gordon

On Fri, Dec 7, 2018, 11:17 AM sf lee wrote:

> Hi there,
> Could anyone kindly give me the contributor permission?
> My JIRA id is xleesf.
>
> Thanks,
>
> xleesf
>


Apply for flink contributor permission

2018-12-06 Thread sf lee
Hi there,
Could anyone kindly give me the contributor permission?
My JIRA id is xleesf.

Thanks,

xleesf


[jira] [Created] (FLINK-11089) Log filecache directory removed messages

2018-12-06 Thread liuzhaokun (JIRA)
liuzhaokun created FLINK-11089:
--

 Summary: Log filecache directory removed messages 
 Key: FLINK-11089
 URL: https://issues.apache.org/jira/browse/FLINK-11089
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.7.0
Reporter: liuzhaokun


When a taskmanager exits or shuts down, the filecache directory named 
"flink-dist-cache*" is removed, but there is no log message about this 
action. I think we should log it so that users can check it easily when 
debugging.

IOManager.java already logs such removal messages when the taskmanager shuts 
down; the filecache can do the same thing.
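
For example, something along these lines in the filecache shutdown path (a sketch only; the LOG field and the directory variable are assumptions about FileCache's internals):
{code:java}
// Log each removed file cache directory on shutdown, mirroring what IOManager does.
if (LOG.isInfoEnabled()) {
    LOG.info("FileCache removed spill file directory {}", cacheDirectory.getAbsolutePath());
}
{code}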



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


dev apply

2018-12-06 Thread sf lee
dev apply


Re: delay one of the datastream when performing join operation on event-time and watermark

2018-12-06 Thread jincheng sun
Hi Rakesh Kumar,
I think you can use the interval-join, e.g.:

orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))

The semantics of interval-join and detailed usage description can refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
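
A more complete sketch of that pattern (the Order/Invoice classes and the orderId
field are assumptions about your JSON records, not actual Flink API):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderInvoiceJoin {

    // Placeholder POJOs standing in for the parsed JSON records.
    public static class Order { public String orderId; public String payload; }
    public static class Invoice { public String orderId; public String payload; }

    public static DataStream<String> joinOrdersWithInvoices(
            DataStream<Order> orders, DataStream<Invoice> invoices) {
        return orders
                .keyBy(order -> order.orderId)
                .intervalJoin(invoices.keyBy(invoice -> invoice.orderId))
                .between(Time.minutes(-5), Time.minutes(5))
                .process(new ProcessJoinFunction<Order, Invoice, String>() {
                    @Override
                    public void processElement(
                            Order order, Invoice invoice, Context ctx, Collector<String> out) {
                        // Emit whatever joined record should go to the output Kafka topic.
                        out.collect(order.orderId + "," + order.payload + "," + invoice.payload);
                    }
                });
    }
}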

Hope to help you, and any feedback is welcome!

Bests,
Jincheng


Rakesh Kumar wrote on Thu, Dec 6, 2018 at 7:10 PM:

> Hi,
> I have two data sources one is  for order data and another one is for
> invoice data, these two data i am pushing into kafka topic in json form. I
> wanted to delay order data for 5 mins because invoice data comes only after
> order data is generated. So, for that i have written a flink program which
> will take these two data from kafka and apply watermarks and delay order
> data for 5 mins. After applying watermarks on these data, i wanted to join
> these data based on order_id which is present in both order and invoice
> data. After Joining i wanted to push it to kafka in different topic.
>
> But, i am not able to join these data streams with 5 min delay and i am
> not able to figure it out.
>
> I am attaching my flink program below and it's dependency.
>


[jira] [Created] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode

2018-12-06 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11088:
-

 Summary: Improve Kerberos Authentication using Keytab in YARN 
proxy user mode
 Key: FLINK-11088
 URL: https://issues.apache.org/jira/browse/FLINK-11088
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: Rong Rong


Currently flink-yarn assumes the keytab is shipped as an application master 
environment local resource on the client side and will be distributed to all 
the TMs. This does not work in YARN proxy user mode, since the proxy user or 
super user does not have access to the actual user's keytab but only to 
delegation tokens.

We propose to make the keytab file path discovery configurable, depending on 
the launch mode of the YARN client.

Reference: 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Apply for permission to solve flink's jira issues

2018-12-06 Thread Timo Walther

Hi,

welcome to the Flink community. If you give me your JIRA username, I can 
give you contributor permissions.


Thanks,
Timo

On 06.12.18 at 12:12, shen lei wrote:

Hi All,
Could you give me the permission to work on Flink's JIRA issues? I
am interested in Flink and I want to find some easy JIRA issues to study
Flink. If possible, I hope to make some contributions to Flink. At the same
time, I could learn Flink more deeply. Thank you.
Best wishes,
Lei Shen





[jira] [Created] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions

2018-12-06 Thread Sebastian Klemke (JIRA)
Sebastian Klemke created FLINK-11086:


 Summary: flink-hadoop-compatibility tests fail for 3.x hadoop 
versions
 Key: FLINK-11086
 URL: https://issues.apache.org/jira/browse/FLINK-11086
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: Sebastian Klemke


All builds used Maven 3.2.5 on commit 
ed8ff14ed39d08cd319efe75b40b9742a2ae7558.

Attempted builds:
 - mvn clean install -Dhadoop.version=3.0.3
 - mvn clean install -Dhadoop.version=3.1.1

Integration tests with a Hadoop input format data source fail. Example stack 
trace, taken from the hadoop.version 3.1.1 build:
{code:java}
testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase)
  Time elapsed: 0.275 sec  <<< ERROR!
java.lang.NoClassDefFoundError: 
org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085)
at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
at 
org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
at 
org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
at 
org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
at 
org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121)
at 
org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71)
{code}
Maybe Hadoop 3.x versions could be added to the test matrix as well?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Edward Rojas (JIRA)
Edward Rojas created FLINK-11087:


 Summary: Broadcast state migration Incompatibility from 1.5.3 to 
1.7.0
 Key: FLINK-11087
 URL: https://issues.apache.org/jira/browse/FLINK-11087
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0
 Environment: Migration from Flink 1.5.3 to Flink 1.7.0
Reporter: Edward Rojas


When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast 
state throws the following error:
{noformat}
org.apache.flink.util.StateMigrationException: The new key serializer for 
broadcast state must not be incompatible.
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
at 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745){noformat}
The broadcast state uses a MapState with StringSerializer as the key serializer 
and a custom JsonSerializer as the value serializer.

There were no changes to the TypeSerializers used; only the Flink version was 
upgraded.
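
For reference, the broadcast state descriptor looks roughly like this (a sketch; the descriptor name, the value type and the JsonSerializer class are placeholders for the custom classes in the job):
{code:java}
MapStateDescriptor<String, MyJsonValue> broadcastStateDescriptor =
        new MapStateDescriptor<>(
                "broadcast-state",
                StringSerializer.INSTANCE,   // key serializer
                new JsonSerializer());       // custom value serializer
{code}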

 

With some debugging I see that, at the moment the compatibility of states is 
validated in the DefaultOperatorStateBackend class, the 
"*registeredBroadcastStates*" map containing the data about the 'old' state 
holds a wrong association of the key and value serializers: JsonSerializer 
appears as the key serializer and StringSerializer as the value serializer, 
when it should be the other way around.

 

After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" 
class is responsible for this swap, here:
https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11085) flink-s3-fs-presto

2018-12-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11085:


 Summary: flink-s3-fs-presto
 Key: FLINK-11085
 URL: https://issues.apache.org/jira/browse/FLINK-11085
 Project: Flink
  Issue Type: Bug
  Components: FileSystem
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


A user has reported an issue on the ML where using the presto-s3 filesystem 
fails with an exception due to a missing class. The missing class is indeed 
filtered out in the shade-plugin configuration.
{code:java}
java.lang.NoClassDefFoundError: 
org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
at 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
at 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Apply for permission to solve flink's jira issues

2018-12-06 Thread shen lei
Hi All,
   Could you give me the permission to work on Flink's JIRA issues? I
am interested in Flink and I want to find some easy JIRA issues to study
Flink. If possible, I hope to make some contributions to Flink. At the same
time, I could learn Flink more deeply. Thank you.
Best wishes,
Lei Shen


delay one of the datastream when performing join operation on event-time and watermark

2018-12-06 Thread Rakesh Kumar
Hi,
I have two data sources, one for order data and another for invoice data,
and I am pushing both into Kafka topics in JSON form. I want to delay the
order data by 5 minutes, because invoice data only arrives after the order
data is generated. For that I have written a Flink program which takes both
streams from Kafka, applies watermarks, and delays the order data by 5
minutes. After applying watermarks, I want to join the two streams on
order_id, which is present in both the order and invoice data, and then push
the result to a different Kafka topic.

But I am not able to join these data streams with the 5-minute delay, and I
cannot figure out why.

I am attaching my Flink program and its dependencies below.
[Attachment: pom.xml for the com.flink.streaming:flinkJoin:0.0.1-SNAPSHOT project. It declares
slf4j-log4j12 1.7.5, org.json 20180813, and the Flink 1.7.0 dependencies
flink-connector-kafka_2.11, flink-streaming-java_2.11, flink-streaming-scala_2.11,
flink-clients_2.11 and flink-runtime_2.11 (the Flink connector and streaming dependencies
exclude slf4j-log4j12 and log4j), plus a maven-assembly-plugin 2.3 configuration using
src/main/assembly/default.xml, bound to the package phase with the single goal.]



Re: [DISCUSS] Flink SQL DDL Design

2018-12-06 Thread Timo Walther

Hi everyone,

great to have such a lively discussion. My next batch of feedback:

@Jark: We don't need to align the descriptor approach with SQL. I'm open 
to different approaches as long as we can serve a broad set of use 
cases and systems. The descriptor approach was a first attempt to cover 
all aspects and connector/format characteristics. Just another example 
that is missing in the DDL design: How can a user decide if append, 
retraction, or upserts should be used to sink data into the target 
system? Do we want to define all these important properties in the big 
WITH property map? If yes, we are already close to the descriptor 
approach. Regarding the "standard way", most DDL languages have very 
custom syntax, so there is no real "standard".


3. Sources/Sinks: @Lin: If a table has both read/write access it can be 
created using a regular CREATE TABLE (omitting a specific source/sink) 
declaration. Regarding the transition from source/sink to both, yes we 
would need to update the DDL and catalogs. But is this a problem? One 
also needs to add new queries that use the tables. @Xuefu: It is not 
only about security aspects. Especially for streaming use cases, not 
every connector can be used as a source easily. For example, a JDBC sink 
is easier than a JDBC source. Let's assume an interactive CLI session, 
people should be able to list all source tables and sink tables to know 
upfront if they can use an INSERT INTO here or not.


6. Partitioning and keys: @Lin: I would like to include this in the 
design given that Hive integration and Kafka key support are in the 
making/are on our roadmap for this release.


5. Schema declaration: @Lin: You are right, it is not conflicting. I just 
wanted to raise the point because if users want to declare computed 
columns they have "schema" constraints but no columns. Are we ok 
with a syntax like ...
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, 
format.schema-file = "/my/avrofile.avsc") ?
@Xuefu: Yes, you are right that an external schema might not exactly 
match but this is true for both directions:
table schema "derives" format schema and format schema "derives" table 
schema.


7. Hive compatibility: @Xuefu: I agree that Hive is popular but we 
should not just adopt everything from Hive, as their syntax is very 
batch-specific. We should come up with a superset of historical and 
future requirements. Supporting Hive queries can be an intermediate 
layer on top of Flink's DDL.


4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor 
interface as this is also important for a better separation of the connector 
and table modules [1]. However, I'm wondering about watermark generation.


4a. timestamps are in the schema twice:
@Jark: "existing field is Long/Timestamp, we can just use it as 
rowtime": yes, but we need to mark a field as such an attribute. How 
does the syntax for marking look like? Also in case of timestamps that 
are nested in the schema?


4b. how can we write out a timestamp into the message header?:
I agree to simply ignore computed columns when writing out. This is like 
'field-change: add' that I mentioned in the improvements document.
@Jark: "then the timestmap in StreamRecord will be write to Kafka 
message header": Unfortunately, there is no timestamp in the stream 
record. Additionally, multiple time attributes can be in a schema. So we 
need a constraint that tells the sink which column to use (possibly 
computed as well)?


4c. separate all time attribute concerns into a special clause next to
the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling 
that the "schema part" becomes quite messy because the actual schema 
with types and fields is accompanied by so much metadata about 
timestamps, watermarks, keys,... and we would need to introduce a new 
WATERMARK keyword within a schema that was close to standard up to this 
point.


Thanks everyone,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461



On 06.12.18 at 07:08, Jark Wu wrote:

Hi Timo,

Thank you for the valuable feedbacks.

First of all, I think we don't need to align the SQL functionality with the
Descriptor. Because SQL is a more standard API, we should be as cautious as
possible when extending the SQL syntax. If something can be done in a standard
way, we shouldn't introduce something new.

Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
   4a. timestamps are in the schema twice.
If an existing field is Long/Timestamp, we can just use it as the rowtime;
it is not defined twice. If it is not a Long/Timestamp, we use a computed
column to derive an expected timestamp column to be the rowtime. Is this what
you mean by defined twice? But I don't think it is a problem, rather an
advantage, because it is easy to use: the user does not need to consider
whether to "replace the existing column" or "add a new column", and will not
be confused about what the real schema is, what's the index of