[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544092#comment-16544092
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6333
  
CC @tillrohrmann 


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.
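
To illustrate the per-key-group layout described above, here is a minimal sketch (purely illustrative; {{TimerEntry}} and the write loop below are made-up stand-ins, not Flink's actual snapshot code):

```
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a toy per-key-group write loop. TimerEntry and the
// key-group layout are hypothetical stand-ins, not Flink's actual classes.
class TimerSnapshotSketch {

    static class TimerEntry {
        long timestamp;
        byte[] serializedKeyAndNamespace;
    }

    // Writes the timers of each key-group as one contiguous block so that the
    // snapshot can later be restored (and re-scaled) per key-group.
    static void writeTimers(Map<Integer, List<TimerEntry>> timersByKeyGroup,
                            DataOutputStream out) throws IOException {
        for (Map.Entry<Integer, List<TimerEntry>> group : timersByKeyGroup.entrySet()) {
            out.writeInt(group.getKey());            // key-group id
            out.writeInt(group.getValue().size());   // number of timers in this key-group
            for (TimerEntry timer : group.getValue()) {
                out.writeLong(timer.timestamp);
                out.writeInt(timer.serializedKeyAndNamespace.length);
                out.write(timer.serializedKeyAndNamespace);
            }
        }
    }
}
```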





[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544091#comment-16544091
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6333

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of 
Flink's state backends and also already includes backwards compatibility 
(FLINK-9490). The core idea is to have a common abstraction for how state is 
registered in the state backend and how snapshots operate on such state 
(`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new 
state integrates more or less seamlessly with the existing snapshot logic. The 
notable exception is the current lack of integration of the RocksDB state backend 
with heap-based priority queue state. This case can currently still use the old 
snapshot code without causing any regression, via a temporary path (see 
`AbstractStreamOperator.snapshotState(...)`). As a result, after this PR Flink 
supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks 
queue (full and incremental), and rocks kv / heap queue (only full), and still uses 
synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release 
and still has some known rough edges that we could fix up in the test phase. 
Here is a list of some things that already come to mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick 
out some code.
- Check proper integration with the serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure 
from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve the general state registration logic in the backends; there is 
potential to remove duplicated code and to generally improve the 
integration of the queue state a bit.
- Improve the performance of RocksDB-based timers, e.g. a byte[]-based cache, 
seeking directly to the next potential timer instead of seeking to the key-group 
start, bulkPoll.
- Improve some class/interface/method names.
- Add tests, e.g. for bulkPoll.

## Verifying this change

This change is already covered by existing tests, but I would add some more 
eventually. You can activate RocksDB-based timers by using the RocksDB backend 
and setting `RocksDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.
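
As a quick illustration, a minimal sketch of such a setup (the option key and default below are assumptions standing in for `RocksDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`, which may differ in the merged code):

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Hedged sketch, not the PR's actual code: PRIORITY_QUEUE_STATE_TYPE is a local
// stand-in for the RocksDBBackendOptions constant mentioned above; its real key
// and default may differ.
public class EnableRocksDbTimersSketch {

    // stand-in config option (assumed key and default)
    static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE =
            ConfigOptions.key("state.backend.rocksdb.priority-queue-state-type")
                    .defaultValue("HEAP");

    public static Configuration buildConfig() {
        Configuration config = new Configuration();
        config.setString("state.backend", "rocksdb");          // use the RocksDB state backend
        config.setString(PRIORITY_QUEUE_STATE_TYPE, "ROCKS");  // activate RocksDB-based timers
        return config;
    }
}
```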

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs only for now)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6333


commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter 
Date:   2018-06-13T09:56:16Z

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter 
Date:   2018-07-13T23:19:30Z

Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter 
Date:   2018-07-14T06:34:16Z

Renaming of some classes/interfaces




> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.

[jira] [Updated] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9489:
--
Labels: pull-request-available  (was: )

> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.





[GitHub] flink issue #6333: [FLINK-9489] Checkpoint timers as part of managed keyed s...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6333
  
CC @tillrohrmann 


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-13 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6333

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of 
Flink's state backends and also already includes backwards compatibility 
(FLINK-9490). The core idea is to have a common abstraction for how state is 
registered in the state backend and how snapshots operate on such state 
(`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new 
state integrates more or less seamlessly with the existing snapshot logic. The 
notable exception is the current lack of integration of the RocksDB state backend 
with heap-based priority queue state. This case can currently still use the old 
snapshot code without causing any regression, via a temporary path (see 
`AbstractStreamOperator.snapshotState(...)`). As a result, after this PR Flink 
supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks 
queue (full and incremental), and rocks kv / heap queue (only full), and still uses 
synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release 
and still has some known rough edges that we could fix up in the test phase. 
Here is a list of some things that already come to mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick 
out some code.
- Check proper integration with the serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure 
from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve the general state registration logic in the backends; there is 
potential to remove duplicated code and to generally improve the 
integration of the queue state a bit.
- Improve the performance of RocksDB-based timers, e.g. a byte[]-based cache, 
seeking directly to the next potential timer instead of seeking to the key-group 
start, bulkPoll.
- Improve some class/interface/method names.
- Add tests, e.g. for bulkPoll.

## Verifying this change

This change is already covered by existing tests, but I would add some more 
eventually. You can activate RocksDB-based timers by using the RocksDB backend 
and setting `RocksDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs only for now)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6333


commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter 
Date:   2018-06-13T09:56:16Z

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter 
Date:   2018-07-13T23:19:30Z

Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter 
Date:   2018-07-14T06:34:16Z

Renaming of some classes/interfaces




---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544087#comment-16544087
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507962
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
+   private <T> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<T> context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

On the one hand yes, but on the other hand it allows reading the comments from 
top to bottom and knowing what the method is doing without having to look at the 
actual code.


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 





[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507962
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
+   private <T> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<T> context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

On the one hand yes, but on the other hand it allows reading the comments from 
top to bottom and knowing what the method is doing without having to look at the 
actual code.


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544086#comment-16544086
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507930
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

Initially I had it similar to your proposal, but this would mix a data 
model class with visualization. `ProgramTargetDescriptor` should not be 
responsible for how it is represented in the CLI.


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 





[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507930
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

Initially I had it similar to your proposal, but this would mix a data 
model class with visualization. `ProgramTargetDescriptor` should not be 
responsible for how it is represented in the CLI.


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544085#comment-16544085
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507864
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

The entire CliClient is only tested manually so far.


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 





[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507864
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

The entire CliClient is only tested manually so far.


---


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544083#comment-16544083
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user sagarl commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200217627
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new TypeInformation[typeInformations.size()]));

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544082#comment-16544082
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user sagarl commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200217021
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new TypeInformation[typeInformations.size()]));

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-13 Thread sagarl
Github user sagarl commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200217627
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new 
TypeInformation[typeInformations.size()]));
+   }
+
+   @Override
+   public void write(T element) throws IOException {
+   Boolean isFill = orcBatchWriter.fill(rowBatch, element);
+   if (!isFill) {
+

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-13 Thread sagarl
Github user sagarl commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200217021
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new 
TypeInformation[typeInformations.size()]));
+   }
+
+   @Override
+   public void write(T element) throws IOException {
+   Boolean isFill = orcBatchWriter.fill(rowBatch, element);
+   if (!isFill) {
+

[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544076#comment-16544076
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507463
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -97,14 +97,34 @@ private void start() {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new 
EmbeddedShutdownThread(context, executor));
 
-   // start CLI
-   final CliClient cli = new CliClient(context, executor);
-   cli.open();
+   // do the actual work
+   openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not 
supported yet.");
}
}
 
+   /**
+* Opens the CLI client for executing SQL statements.
+*
+* @param context session context
+* @param executor executor
+*/
+   private void openCli(SessionContext context, Executor executor) {
+   final CliClient cli = new CliClient(context, executor);
+   // interactive CLI mode
+   if (options.getUpdateStatement() == null) {
+   cli.open();
+   }
+   // execute single update statement
+   else {
+   final boolean success = 
cli.submitUpdate(options.getUpdateStatement());
--- End diff --

No, this would block the process for unbounded queries and require 
(fault-tolerant) monitoring in the SQL Client, which is not intended. We just 
block until the statement has been submitted to the cluster.


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 





[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507463
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -97,14 +97,34 @@ private void start() {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new 
EmbeddedShutdownThread(context, executor));
 
-   // start CLI
-   final CliClient cli = new CliClient(context, executor);
-   cli.open();
+   // do the actual work
+   openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not 
supported yet.");
}
}
 
+   /**
+* Opens the CLI client for executing SQL statements.
+*
+* @param context session context
+* @param executor executor
+*/
+   private void openCli(SessionContext context, Executor executor) {
+   final CliClient cli = new CliClient(context, executor);
+   // interactive CLI mode
+   if (options.getUpdateStatement() == null) {
+   cli.open();
+   }
+   // execute single update statement
+   else {
+   final boolean success = 
cli.submitUpdate(options.getUpdateStatement());
--- End diff --

No, this would block the process for unbounded queries and require 
(fault-tolerant) monitoring in the SQL Client, which is not intended. We just 
block until the statement has been submitted to the cluster.


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544075#comment-16544075
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507424
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor 
executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+   // make space from previous output and test the writer
+   terminal.writer().println();
--- End diff --

It makes the output on the terminal nicer. We don't know what has been 
printed before. This starts a terminal session. The output now looks like:

```
No default environment specified.
Searching for 
'/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml
No session environment specified.

[INFO] Executing the following statement:
INSERT INTO MyTableName SELECT * FROM MyTableName
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the 
cluster:
Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d
For the current job status visit: http://localhost:8081

Shutting down executor...done.
```


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 





[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507424
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor 
executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+   // make space from previous output and test the writer
+   terminal.writer().println();
--- End diff --

It makes the output on the terminal nicer. We don't know what has been 
printed before. This starts a terminal session. The output now looks like:

```
No default environment specified.
Searching for 
'/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml
No session environment specified.

[INFO] Executing the following statement:
INSERT INTO MyTableName SELECT * FROM MyTableName
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the 
cluster:
Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d
For the current job status visit: http://localhost:8081

Shutting down executor...done.
```


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544072#comment-16544072
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6323
  
Thank you @pnowojski. I hope I could address all your comments. I will 
clean the commit history and improve the commit messages during merging.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.
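
As a rough illustration of the service-loader-based discovery described above (the interface and method names below are made up, not the actual interfaces this issue adds):

```
import java.util.ServiceLoader;

// Illustrative sketch only: FormatFactorySketch is a hypothetical SPI interface,
// not the FormatFactory/Format interfaces added by this issue.
interface FormatFactorySketch {
    // format identifier such as "json" or "avro"
    String formatType();
    // convert raw bytes into the target record type (simplified signature)
    Object deserialize(byte[] message);
}

class FormatDiscoverySketch {
    // Discover all factories on the classpath via META-INF/services (the same
    // mechanism used for file system discovery) and pick the matching one.
    static FormatFactorySketch find(String formatType) {
        for (FormatFactorySketch factory : ServiceLoader.load(FormatFactorySketch.class)) {
            if (factory.formatType().equals(formatType)) {
                return factory;
            }
        }
        throw new IllegalStateException("No format factory found for: " + formatType);
    }
}
```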





[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...

2018-07-13 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6323
  
Thank you @pnowojski. I hope I could address all your comments. I will 
clean the commit history and improve the commit messages during merging.


---


[jira] [Closed] (FLINK-9818) Add cluster component command line parser

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9818.

Resolution: Fixed

Fixed via ab9bd87e521d19db7c7d783268a3532d2e876a5d

> Add cluster component command line parser
> -
>
> Key: FLINK-9818
> URL: https://issues.apache.org/jira/browse/FLINK-9818
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to parse command line options for the cluster components 
> ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a 
> {{CommandLineParser}} which supports the common command line options 
> ({{--configDir}}, {{--webui-port}} and dynamic properties which can override 
> configuration values).
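
A minimal sketch of what such a parser could look like, using commons-cli (which Flink already depends on); the option names follow the description above, but the actual {{CommandLineParser}} may differ:

```
import java.util.Properties;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

// Hedged sketch, not the actual Flink CommandLineParser.
public class ClusterComponentCliSketch {

    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        options.addOption(Option.builder("c").longOpt("configDir").hasArg()
                .desc("Directory containing flink-conf.yaml").build());
        options.addOption(Option.builder("r").longOpt("webui-port").hasArg()
                .desc("Port of the web UI").build());
        // dynamic properties such as -Dkey=value that override configuration values
        options.addOption(Option.builder("D").hasArgs().valueSeparator('=')
                .desc("Dynamic property overriding a configuration value").build());

        CommandLine cmd = new DefaultParser().parse(options, args);
        String configDir = cmd.getOptionValue("configDir");
        String webUiPort = cmd.getOptionValue("webui-port", "8081");
        Properties dynamicProperties = cmd.getOptionProperties("D");

        System.out.println("configDir=" + configDir
                + ", webui-port=" + webUiPort
                + ", dynamic properties=" + dynamicProperties);
    }
}
```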





[jira] [Assigned] (FLINK-7087) Implement Flip-6 container entry point

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7087:


Assignee: Till Rohrmann

> Implement Flip-6 container entry point
> --
>
> Key: FLINK-7087
> URL: https://issues.apache.org/jira/browse/FLINK-7087
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In order to support Docker and Kubernetes (K8s) we have to provide a container entry point. 
> In a first version, the container entry point could be similar to the 
> standalone session mode, just with the difference that we don't submit a job 
> to the cluster. The job has to be contained in the container image or at least 
> be retrievable by the entry point. In that sense, the container entry 
> point acts like a per-job standalone mode.





[jira] [Closed] (FLINK-7087) Implement Flip-6 container entry point

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7087.

Resolution: Fixed

Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da

> Implement Flip-6 container entry point
> --
>
> Key: FLINK-7087
> URL: https://issues.apache.org/jira/browse/FLINK-7087
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In order to support Docker and Kubernetes (K8s) we have to provide a container entry point. 
> In a first version, the container entry point could be similar to the 
> standalone session mode, just with the difference that we don't submit a job 
> to the cluster. The job has to be contained in the container image or at least 
> be retrievable by the entry point. In that sense, the container entry 
> point acts like a per-job standalone mode.





[jira] [Closed] (FLINK-9488) Create common entry point for master and workers

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9488.

Resolution: Fixed

Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da

> Create common entry point for master and workers
> 
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To make the container setup easier, we should provide a single cluster entry 
> point which uses leader election to become either the master or a worker 
> which runs the {{TaskManager}}.





[jira] [Closed] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9821.

Resolution: Fixed

Fixed via 740f2fbf2e65fa988c6a577989ccd8923729be45

> Let dynamic properties overwrite configuration settings in TaskManagerRunner
> 
>
> Key: FLINK-9821
> URL: https://issues.apache.org/jira/browse/FLINK-9821
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9820 we should also allow dynamic properties to overwrite 
> configuration values in the {{TaskManagerRunner}}.





[jira] [Closed] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9820.

Resolution: Fixed

Fixed via 2fbbf8ee662647c71581f5cd989226be820fed0f

> Let dynamic properties overwrite configuration settings in ClusterEntrypoint
> 
>
> Key: FLINK-9820
> URL: https://issues.apache.org/jira/browse/FLINK-9820
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The dynamic properties which are passed to the {{ClusterEntrypoint}} should 
> overwrite values in the loaded {{Configuration}}.
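
A minimal sketch of the intended behavior, assuming the dynamic properties arrive as a {{java.util.Properties}} object (the actual code path in {{ClusterEntrypoint}} may differ):

```
import java.util.Properties;

import org.apache.flink.configuration.Configuration;

// Hedged sketch, not the actual ClusterEntrypoint code: dynamic properties simply
// overwrite any value already present in the loaded configuration.
class DynamicPropertiesOverrideSketch {

    static Configuration applyDynamicProperties(Configuration loaded, Properties dynamicProperties) {
        Configuration result = new Configuration(loaded); // copy of the values loaded from flink-conf.yaml
        for (String key : dynamicProperties.stringPropertyNames()) {
            result.setString(key, dynamicProperties.getProperty(key)); // dynamic value wins
        }
        return result;
    }
}
```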





[jira] [Closed] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9819.

Resolution: Fixed

Fixed via 5a4bdf2c9fd1693ad3b90dbbd3bcb589ed15c101

> Create start up scripts for the StandaloneJobClusterEntryPoint
> --
>
> Key: FLINK-9819
> URL: https://issues.apache.org/jira/browse/FLINK-9819
> Project: Flink
>  Issue Type: New Feature
>  Components: Startup Shell Scripts
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to start the {{StandaloneJobClusterEntryPoint}} we need start up 
> scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the 
> {{flink-console.sh}} scripts to support this.





[jira] [Closed] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9822.

Resolution: Fixed

Fixed via 56e5381cb7aba01f1d7ecfa11e4be7f505a35baf

> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 





[jira] [Commented] (FLINK-9488) Create common entry point for master and workers

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544050#comment-16544050
 ] 

ASF GitHub Bot commented on FLINK-9488:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6315


> Create common entry point for master and workers
> 
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To make the container setup easier, we should provide a single cluster entry 
> point which uses leader election to become either the master or a worker 
> which runs the {{TaskManager}}.





[jira] [Resolved] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9619.
--
   Resolution: Fixed
Fix Version/s: (was: 1.5.2)

Fixed via c9ad0a07ef0339ced74057fc17800ca9ab7784c1

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544053#comment-16544053
 ] 

ASF GitHub Bot commented on FLINK-9619:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6185


> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6317: [FLINK-9820] Forward dynamic properties to Flink c...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6317


---


[jira] [Commented] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544052#comment-16544052
 ] 

ASF GitHub Bot commented on FLINK-9821:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6318


> Let dynamic properties overwrite configuration settings in TaskManagerRunner
> 
>
> Key: FLINK-9821
> URL: https://issues.apache.org/jira/browse/FLINK-9821
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9820, we should also allow dynamic properties to overwrite 
> configuration values in the {{TaskManagerRunner}}.
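
As an aside, a minimal sketch of what such an overwrite can look like, assuming the dynamic properties arrive as a plain key/value map; the class and helper names below are illustrative, not Flink's actual API:

    import java.util.Map;

    import org.apache.flink.configuration.Configuration;

    public class DynamicPropertiesSketch {

        /**
         * Copies the loaded configuration and lets every dynamic property
         * overwrite a value that may already be present.
         */
        static Configuration applyDynamicProperties(
                Configuration loaded, Map<String, String> dynamicProperties) {
            // Start from the values read from flink-conf.yaml ...
            Configuration result = new Configuration(loaded);
            // ... and let every -Dkey=value passed on the command line win.
            for (Map.Entry<String, String> entry : dynamicProperties.entrySet()) {
                result.setString(entry.getKey(), entry.getValue());
            }
            return result;
        }
    }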



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6331: [FLINK-9701] [state] (follow up) Use StateTtlConfi...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6331


---


[GitHub] flink pull request #6318: [FLINK-9821] Forward dynamic properties to configu...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6318


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6319


---


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544056#comment-16544056
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. Create a new Java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  Here is the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile with mvn clean package and submit the job to the cluster.
>   
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings from 
> flink-conf.yaml are there (screenshot attached).
>   
>  5. Go to the job's configuration and see the Execution Configuration section.
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
>   
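
For reference, a minimal workaround sketch (not part of the original report): the restart strategy can also be pinned programmatically on the execution environment, which takes precedence over the fixed-delay default that checkpointing switches on. The values below are illustrative.

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitRestartStrategySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing alone would otherwise activate the fixed-delay default.
            env.enableCheckpointing(5000);
            // Pin the strategy in code so the job really does not restart.
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.fromElements("test").print();
            env.execute("Explicit restart strategy");
        }
    }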



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9823.

Resolution: Fixed

Fixed via 387a3bc198cfea016abd92953c7fce28e641cf67

> Add Kubernetes deployment files for standalone job cluster
> --
>
> Key: FLINK-9823
> URL: https://issues.apache.org/jira/browse/FLINK-9823
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9822, it would be helpful for the user to have example 
> Kubernetes deployment files to start a standalone job cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6314: [FLINK-9818] Add cluster component command line pa...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6314


---


[jira] [Commented] (FLINK-9818) Add cluster component command line parser

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544048#comment-16544048
 ] 

ASF GitHub Bot commented on FLINK-9818:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6314


> Add cluster component command line parser
> -
>
> Key: FLINK-9818
> URL: https://issues.apache.org/jira/browse/FLINK-9818
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to parse command line options for the cluster components 
> ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a 
> {{CommandLineParser}} which supports the common command line options 
> ({{--configDir}}, {{--webui-port}} and dynamic properties which can override 
> configuration values).
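
A minimal sketch of the kind of parsing described here, assuming Apache Commons CLI is available on the classpath; the class and short option names below are illustrative, not necessarily those of the actual {{CommandLineParser}}:

    import java.util.Properties;

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;

    public class ClusterComponentCliSketch {

        public static void main(String[] args) throws Exception {
            Options options = new Options();
            // --configDir points at the directory containing flink-conf.yaml.
            options.addOption(Option.builder("c").longOpt("configDir")
                    .hasArg().required().build());
            // --webui-port overrides the REST / web UI port.
            options.addOption(Option.builder("w").longOpt("webui-port")
                    .hasArg().build());
            // -Dkey=value dynamic properties that may override configuration values.
            options.addOption(Option.builder("D")
                    .numberOfArgs(2).valueSeparator('=').build());

            CommandLine cmd = new DefaultParser().parse(options, args);

            String configDir = cmd.getOptionValue("configDir");
            Properties dynamicProperties = cmd.getOptionProperties("D");
            System.out.println("configDir=" + configDir
                    + ", dynamic properties=" + dynamicProperties);
        }
    }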



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283


---


[jira] [Commented] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544051#comment-16544051
 ] 

ASF GitHub Bot commented on FLINK-9820:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6317


> Let dynamic properties overwrite configuration settings in ClusterEntrypoint
> 
>
> Key: FLINK-9820
> URL: https://issues.apache.org/jira/browse/FLINK-9820
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The dynamic properties which are passed to the {{ClusterEntrypoint}} should 
> overwrite values in the loaded {{Configuration}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544046#comment-16544046
 ] 

ASF GitHub Bot commented on FLINK-9823:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6320


> Add Kubernetes deployment files for standalone job cluster
> --
>
> Key: FLINK-9823
> URL: https://issues.apache.org/jira/browse/FLINK-9823
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9822, it would be helpful for the user to have example 
> Kubernetes deployment files to start a standalone job cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6185


---


[GitHub] flink pull request #6315: [FLINK-9488] Add container entry point StandaloneJ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6315


---


[jira] [Commented] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544049#comment-16544049
 ] 

ASF GitHub Bot commented on FLINK-9819:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6316


> Create start up scripts for the StandaloneJobClusterEntryPoint
> --
>
> Key: FLINK-9819
> URL: https://issues.apache.org/jira/browse/FLINK-9819
> Project: Flink
>  Issue Type: New Feature
>  Components: Startup Shell Scripts
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to start the {{StandaloneJobClusterEntryPoint}}, we need start-up 
> scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the 
> {{flink-console.sh}} scripts to support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544054#comment-16544054
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6331


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6316: [FLINK-9819] Add startup scripts for standalone jo...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6316


---


[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544055#comment-16544055
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6319


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9619:
--
Labels: pull-request-available  (was: )

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6320


---


[jira] [Resolved] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9701.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via 1632681e41cbc1092a6b4d47a58adfffba6af5d4

> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9143.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via 57872d53c4584faace6dc8e4038ad1f2d068a453

> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. Create a new Java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  Here is the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile with mvn clean package and submit the job to the cluster.
>   
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings from 
> flink-conf.yaml are there (screenshot attached).
>   
>  5. Go to the job's configuration and see the Execution Configuration section.
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9143:


Assignee: Dawid Wysakowicz  (was: yuqi)

> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. Create a new Java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  Here is the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile with mvn clean package and submit the job to the cluster.
>   
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings from 
> flink-conf.yaml are there (screenshot attached).
>   
>  5. Go to the job's configuration and see the Execution Configuration section.
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-9701:
--

> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544040#comment-16544040
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202507057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
-  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
-case Some(_: StreamTableSourceTable[_]) => true
-case Some(_: BatchTableSourceTable[_]) => false
-case _ => throw TableException(s"Unknown Table type 
${t.getClass}.")
-  }
-  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+  case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+  // null
--- End diff --

This information is useful.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.
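
A minimal sketch of the service-loader discovery idea; the interface below is purely illustrative and not the actual interface introduced by this issue. Implementations would be registered through the standard META-INF/services mechanism.

    import java.util.Map;
    import java.util.ServiceLoader;

    public final class FormatDiscoverySketch {

        /** Illustrative stand-in for the factory interface discovered on the classpath. */
        public interface FormatFactorySketch {
            /** Returns true if this factory can handle the given properties. */
            boolean supports(Map<String, String> properties);
        }

        /** Returns the first factory on the classpath that supports the properties. */
        static FormatFactorySketch find(Map<String, String> properties) {
            for (FormatFactorySketch factory
                    : ServiceLoader.load(FormatFactorySketch.class)) {
                if (factory.supports(properties)) {
                    return factory;
                }
            }
            throw new IllegalStateException("No matching format factory found.");
        }

        private FormatDiscoverySketch() {
        }
    }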



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9762:
---

Assignee: (was: vinoyang)

> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on YARN is that it is impossible to have different LOCAL_DIRS for the 
> JobManager and the TaskManager, even though the LOCAL_DIRS value depends on the 
> container. CoreOptions.TMP_DIRS is set to its default value during JobManager 
> initialization and added to the configuration object. When a TaskManager is 
> launched, that configuration object is cloned, including a LOCAL_DIRS value that 
> only makes sense for the JobManager container. From the TaskManager container's 
> point of view, CoreOptions.TMP_DIRS is therefore always equal either to the path 
> configured in flink.yml or to the LOCAL_DIRS of the JobManager (default 
> behaviour). If the TaskManager container does not have access to those folders, 
> i.e. the folders allocated for it by YARN, the TaskManager cannot be started.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202507057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
-  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
-case Some(_: StreamTableSourceTable[_]) => true
-case Some(_: BatchTableSourceTable[_]) => false
-case _ => throw TableException(s"Unknown Table type 
${t.getClass}.")
-  }
-  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+  case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+  // null
--- End diff --

This information is useful.


---


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506906
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+Preconditions.checkNotNull(class

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544038#comment-16544038
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506906
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factory

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544037#comment-16544037
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
--- End diff --

using a singular class name looks better to me


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.
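
For illustration, a minimal sketch of how the engine choice looks in plain Netty, assuming netty-handler (plus netty-tcnative for OpenSSL) is on the classpath; this shows the Netty API itself, not the Flink configuration option discussed in this PR:

    import javax.net.ssl.SSLEngine;

    import io.netty.buffer.UnpooledByteBufAllocator;
    import io.netty.handler.ssl.OpenSsl;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.SslProvider;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

    public class NettyOpenSslSketch {
        public static void main(String[] args) throws Exception {
            // Prefer the OpenSSL-based engine, fall back to the JDK engine if
            // netty-tcnative is not available at runtime.
            SslProvider provider =
                    OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;

            // Client-side context; the insecure trust manager is for brevity only.
            SslContext context = SslContextBuilder.forClient()
                    .sslProvider(provider)
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
                    .build();

            SSLEngine engine = context.newEngine(UnpooledByteBufAllocator.DEFAULT);
            System.out.println("provider=" + provider
                    + ", protocol=" + engine.getSession().getProtocol());
        }
    }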



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544033#comment-16544033
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
--- End diff --

the descriptions of the @param and @throws tags do not need a line feed


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544031#comment-16544031
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
+* @return The SSLContext object which can be used by the ssl transport 
server
+* Returns null if SSL is disabled
+* @throws Exception
+* Thrown if there is any misconfiguration
+*/
+   @Nullable
+   public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
--- End diff --

this empty line is unnecessary and can be removed


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544030#comment-16544030
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505321
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ---
@@ -160,6 +160,7 @@ private Configuration createSslConfig() throws 
Exception {
flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL");
--- End diff --

if this is dead code, it can be removed


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544036#comment-16544036
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
--- End diff --

providing a constructor like `SSLProvider(String provider)` to carry the 
enum's string representation looks better than hard-coding the strings.
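
For illustration, a sketch of the variant this comment suggests, with the string representation carried by a constructor; this only spells out the reviewer's idea and is not the code in the pull request:

    public enum SSLProviderSketch {
        JDK("JDK"),
        /** OpenSSL with fallback to JDK if not available. */
        OPENSSL("OPENSSL");

        private final String providerName;

        SSLProviderSketch(String providerName) {
            this.providerName = providerName;
        }

        /** Resolves the configured value without hard-coding literals in the lookup. */
        public static SSLProviderSketch fromString(String value) {
            for (SSLProviderSketch provider : values()) {
                if (provider.providerName.equalsIgnoreCase(value)) {
                    return provider;
                }
            }
            throw new IllegalArgumentException("Unknown SSL provider: " + value);
        }
    }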


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544034#comment-16544034
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
   

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544032#comment-16544032
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
   

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544035#comment-16544035
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
--- End diff --

marking these fields as `private` and providing getters/setters looks better to me


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For some time now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-   clientSSLContext

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
--- End diff --

Marking these fields as `private` and providing getters/setters looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
+* @return The SSLContext object which can be used by the ssl transport 
server
+* Returns null if SSL is disabled
+* @throws Exception
+* Thrown if there is any misconfiguration
+*/
+   @Nullable
+   public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
--- End diff --

This empty line is unnecessary and can be removed.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
--- End diff --

Providing a constructor like `SSLProvider(String provider)` that carries the 
enum's string representation looks better to me than hard-coding the strings.
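
A minimal sketch of the suggested alternative (illustrative only; the field name `providerName` is an assumption). It relies on `org.apache.flink.util.Preconditions`, as the surrounding class already does:

    public enum SSLProvider {
        JDK("JDK"),
        /**
         * OpenSSL with fallback to JDK if not available.
         */
        OPENSSL("OPENSSL");

        private final String providerName;

        SSLProvider(String providerName) {
            this.providerName = providerName;
        }

        public static SSLProvider fromString(String value) {
            Preconditions.checkNotNull(value);
            // Match against the name carried by each constant instead of hard-coded literals.
            for (SSLProvider provider : values()) {
                if (provider.providerName.equalsIgnoreCase(value)) {
                    return provider;
                }
            }
            throw new IllegalArgumentException("Unknown SSL provider: " + value);
        }
    }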


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
--- End diff --

Using a singular form for the class name looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-   clientSSLContext

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
--- End diff --

The descriptions of `@param` and `@throws` do not need a line break.
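
For illustration, the same Javadoc without the line breaks after the tags (a sketch, not the actual patch) could read:

    /**
     * Creates the SSL Context for the server if SSL is configured.
     *
     * @param sslConfig The application configuration
     * @return The SSLContext object which can be used by the ssl transport server; returns null if SSL is disabled
     * @throws Exception Thrown if there is any misconfiguration
     */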


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505321
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ---
@@ -160,6 +160,7 @@ private Configuration createSslConfig() throws 
Exception {
flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL");
--- End diff --

If this is dead code, it can be removed.


---


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506861
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+Preconditions.checkNotNull(class

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544028#comment-16544028
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506861
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factory

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544024#comment-16544024
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factory

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+Preconditions.checkNotNull(class

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544023#comment-16544023
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
--- End diff --

The variable is only passed once. The internal method does not check for null again.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
--- End diff --

The variable is only passed once. The internal method does not check for null again.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544019#comment-16544019
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506661
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
 }
 
 /**
-  * Exception for not finding a [[TableFormatFactory]] for the
-  * given properties.
+  * Exception for not finding a [[TableFactory]] for the given properties.
   *
   * @param message message that indicates the current matching step
   * @param factoryClass required factory class
-  * @param formatFactories all found factories
-  * @param properties properties that describe the table format
+  * @param factories all found factories
+  * @param properties properties that describe the configuration
   * @param cause the cause
   */
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
   message: String,
   factoryClass: Class[_],
-  formatFactories: Seq[TableFormatFactory[_]],
+  factories: Seq[TableFactory],
   properties: Map[String, String],
   cause: Throwable)
 extends RuntimeException(
--- End diff --

So far we don't have inheritance among exceptions. Case classes don't 
support inheritance in Scala, so we would need to convert them. 


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506661
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
 }
 
 /**
-  * Exception for not finding a [[TableFormatFactory]] for the
-  * given properties.
+  * Exception for not finding a [[TableFactory]] for the given properties.
   *
   * @param message message that indicates the current matching step
   * @param factoryClass required factory class
-  * @param formatFactories all found factories
-  * @param properties properties that describe the table format
+  * @param factories all found factories
+  * @param properties properties that describe the configuration
   * @param cause the cause
   */
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
   message: String,
   factoryClass: Class[_],
-  formatFactories: Seq[TableFormatFactory[_]],
+  factories: Seq[TableFactory],
   properties: Map[String, String],
   cause: Throwable)
 extends RuntimeException(
--- End diff --

So far we don't have inheritance among exceptions. Case classes don't 
support inheritance in Scala, so we would need to convert them. 


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544018#comment-16544018
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
 ---
@@ -16,42 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
 
 import java.util
 
-import org.apache.flink.api.common.serialization.{DeserializationSchema, 
SerializationSchema}
-
 /**
-  * A factory to create different table format instances. This factory is 
used with Java's Service
-  * Provider Interfaces (SPI) for discovering. A factory is called with a 
set of normalized
-  * properties that describe the desired format. The factory allows for 
matching to the given set of
-  * properties. See also [[SerializationSchemaFactory]] and 
[[DeserializationSchemaFactory]] for
-  * creating configured instances of format classes accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' 
file of a JAR file in
-  * the current classpath to be found.
+  * A factory to create configured table format instances based on 
string-based properties. See
+  * also [[TableFactory]] for more information.
   *
   * @tparam T record type that the format produces or consumes
   */
-trait TableFormatFactory[T] {
-
-  /**
-* Specifies the context that this factory has been implemented for. 
The framework guarantees
-* to only use the factory if the specified set of properties and 
values are met.
-*
-* Typical properties might be:
-*   - format.type
-*   - format.version
-*
-* Specified property versions allow the framework to provide backwards 
compatible properties
-* in case of string format changes:
-*   - format.property-version
-*
-* An empty context means that the factory matches for all requests.
-*/
-  def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --

Because the Javadoc comment explains specifically how supported 
format properties are handled.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
 ---
@@ -16,42 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
 
 import java.util
 
-import org.apache.flink.api.common.serialization.{DeserializationSchema, 
SerializationSchema}
-
 /**
-  * A factory to create different table format instances. This factory is 
used with Java's Service
-  * Provider Interfaces (SPI) for discovering. A factory is called with a 
set of normalized
-  * properties that describe the desired format. The factory allows for 
matching to the given set of
-  * properties. See also [[SerializationSchemaFactory]] and 
[[DeserializationSchemaFactory]] for
-  * creating configured instances of format classes accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' 
file of a JAR file in
-  * the current classpath to be found.
+  * A factory to create configured table format instances based on 
string-based properties. See
+  * also [[TableFactory]] for more information.
   *
   * @tparam T record type that the format produces or consumes
   */
-trait TableFormatFactory[T] {
-
-  /**
-* Specifies the context that this factory has been implemented for. 
The framework guarantees
-* to only use the factory if the specified set of properties and 
values are met.
-*
-* Typical properties might be:
-*   - format.type
-*   - format.version
-*
-* Specified property versions allow the framework to provide backwards 
compatible properties
-* in case of string format changes:
-*   - format.property-version
-*
-* An empty context means that the factory matches for all requests.
-*/
-  def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --

Because the Javadoc comment explains specifically how supported 
format properties are handled.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544017#comment-16544017
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
 val properties = new DescriptorProperties()
 externalCatalogTable.addProperties(properties)
 val javaMap = properties.asMap
-val source = TableFactoryService.find(classOf[TableSourceFactory[_]], 
javaMap)
-  .asInstanceOf[TableSourceFactory[_]]
-  .createTableSource(javaMap)
 tableEnv match {
   // check for a batch table source in this batch environment
   case _: BatchTableEnvironment =>
-source match {
-  case bts: BatchTableSource[_] =>
-new TableSourceSinkTable(Some(new BatchTableSourceTable(
-  bts,
-  new FlinkStatistic(externalCatalogTable.getTableStats))), 
None)
-  case _ => throw new TableException(
-s"Found table source '${source.getClass.getCanonicalName}' is 
not applicable " +
-  s"in a batch environment.")
-}
+val source = TableFactoryService
--- End diff --

It is very uncommon to define both a batch and a streaming source in 
the same factory. Your proposed change would require all future sources to 
implement an environment check upfront, which is unnecessary in 80% of the 
cases. Separating by environment is a concept that can be found throughout the 
entire `flink-table` module because batch and streaming sources/sinks behave quite differently.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
 val properties = new DescriptorProperties()
 externalCatalogTable.addProperties(properties)
 val javaMap = properties.asMap
-val source = TableFactoryService.find(classOf[TableSourceFactory[_]], 
javaMap)
-  .asInstanceOf[TableSourceFactory[_]]
-  .createTableSource(javaMap)
 tableEnv match {
   // check for a batch table source in this batch environment
   case _: BatchTableEnvironment =>
-source match {
-  case bts: BatchTableSource[_] =>
-new TableSourceSinkTable(Some(new BatchTableSourceTable(
-  bts,
-  new FlinkStatistic(externalCatalogTable.getTableStats))), 
None)
-  case _ => throw new TableException(
-s"Found table source '${source.getClass.getCanonicalName}' is 
not applicable " +
-  s"in a batch environment.")
-}
+val source = TableFactoryService
--- End diff --

It is very uncommon to define both a batch and a streaming source in 
the same factory. Your proposed change would require all future sources to 
implement an environment check upfront, which is unnecessary in 80% of the 
cases. Separating by environment is a concept that can be found throughout the 
entire `flink-table` module because batch and streaming sources/sinks behave quite differently.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544016#comment-16544016
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506512
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
 ---
@@ -21,8 +21,12 @@
 import static 
org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
 
 /**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be 
removed once we
+ * drop support for format-specific table sources.
  */
+@Deprecated
--- End diff --

No, but we forgot it in the previous commit.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506512
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
 ---
@@ -21,8 +21,12 @@
 import static 
org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
 
 /**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be 
removed once we
+ * drop support for format-specific table sources.
  */
+@Deprecated
--- End diff --

No, but we forgot it in the previous commit.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544015#comment-16544015
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506494
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
 ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties 
properties) {
}
 
public Source toSource() {
-   final Map newProperties = new 
HashMap<>(properties);
-   newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-   
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --

The table type is a concept of the SQL Client and should not be part of the 
table descriptor.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506494
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
 ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties 
properties) {
}
 
public Source toSource() {
-   final Map newProperties = new 
HashMap<>(properties);
-   newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-   
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --

The table type is a concept of the SQL Client and should not be part of the 
table descriptor.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544007#comment-16544007
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
 ---
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
 
 import java.util
 
 /**
   * Common trait for all properties-based discoverable table factories.
   */
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --

Actually, I would like the very generic name `Factory` but since we have to 
add some prefix to make it unique in the project, I named it `TableFactory` 
because we prefix everything in this module with `Table`.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544004#comment-16544004
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506126
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource
-	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+	StreamTableSource<Row>,
+	DefinedProctimeAttribute,
+	DefinedRowtimeAttributes,
+	DefinedFieldMapping {
+
+	// common table source attributes
+	// TODO make all attributes final once we drop support for format-specific table sources

	/** The schema of the table. */
	private final TableSchema schema;

+	/** Field name of the processing time attribute, null if no processing time field is defined. */
+	private String proctimeAttribute;
+
+	/** Descriptor for a rowtime attribute. */
+	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+	/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+	private Map<String, String> fieldMapping;
+
+	// Kafka-specific attributes
+
	/** The Kafka topic to consume. */
	private final String topic;

	/** Properties for the Kafka consumer. */
	private final Properties properties;

-	/** Type information describing the result type. */
-	private TypeInformation<Row> returnType;
-
-	/** Field name of the processing time attribute, null if no processing time field is defined. */
-	private String proctimeAttribute;
-
-	/** Descriptor for a rowtime attribute. */
-	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+	/** Deserialization schema for decoding records from Kafka. */
+	private final DeserializationSchema<Row> deserializationSchema;

	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+	private StartupMode startupMode;

	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
	private Map<KafkaTopicPartition, Long> specificStartupOffsets;

	/**
	 * Creates a generic Kafka {@link StreamTableSource}.
	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param schema     Schema of the produced table.
-	 * @param returnType Type information of the produced physical DataStream.
+	 * @param schema                      Schema of the produced table.
+	 * @param proctimeAttribute           Field name of the processing time attribute, null if no
+	 *                                    processing time field is defined.
+	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+	 * @param fieldMapping                Mapping for the fields of the table schema to
--- End diff --

Backward compatibility. It could have been null in the past.
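
Since the field mapping may legitimately be null for such legacy sources, a caller-side sketch of handling that case could look like the following. The helper name and the identity-mapping fallback are assumptions made only for illustration, not part of the PR.

import java.util.HashMap;
import java.util.Map;

final class FieldMappingUtil {

	/**
	 * Returns an explicit identity mapping when a legacy source reports no mapping
	 * (null). Purely illustrative of the null-handling mentioned above.
	 */
	static Map<String, String> normalize(Map<String, String> fieldMapping, String[] schemaFields) {
		if (fieldMapping != null) {
			return fieldMapping;
		}
		Map<String, String> identity = new HashMap<>();
		for (String field : schemaFields) {
			identity.put(field, field); // schema field maps to the physical field of the same name
		}
		return identity;
	}
}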


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543986#comment-16543986
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks! Looking forward~


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
>
> We should generate the _meta file for a checkpoint only when the writing has 
> fully succeeded. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 
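
As a rough sketch of the temp-file-plus-atomic-rename idea (not the PR's code: the real backends go through Flink's FileSystem abstraction, and the file names here are assumed):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class MetadataWriter {

	/**
	 * Writes checkpoint metadata to a temporary file first and only renames it to its
	 * final name once the write has fully succeeded, so readers never observe a
	 * partially written _metadata file. ATOMIC_MOVE works on POSIX-like file systems;
	 * object stores such as S3 need the equivalent workaround mentioned in the issue.
	 */
	static void writeAtomically(Path checkpointDir, byte[] serializedMetadata) throws IOException {
		Path tmp = checkpointDir.resolve("_metadata.inprogress");
		Path target = checkpointDir.resolve("_metadata");

		Files.write(tmp, serializedMetadata);
		Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
	}
}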



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector

2018-07-13 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9849:
---

Assignee: zhangminglei

> Upgrade hbase version to 2.0.1 for hbase connector
> --
>
> Key: FLINK-9849
> URL: https://issues.apache.org/jira/browse/FLINK-9849
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Currently hbase 1.4.3 is used for the hbase connector.
> We should upgrade to 2.0.1, which was recently released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

