[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544092#comment-16544092 ]

ASF GitHub Bot commented on FLINK-9489:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6333

CC @tillrohrmann

> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e.,
> stored inside the managed keyed state. This means that we have to connect our
> preparation for asynchronous checkpoints with the backend, so that the timers
> are written as part of the state for each key-group. This means that we will
> also free up the raw keyed state and might expose it to user functions in the
> future.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
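The description says timers are written as part of the state for each key-group. As a rough illustration only, the Java sketch below shows one way timers could be keyed in an ordered key-value store so that a scan starting at a key-group prefix visits that group's timers in timestamp order. The class `TimerKeyLayout`, the byte layout, and the simplified hash in `keyGroupFor` are assumptions for illustration, not Flink's actual serialization format.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical layout for timers stored in an ordered key-value store:
 * [key-group: 2 bytes][timestamp: 8 bytes][serialized key].
 * Unsigned lexicographic order then sorts entries by key-group first and by
 * timestamp within a key-group. Not Flink's actual on-disk format.
 */
final class TimerKeyLayout {

    /** Simplified key-group assignment (assumption); Flink additionally murmur-hashes key.hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Prefix an iterator could seek to in order to scan one key-group's timers in time order. */
    static byte[] keyGroupPrefix(int keyGroup) {
        // Assumes key-groups fit in 0..32767, so the high bit of the short is never set.
        return ByteBuffer.allocate(2).putShort((short) keyGroup).array();
    }

    static byte[] encode(int keyGroup, long timestamp, byte[] serializedKey) {
        return ByteBuffer.allocate(2 + 8 + serializedKey.length) // ByteBuffer is big-endian by default
            .putShort((short) keyGroup)
            // Flipping the sign bit keeps negative timestamps ordered before positive ones
            // under unsigned byte-wise comparison.
            .putLong(timestamp ^ Long.MIN_VALUE)
            .put(serializedKey)
            .array();
    }

    static long decodeTimestamp(byte[] encoded) {
        // Read the 8 timestamp bytes that follow the 2-byte key-group prefix and undo the sign flip.
        return ByteBuffer.wrap(encoded, 2, 8).getLong() ^ Long.MIN_VALUE;
    }
}
```

Big-endian encoding plus the sign-bit flip is what lets a byte-wise iterator treat the first entry after a key-group prefix as that group's earliest timer.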
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544091#comment-16544091 ]

ASF GitHub Bot commented on FLINK-9489:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6333

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of Flink's state backend and also already includes backwards compatibility (FLINK-9490). The core idea is to have a common abstraction for how state is registered in the state backend and how snapshots operate on such state (`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new state integrates more or less seamlessly with the existing snapshot logic.

The notable exception is the current lack of integration between the RocksDB state backend and heap-based priority queue state. This combination can currently still use the old snapshot code through a temporary path, without causing any regression (see `AbstractStreamOperator.snapshotState(...)`).

As a result, after this PR Flink supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks queue (full and incremental), and rocks kv / heap queue (only full), and still uses synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release and still has some known rough edges that we could fix in the test phase. Here is a list of some things that already come to mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick out some code.
- Check proper integration with the serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve the general state registration logic in the backends; there is potential to remove duplicated code and to improve the integration of the queue state a bit.
- Improve performance of RocksDB-based timers, e.g. a byte[]-based cache, seeking directly to the next potential timer instead of seeking to the key-group start, bulkPoll.
- Improve some class/interface/method names.
- Add tests, e.g. for bulkPoll.

## Verifying this change

This change is already covered by existing tests, but I would add some more eventually. You can activate RocksDB-based timers by using the RocksDB backend and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs only for now)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6333

commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter
Date: 2018-06-13T09:56:16Z

    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter
Date: 2018-07-13T23:19:30Z

    Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter
Date: 2018-07-14T06:34:16Z

    Renaming of some classes/interfaces
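The queue state discussed in the PR behaves like a priority queue with set semantics: registering the same timer twice should not make it fire twice. The sketch below, assuming a heap-backed `java.util.PriorityQueue` plus a `HashSet` for deduplication, shows the idea together with a `bulkPoll` that drains every timer up to a watermark. `TimerQueueSet` and its methods are hypothetical names; this is not the `HeapPriorityQueueSet` implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical timer queue with set semantics on (key, timestamp). */
final class TimerQueueSet<K> {

    /** Immutable timer; equality on (key, timestamp) provides the deduplication. */
    static final class Timer<T> {
        final T key;
        final long timestamp;

        Timer(T key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer<?> other = (Timer<?>) o;
            return timestamp == other.timestamp && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    // Min-heap ordered by timestamp: the head is always the earliest timer.
    private final PriorityQueue<Timer<K>> queue =
        new PriorityQueue<>(Comparator.comparingLong((Timer<K> t) -> t.timestamp));
    // Tracks registered timers so duplicates are rejected in O(1).
    private final Set<Timer<K>> registered = new HashSet<>();

    /** Returns true if the timer was newly added, false if it was already registered. */
    boolean add(K key, long timestamp) {
        Timer<K> timer = new Timer<>(key, timestamp);
        if (!registered.add(timer)) {
            return false;
        }
        queue.add(timer);
        return true;
    }

    /** Removes and returns all timers with timestamp <= watermark, earliest first. */
    List<Timer<K>> bulkPoll(long watermark) {
        List<Timer<K>> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            Timer<K> timer = queue.poll();
            registered.remove(timer);
            fired.add(timer);
        }
        return fired;
    }

    int size() {
        return queue.size();
    }
}
```

Removing a fired timer from the `registered` set is what allows the same (key, timestamp) pair to be registered again later.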
[jira] [Updated] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-9489:
--
Labels: pull-request-available (was: )
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544087#comment-16544087 ]

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507962

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
        }
    }

-   private ResultDescriptor executeQueryInternal(ExecutionContext context, String query) {
+   private ProgramTargetDescriptor executeUpdateInternal(ExecutionContext context, String statement) {
+       final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+       // apply update statement
--- End diff --

On one side, yes, but on the other side it allows reading the comments from top to bottom and knowing what the method does without having to look at the actual code.

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.6.0
> Reporter: Renjie Liu
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long-running
> queries. It would be useful for simple jobs that can be expressed in a single
> SQL statement if we could submit SQL statements stored in files as long-running
> queries.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544086#comment-16544086 ]

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507930

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
        }
    }

+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+       terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+       terminal.flush();
+
+       try {
+           final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
+           terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

Initially I had it similar to your proposal, but this would mix a data model class and visualization. `ProgramTargetDescriptor` should not be responsible for how it is represented in the CLI.
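The design argument in this comment (keep the descriptor free of CLI formatting) can be sketched roughly as follows. The class names `SubmissionTarget` and `CliTargetFormatter` and the three fields are hypothetical stand-ins chosen for illustration, not the SQL Client's actual `ProgramTargetDescriptor` API.

```java
import java.util.Objects;

/** Hypothetical data model: knows what was submitted, nothing about how it is displayed. */
final class SubmissionTarget {
    private final String clusterId;
    private final String jobId;
    private final String webInterfaceUrl;

    SubmissionTarget(String clusterId, String jobId, String webInterfaceUrl) {
        this.clusterId = Objects.requireNonNull(clusterId);
        this.jobId = Objects.requireNonNull(jobId);
        this.webInterfaceUrl = Objects.requireNonNull(webInterfaceUrl);
    }

    String getClusterId() {
        return clusterId;
    }

    String getJobId() {
        return jobId;
    }

    String getWebInterfaceUrl() {
        return webInterfaceUrl;
    }
}

/** Presentation lives here; another front end could format the same object differently. */
final class CliTargetFormatter {
    static String format(SubmissionTarget target) {
        return "Cluster ID: " + target.getClusterId() + "\n"
            + "Job ID: " + target.getJobId() + "\n"
            + "Web interface: " + target.getWebInterfaceUrl();
    }
}
```

With this split, changing the terminal output (colors, ANSI codes, wording) touches only the formatter, and the descriptor can be reused by non-CLI callers unchanged.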
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544085#comment-16544085 ]

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507864

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ---
@@ -173,55 +180,92 @@ public void open() {
            if (line == null || line.equals("")) {
                continue;
            }
+           parseAndCall(line);
+       }
+   }

-           final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
+   /**
+    * Submits a SQL update statement and prints status information and/or errors on the terminal.
+    *
+    * @param statement SQL update statement
+    * @return flag to indicate if the submission was successful or not
+    */
+   public boolean submitUpdate(String statement) {
--- End diff --

The entire CliClient is only tested manually so far.
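Since the comment notes that `CliClient` is only tested manually, here is a hedged sketch of how a `submitUpdate`-style method that returns a success flag could be made unit-testable by injecting the executor and the output writer instead of a real terminal. `UpdateSubmitter`, `UpdateExecutor`, and the message strings are assumptions for illustration, not the SQL Client's API.

```java
import java.io.PrintWriter;

/** Hypothetical submitter whose dependencies are injected so it can run without a terminal. */
final class UpdateSubmitter {

    /** Stand-in for the executor: returns a job identifier or throws on failure. */
    interface UpdateExecutor {
        String executeUpdate(String statement) throws Exception;
    }

    private final UpdateExecutor executor;
    private final PrintWriter out;

    UpdateSubmitter(UpdateExecutor executor, PrintWriter out) {
        this.executor = executor;
        this.out = out;
    }

    /** Submits the statement, prints status or error information, and returns whether it succeeded. */
    boolean submitUpdate(String statement) {
        out.println("[INFO] Submitting SQL update statement to the cluster...");
        out.flush();
        try {
            String jobId = executor.executeUpdate(statement);
            out.println("[INFO] Statement submitted. Job ID: " + jobId);
            return true;
        } catch (Exception e) {
            out.println("[ERROR] Could not submit statement: " + e.getMessage());
            return false;
        } finally {
            out.flush();
        }
    }
}
```

A test can then pass a lambda as the executor and a `PrintWriter` over a `StringWriter`, and assert on both the returned flag and the captured output.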
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544083#comment-16544083 ]

ASF GitHub Bot commented on FLINK-9407:
---

Github user sagarl commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200217627

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+    private static final long serialVersionUID = 3L;
+
+    private TypeDescription schema;
+
+    private String meatSchema;
+
+    private transient org.apache.orc.Writer writer;
+
+    private VectorizedRowBatch rowBatch;
+
+    private CompressionKind compressionKind;
+
+    private long writedRowSize;
+
+    private OrcBatchWriter orcBatchWriter;
+
+    /**
+     * Creates a new {@code OrcFileWriter} that writes orc files without compression.
+     *
+     * @param metaSchema The orc schema.
+     */
+    public OrcFileWriter(String metaSchema) {
+        this(metaSchema, CompressionKind.NONE);
+    }
+
+    /**
+     * Create a new {@code OrcFileWriter} that writes orc file with the gaven
+     * schema and compression kind.
+     *
+     * @param metaSchema The schema of a orc file.
+     * @param compressionKind The compression kind to use.
+     */
+    public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
+        this.meatSchema = metaSchema;
+        this.schema = TypeDescription.fromString(metaSchema);
+        this.compressionKind = compressionKind;
+    }
+
+    @Override
+    public void open(FileSystem fs, Path path) throws IOException {
+        writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+        rowBatch = schema.createRowBatch();
+        orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+    }
+
+    private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+        List<String> fieldNames = orcSchema.getFieldNames();
+        List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+        List<TypeInformation> typeInformations = new ArrayList<>();
+
+        typeDescriptions.forEach(typeDescription -> {
+            typeInformations.add(schemaToTypeInfo(typeDescription));
+        });
+
+        return new TableSchema(
+            fieldNames.toArray(new String[fieldNames.size()]),
+            typeInformations.toArray(new TypeInformation[typeInformations.size()]));
+    }
+
+    @Override
+    public void write(T element) throws IOException {
+        Boolean isFill = orcBatchWriter.fill(rowBatch, element);
+        if (!isFill) {
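The writer in this diff creates a `VectorizedRowBatch` (`rowBatch`) in `open` and hands rows to an `OrcBatchWriter`; batching writers of this kind typically buffer rows and flush a batch once it is full. A minimal, dependency-free sketch of that fill-then-flush pattern, assuming a list-based batch in place of ORC's `VectorizedRowBatch` and a `Consumer` in place of the ORC writer, could look like this. `BatchingWriter` is a hypothetical name, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical fill-then-flush writer: buffers rows and hands full batches to a sink. */
final class BatchingWriter<T> {
    private final int capacity;
    private final Consumer<List<T>> sink;
    private final List<T> batch;
    private long writtenRows;

    BatchingWriter(int capacity, Consumer<List<T>> sink) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.sink = sink;
        this.batch = new ArrayList<>(capacity);
    }

    /** Adds one row; flushes as soon as the batch reaches its capacity. */
    void write(T element) {
        batch.add(element);
        writtenRows++;
        if (batch.size() == capacity) {
            flush();
        }
    }

    /** Hands a copy of the buffered rows to the sink and resets the batch. */
    void flush() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch));
            batch.clear();
        }
    }

    /** Flushes any partially filled batch so no rows are lost on close. */
    void close() {
        flush();
    }

    long getWrittenRows() {
        return writtenRows;
    }
}
```

The final `flush()` in `close()` matters: without it, rows in a partially filled batch would never reach the file.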
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user sagarl commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200217021 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java --- @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param <T> The type of the elements that are being written by the sink. + */ +public class OrcFileWriter<T> extends StreamWriterBase<T> { + + private static final long serialVersionUID = 3L; + + private TypeDescription schema; + + private String meatSchema; + + private transient org.apache.orc.Writer writer; + + private VectorizedRowBatch rowBatch; + + private CompressionKind compressionKind; + + private long writedRowSize; + + private OrcBatchWriter orcBatchWriter; + + /** +* Creates a new {@code OrcFileWriter} that writes ORC files without compression. +* +* @param metaSchema The ORC schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Creates a new {@code OrcFileWriter} that writes ORC files with the given +* schema and compression kind. +* +* @param metaSchema The schema of an ORC file. +* @param compressionKind The compression kind to use.
+*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.meatSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List<String> fieldNames = orcSchema.getFieldNames(); + List<TypeDescription> typeDescriptions = orcSchema.getChildren(); + List<TypeInformation> typeInformations = new ArrayList<>(); + + typeDescriptions.forEach(typeDescription -> { + typeInformations.add(schemaToTypeInfo(typeDescription)); + }); + + return new TableSchema( + fieldNames.toArray(new String[fieldNames.size()]), + typeInformations.toArray(new TypeInformation[typeInformations.size()])); + } + + @Override + public void write(T element) throws IOException { + Boolean isFill = orcBatchWriter.fill(rowBatch, element); + if (!isFill) { +
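The quoted `write(T element)` is cut off at `if (!isFill) {`, but `OrcBatchWriter.fill` appears to report whether the element still fitted into the `VectorizedRowBatch`, which suggests the usual ORC fill-then-flush pattern (hand a full batch to `org.apache.orc.Writer#addRowBatch`, then reset it). A minimal plain-Java sketch of that pattern, assuming a hypothetical `RowBatchBuffer` class and a `Consumer` standing in for the ORC writer; it is not the code from PR #6075:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Flink or ORC): rows are buffered into a
 * fixed-capacity batch, and each full batch is handed to a sink that stands
 * in for org.apache.orc.Writer#addRowBatch.
 */
final class RowBatchBuffer<R> {
    private final int capacity;
    private final Consumer<List<R>> sink;
    private List<R> batch;
    private long writtenRows;

    RowBatchBuffer(int capacity, Consumer<List<R>> sink) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.sink = sink;
        this.batch = new ArrayList<>(capacity);
    }

    /** Buffers one row and flushes as soon as the batch reaches its capacity. */
    void write(R row) {
        batch.add(row);
        writtenRows++;
        if (batch.size() == capacity) {
            flush();
        }
    }

    /** Hands a non-empty batch to the sink and starts a fresh one (comparable to resetting the row batch). */
    void flush() {
        if (!batch.isEmpty()) {
            sink.accept(batch);
            batch = new ArrayList<>(capacity);
        }
    }

    /** Flushes the partially filled last batch; without this, trailing rows would be lost. */
    void close() {
        flush();
    }

    long writtenRows() {
        return writtenRows;
    }
}
```

In a rolling sink, the equivalent of `close()` would presumably have to run before a bucket file is closed or rolled, so that rows left in the last partial batch are not dropped.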
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544076#comment-16544076 ] ASF GitHub Bot commented on FLINK-8858: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202507463 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java --- @@ -97,14 +97,34 @@ private void start() { // add shutdown hook Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor)); - // start CLI - final CliClient cli = new CliClient(context, executor); - cli.open(); + // do the actual work + openCli(context, executor); } else { throw new SqlClientException("Gateway mode is not supported yet."); } } + /** +* Opens the CLI client for executing SQL statements. +* +* @param context session context +* @param executor executor +*/ + private void openCli(SessionContext context, Executor executor) { + final CliClient cli = new CliClient(context, executor); + // interactive CLI mode + if (options.getUpdateStatement() == null) { + cli.open(); + } + // execute single update statement + else { + final boolean success = cli.submitUpdate(options.getUpdateStatement()); --- End diff -- No, this would block the process for unbounded queries and require (fault-tolerant) monitoring in the SQL Client, which is not intended. We just block until the statement has been submitted to the cluster. > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long-running > queries.
It would be useful for simple jobs that can be expressed in a single > SQL statement if we could submit SQL statements stored in files as long-running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202507463 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java --- @@ -97,14 +97,34 @@ private void start() { // add shutdown hook Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor)); - // start CLI - final CliClient cli = new CliClient(context, executor); - cli.open(); + // do the actual work + openCli(context, executor); } else { throw new SqlClientException("Gateway mode is not supported yet."); } } + /** +* Opens the CLI client for executing SQL statements. +* +* @param context session context +* @param executor executor +*/ + private void openCli(SessionContext context, Executor executor) { + final CliClient cli = new CliClient(context, executor); + // interactive CLI mode + if (options.getUpdateStatement() == null) { + cli.open(); + } + // execute single update statement + else { + final boolean success = cli.submitUpdate(options.getUpdateStatement()); --- End diff -- No, this would block the process for unbounded queries and require (fault-tolerant) monitoring in the SQL Client, which is not intended. We just block until the statement has been submitted to the cluster. ---
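The distinction drawn in the reply above (block until the statement is submitted, but not until the job finishes) can be sketched with two futures: one for the submission acknowledgement and one for job termination. This is a minimal illustration under stated assumptions: `SubmittedJob` and `DetachedSubmitter` are hypothetical names, not the SQL Client's actual `Executor` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical handle returned once the cluster has accepted a statement. */
final class SubmittedJob {
    final String jobId;
    /** Completes when the job terminates; for an unbounded INSERT INTO it may never complete. */
    final CompletableFuture<Void> termination;

    SubmittedJob(String jobId, CompletableFuture<Void> termination) {
        this.jobId = jobId;
        this.termination = termination;
    }
}

final class DetachedSubmitter {
    /**
     * Waits only for the submission acknowledgement and returns the job ID.
     * The termination future is deliberately not awaited, so the client needs
     * no (fault-tolerant) monitoring of a possibly never-ending job.
     */
    static String submitAndDetach(CompletableFuture<SubmittedJob> submission, long timeoutSeconds) {
        try {
            return submission.get(timeoutSeconds, TimeUnit.SECONDS).jobId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while submitting the statement", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not submit the statement", e);
        }
    }
}
```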
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544075#comment-16544075 ] ASF GitHub Bot commented on FLINK-8858: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202507424 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor executor) { terminal = TerminalBuilder.builder() .name(CliStrings.CLI_NAME) .build(); + // make space from previous output and test the writer + terminal.writer().println(); --- End diff -- It makes the output on the terminal nicer. We don't know what has been printed before. This starts a terminal session. The output now looks like: ``` No default environment specified. Searching for '/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found. Reading default environment from: file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml No session environment specified. [INFO] Executing the following statement: INSERT INTO MyTableName SELECT * FROM MyTableName [INFO] Submitting SQL update statement to the cluster... [INFO] Table update statement has been successfully submitted to the cluster: Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d For the current job status visit: http://localhost:8081 Shutting down executor...done.
``` > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long-running > queries. It would be useful for simple jobs that can be expressed in a single > SQL statement if we could submit SQL statements stored in files as long-running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202507424 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor executor) { terminal = TerminalBuilder.builder() .name(CliStrings.CLI_NAME) .build(); + // make space from previous output and test the writer + terminal.writer().println(); --- End diff -- It makes the output on the terminal nicer. We don't know what has been printed before. This starts a terminal session. The output now looks like: ``` No default environment specified. Searching for '/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found. Reading default environment from: file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml No session environment specified. [INFO] Executing the following statement: INSERT INTO MyTableName SELECT * FROM MyTableName [INFO] Submitting SQL update statement to the cluster... [INFO] Table update statement has been successfully submitted to the cluster: Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d For the current job status visit: http://localhost:8081 Shutting down executor...done. ``` ---
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544072#comment-16544072 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thank you @pnowojski. I hope I could address all your comments. I will clean the commit history and improve the commit messages during merging. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
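The discovery step described in the issue can be sketched as: collect every factory found on the classpath, then select exactly one that matches the requested format. A minimal sketch, assuming a hypothetical `FormatFactorySketch` interface matched by a plain `formatType()` string (Flink's actual `FormatFactory` contract may differ). `java.util.ServiceLoader` implements `Iterable`, so `ServiceLoader.load(FormatFactorySketch.class)` could be passed directly as the candidates once `META-INF/services` entries exist:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical factory contract; not Flink's actual FormatFactory API. */
interface FormatFactorySketch {
    /** Identifier such as "json" or "avro" that a table definition would refer to. */
    String formatType();
}

final class FormatDiscovery {
    /**
     * Returns the single factory for the requested format type.
     * Candidates could come from ServiceLoader.load(FormatFactorySketch.class),
     * which reads META-INF/services entries from the classpath.
     */
    static FormatFactorySketch find(Iterable<? extends FormatFactorySketch> candidates, String formatType) {
        List<FormatFactorySketch> matches = new ArrayList<>();
        for (FormatFactorySketch candidate : candidates) {
            if (candidate.formatType().equals(formatType)) {
                matches.add(candidate);
            }
        }
        // Zero matches means the format jar is missing; several matches are ambiguous.
        if (matches.size() != 1) {
            throw new IllegalArgumentException(
                "Expected exactly one factory for format '" + formatType + "' but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

Accepting an `Iterable` instead of calling `ServiceLoader` inside `find` keeps the selection logic testable without classpath setup.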
[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thank you @pnowojski. I hope I could address all your comments. I will clean the commit history and improve the commit messages during merging. ---
[jira] [Closed] (FLINK-9818) Add cluster component command line parser
[ https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9818. Resolution: Fixed Fixed via ab9bd87e521d19db7c7d783268a3532d2e876a5d > Add cluster component command line parser > - > > Key: FLINK-9818 > URL: https://issues.apache.org/jira/browse/FLINK-9818 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to parse command line options for the cluster components > ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a > {{CommandLineParser}} which supports the common command line options > ({{--configDir}}, {{--webui-port}} and dynamic properties which can override > configuration values). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
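The options named in the issue (`--configDir`, `--webui-port` and dynamic properties) can be illustrated with a small hand-rolled parser. This is a sketch under assumptions: `ClusterArgs` and `ClusterArgsParser` are hypothetical, the `-Dkey=value` syntax for dynamic properties is assumed, and Flink's actual `CommandLineParser` is not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parse result for cluster components such as TaskManagerRunner. */
final class ClusterArgs {
    String configDir;                 // value of --configDir, null if absent
    int webUiPort = -1;               // value of --webui-port, -1 if absent (assumed sentinel)
    final Map<String, String> dynamicProperties = new LinkedHashMap<>();
}

final class ClusterArgsParser {
    /** Parses "--configDir <dir>", "--webui-port <port>" and "-Dkey=value" arguments. */
    static ClusterArgs parse(String[] args) {
        ClusterArgs result = new ClusterArgs();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--configDir") || arg.equals("--webui-port")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for " + arg);
                }
                String value = args[++i];
                if (arg.equals("--configDir")) {
                    result.configDir = value;
                } else {
                    result.webUiPort = Integer.parseInt(value);
                }
            } else if (arg.startsWith("-D")) {
                int eq = arg.indexOf('=');
                if (eq <= 2) {   // rejects "-D", "-D=value" and "-Dkey" without '='
                    throw new IllegalArgumentException("Expected -Dkey=value but got " + arg);
                }
                result.dynamicProperties.put(arg.substring(2, eq), arg.substring(eq + 1));
            } else {
                throw new IllegalArgumentException("Unknown argument: " + arg);
            }
        }
        return result;
    }
}
```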
[jira] [Assigned] (FLINK-7087) Implement Flip-6 container entry point
[ https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7087: Assignee: Till Rohrmann > Implement Flip-6 container entry point > -- > > Key: FLINK-7087 > URL: https://issues.apache.org/jira/browse/FLINK-7087 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.6.0 > > > In order to support Docker and K8s, we have to provide a container entry point. > In a first version, the container entry point could be similar to the > standalone session mode, with the difference that we don't submit a job > to the cluster. The job has to be contained in the container image or at > least be retrievable by the entry point. In that sense, the container entry > point acts like a per-job standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7087) Implement Flip-6 container entry point
[ https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7087. Resolution: Fixed Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da > Implement Flip-6 container entry point > -- > > Key: FLINK-7087 > URL: https://issues.apache.org/jira/browse/FLINK-7087 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.6.0 > > > In order to support Docker and K8s, we have to provide a container entry point. > In a first version, the container entry point could be similar to the > standalone session mode, with the difference that we don't submit a job > to the cluster. The job has to be contained in the container image or at > least be retrievable by the entry point. In that sense, the container entry > point acts like a per-job standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9488) Create common entry point for master and workers
[ https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9488. Resolution: Fixed Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da > Create common entry point for master and workers > > > Key: FLINK-9488 > URL: https://issues.apache.org/jira/browse/FLINK-9488 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > To make the container setup easier, we should provide a single cluster entry > point which uses leader election to become either the master or a worker > which runs the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
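The idea of a single entry point that "uses leader election to become either the master or a worker" can be sketched in a few lines. This is a minimal sketch under loud assumptions: `InMemoryLeaderElection`, `ClusterRole` and `CommonEntryPoint` are hypothetical, the in-memory compare-and-set only stands in for a real election service (a real cluster would use e.g. ZooKeeper), and leader loss and failover are not modelled.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory stand-in for a leader election service. */
final class InMemoryLeaderElection {
    private final AtomicReference<String> leader = new AtomicReference<>();

    /** Returns true if the candidate becomes, or already is, the leader. */
    boolean tryAcquire(String candidateId) {
        return leader.compareAndSet(null, candidateId) || candidateId.equals(leader.get());
    }
}

enum ClusterRole { MASTER, WORKER }

final class CommonEntryPoint {
    /** The election winner runs the master components; every other node starts a TaskManager. */
    static ClusterRole decideRole(InMemoryLeaderElection election, String nodeId) {
        return election.tryAcquire(nodeId) ? ClusterRole.MASTER : ClusterRole.WORKER;
    }
}
```

Because every container runs the same entry point, the image does not need to know in advance which node will act as master.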
[jira] [Closed] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9821. Resolution: Fixed Fixed via 740f2fbf2e65fa988c6a577989ccd8923729be45 > Let dynamic properties overwrite configuration settings in TaskManagerRunner > > > Key: FLINK-9821 > URL: https://issues.apache.org/jira/browse/FLINK-9821 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9820 we should also allow dynamic properties to overwrite > configuration values in the {{TaskManagerRunner}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9820. Resolution: Fixed Fixed via 2fbbf8ee662647c71581f5cd989226be820fed0f > Let dynamic properties overwrite configuration settings in ClusterEntrypoint > > > Key: FLINK-9820 > URL: https://issues.apache.org/jira/browse/FLINK-9820 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The dynamic properties which are passed to the {{ClusterEntrypoint}} should > overwrite values in the loaded {{Configuration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
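The precedence requested in FLINK-9820 (and in FLINK-9821 for the `TaskManagerRunner`) is that dynamic properties overwrite values from the loaded configuration. A minimal sketch, assuming plain `Map<String, String>` instances in place of Flink's `Configuration` class and a hypothetical `DynamicPropertiesOverlay` helper:

```java
import java.util.HashMap;
import java.util.Map;

final class DynamicPropertiesOverlay {
    /**
     * Returns the effective configuration: values loaded from flink-conf.yaml,
     * with every dynamic property overwriting the loaded value for the same key.
     * Neither input map is modified.
     */
    static Map<String, String> apply(Map<String, String> loaded, Map<String, String> dynamicProperties) {
        Map<String, String> effective = new HashMap<>(loaded);
        effective.putAll(dynamicProperties); // putAll replaces existing keys, so dynamic values win
        return effective;
    }
}
```

Copying instead of mutating keeps the file-based configuration available unchanged, for example for logging which values were overridden.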
[jira] [Closed] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint
[ https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9819. Resolution: Fixed Fixed via 5a4bdf2c9fd1693ad3b90dbbd3bcb589ed15c101 > Create start up scripts for the StandaloneJobClusterEntryPoint > -- > > Key: FLINK-9819 > URL: https://issues.apache.org/jira/browse/FLINK-9819 > Project: Flink > Issue Type: New Feature > Components: Startup Shell Scripts >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to start the {{StandaloneJobClusterEntryPoint}} we need start up > scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the > {{flink-console.sh}} scripts to support this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9822. Resolution: Fixed Fixed via 56e5381cb7aba01f1d7ecfa11e4be7f505a35baf > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9488) Create common entry point for master and workers
[ https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544050#comment-16544050 ] ASF GitHub Bot commented on FLINK-9488: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6315 > Create common entry point for master and workers > > > Key: FLINK-9488 > URL: https://issues.apache.org/jira/browse/FLINK-9488 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > To make the container setup easier, we should provide a single cluster entry > point which uses leader election to become either the master or a worker > which runs the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9619. -- Resolution: Fixed Fix Version/s: (was: 1.5.2) Fixed via c9ad0a07ef0339ced74057fc17800ca9ab7784c1 > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544053#comment-16544053 ] ASF GitHub Bot commented on FLINK-9619: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6185 > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6317: [FLINK-9820] Forward dynamic properties to Flink c...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6317 ---
[jira] [Commented] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544052#comment-16544052 ] ASF GitHub Bot commented on FLINK-9821: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6318 > Let dynamic properties overwrite configuration settings in TaskManagerRunner > > > Key: FLINK-9821 > URL: https://issues.apache.org/jira/browse/FLINK-9821 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9820 we should also allow dynamic properties to overwrite > configuration values in the {{TaskManagerRunner}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6331: [FLINK-9701] [state] (follow up) Use StateTtlConfi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6331 ---
[GitHub] flink pull request #6318: [FLINK-9821] Forward dynamic properties to configu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6318 ---
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6319 ---
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544056#comment-16544056 ] ASF GitHub Bot commented on FLINK-9143: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6283 > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > The restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package and submit the job to the cluster. > > 4.
Go to the Job Manager configuration in the WebUI and ensure that the settings from > flink-conf.yaml are there (screenshot attached). > > 5. Go to the job's configuration and look at the Execution Configuration section. > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart > attempts. > > > See the attached screenshots and the jobmanager log (lines 1 and 31). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
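PR #6283, which closed this issue, is titled "Use cluster strategy if none was set ...". The precedence that title and the expected result point to can be sketched as: an explicit job-level strategy wins, otherwise the cluster setting from flink-conf.yaml applies, and only when neither exists does enabling checkpointing fall back to fixed-delay restarts with `Integer.MAX_VALUE` attempts (the 2147483647 attempts seen in the report). `RestartStrategySketch` and `RestartStrategyResolver` are hypothetical types used for illustration, not Flink's `RestartStrategies` API:

```java
/** Hypothetical, simplified description of a restart strategy. */
final class RestartStrategySketch {
    static final RestartStrategySketch NONE = new RestartStrategySketch("none", 0);

    final String name;
    final int maxAttempts;

    RestartStrategySketch(String name, int maxAttempts) {
        this.name = name;
        this.maxAttempts = maxAttempts;
    }

    static RestartStrategySketch fixedDelay(int maxAttempts) {
        return new RestartStrategySketch("fixed-delay", maxAttempts);
    }
}

final class RestartStrategyResolver {
    /**
     * @param jobLevel      strategy set explicitly in the job, or null if unset
     * @param clusterLevel  strategy configured in flink-conf.yaml, or null if not configured
     * @param checkpointing whether the job enabled checkpointing
     */
    static RestartStrategySketch resolve(RestartStrategySketch jobLevel,
                                         RestartStrategySketch clusterLevel,
                                         boolean checkpointing) {
        if (jobLevel != null) {
            return jobLevel;        // an explicit job setting always wins
        }
        if (clusterLevel != null) {
            return clusterLevel;    // the expected result in the report: restart-strategy: none applies
        }
        // Only when nothing is configured anywhere does checkpointing imply the fallback.
        return checkpointing ? RestartStrategySketch.fixedDelay(Integer.MAX_VALUE) : RestartStrategySketch.NONE;
    }
}
```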
[jira] [Closed] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster
[ https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9823. Resolution: Fixed Fixed via 387a3bc198cfea016abd92953c7fce28e641cf67 > Add Kubernetes deployment files for standalone job cluster > -- > > Key: FLINK-9823 > URL: https://issues.apache.org/jira/browse/FLINK-9823 > Project: Flink > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9822, it would be helpful for the user to have example > Kubernetes deployment files to start a standalone job cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6314: [FLINK-9818] Add cluster component command line pa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6314 ---
[jira] [Commented] (FLINK-9818) Add cluster component command line parser
[ https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544048#comment-16544048 ] ASF GitHub Bot commented on FLINK-9818: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6314 > Add cluster component command line parser > - > > Key: FLINK-9818 > URL: https://issues.apache.org/jira/browse/FLINK-9818 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to parse command line options for the cluster components > ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a > {{CommandLineParser}} which supports the common command line options > ({{--configDir}}, {{--webui-port}} and dynamic properties which can override > configuration values). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6283 ---
[jira] [Commented] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544051#comment-16544051 ] ASF GitHub Bot commented on FLINK-9820: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6317 > Let dynamic properties overwrite configuration settings in ClusterEntrypoint > > > Key: FLINK-9820 > URL: https://issues.apache.org/jira/browse/FLINK-9820 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The dynamic properties which are passed to the {{ClusterEntrypoint}} should > overwrite values in the loaded {{Configuration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster
[ https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544046#comment-16544046 ] ASF GitHub Bot commented on FLINK-9823: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6320 > Add Kubernetes deployment files for standalone job cluster > -- > > Key: FLINK-9823 > URL: https://issues.apache.org/jira/browse/FLINK-9823 > Project: Flink > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9822, it would be helpful for the user to have example > Kubernetes deployment files to start a standalone job cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6185 ---
[GitHub] flink pull request #6315: [FLINK-9488] Add container entry point StandaloneJ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6315 ---
[jira] [Commented] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint
[ https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544049#comment-16544049 ] ASF GitHub Bot commented on FLINK-9819: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6316 > Create start up scripts for the StandaloneJobClusterEntryPoint > -- > > Key: FLINK-9819 > URL: https://issues.apache.org/jira/browse/FLINK-9819 > Project: Flink > Issue Type: New Feature > Components: Startup Shell Scripts >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to start the {{StandaloneJobClusterEntryPoint}} we need start up > scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the > {{flink-console.sh}} scripts to support this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544054#comment-16544054 ] ASF GitHub Bot commented on FLINK-9701: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6331 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6316: [FLINK-9819] Add startup scripts for standalone jo...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6316 ---
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544055#comment-16544055 ] ASF GitHub Bot commented on FLINK-9822: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6319 > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9619: -- Labels: pull-request-available (was: ) > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6320 ---
[jira] [Resolved] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9701. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via 1632681e41cbc1092a6b4d47a58adfffba6af5d4 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9143. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via 57872d53c4584faace6dc8e4038ad1f2d068a453 > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > The restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]. > Here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package and submit the job to the cluster. > > 4. Go to the Job Manager configuration in the web UI and check that the settings from > flink-conf.yaml are there (screenshot attached). > > 5. 
Go to the job's configuration and look at the Execution Configuration section. > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart > attempts. > > > See the attached screenshots and the jobmanager log (lines 1 and 31). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
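The expected behaviour in the report reads as a precedence rule. A restart strategy set on the job wins. Otherwise the cluster's `restart-strategy` from flink-conf.yaml applies. Only if neither is set does enabling checkpointing fall back to fixed-delay restarts with `Integer.MAX_VALUE` attempts, which is the 2147483647 seen in the report. The sketch below encodes that rule with hypothetical types. It illustrates the ordering the reporter expects and is not the code of the actual fix.

```java
import java.util.Optional;

/** Hypothetical model of restart-strategy precedence; not Flink's actual classes. */
final class RestartStrategyResolution {

    /** Simplified stand-in for a resolved restart strategy. */
    static final class Strategy {
        final String kind;     // e.g. "none" or "fixed-delay"
        final int maxAttempts; // only meaningful for "fixed-delay"

        Strategy(String kind, int maxAttempts) {
            this.kind = kind;
            this.maxAttempts = maxAttempts;
        }
    }

    private RestartStrategyResolution() {}

    /**
     * Job setting first, then cluster setting, and only then the
     * checkpointing-dependent default.
     */
    static Strategy resolve(
            Optional<Strategy> jobStrategy,
            Optional<Strategy> clusterStrategy,
            boolean checkpointingEnabled) {
        if (jobStrategy.isPresent()) {
            return jobStrategy.get();
        }
        if (clusterStrategy.isPresent()) {
            return clusterStrategy.get(); // e.g. "restart-strategy: none" from flink-conf.yaml
        }
        return checkpointingEnabled
                ? new Strategy("fixed-delay", Integer.MAX_VALUE)
                : new Strategy("none", 0);
    }
}
```

In the reported scenario the cluster value `none` should therefore be used even though checkpointing is enabled.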
[jira] [Assigned] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9143: Assignee: Dawid Wysakowicz (was: yuqi) > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > The restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]. > Here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package and submit the job to the cluster. > > 4. Go to the Job Manager configuration in the web UI and check that the settings from > flink-conf.yaml are there (screenshot attached). > > 5. Go to the job's configuration and look at the Execution Configuration section. 
> > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart > attempts. > > > See the attached screenshots and the jobmanager log (lines 1 and 31). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-9701: -- > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544040#comment-16544040 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202507057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { -case Some(_: StreamTableSourceTable[_]) => true -case Some(_: BatchTableSourceTable[_]) => false -case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- This information is useful. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats on the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
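The discovery mechanism proposed in FLINK-8558 can be sketched with `java.util.ServiceLoader`. Because `ServiceLoader` is `Iterable`, the matching logic can also be exercised against an ordinary list of candidates. The sketch assumes a hypothetical `SimpleFormatFactory` interface, a `format.type` property used for matching, and `String` as the target record type. None of these are Flink's final API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Function;

/** Hypothetical format factory: declares the format it handles and builds a converter. */
interface SimpleFormatFactory {
    String formatType();

    /** Creates a converter from raw bytes to the target record type (String for brevity). */
    Function<byte[], String> createDeserializer(Map<String, String> properties);
}

final class FormatDiscovery {

    private FormatDiscovery() {}

    /** Discovers factories registered under META-INF/services on the classpath. */
    static SimpleFormatFactory discover(Map<String, String> properties) {
        return find(ServiceLoader.load(SimpleFormatFactory.class), properties);
    }

    /** Returns the first candidate whose format type matches the "format.type" property. */
    static SimpleFormatFactory find(Iterable<SimpleFormatFactory> candidates, Map<String, String> properties) {
        String requested = properties.get("format.type");
        if (requested == null) {
            throw new IllegalArgumentException("Missing property 'format.type'");
        }
        for (SimpleFormatFactory factory : candidates) {
            if (requested.equals(factory.formatType())) {
                return factory;
            }
        }
        throw new IllegalStateException("No format factory found for type: " + requested);
    }
}

/** Example factory that decodes bytes as UTF-8 text. */
final class Utf8TextFormatFactory implements SimpleFormatFactory {
    @Override
    public String formatType() {
        return "utf8-text";
    }

    @Override
    public Function<byte[], String> createDeserializer(Map<String, String> properties) {
        return bytes -> new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Keeping connectors unaware of concrete formats means a connector only needs the property map. The factory lookup then supplies the `byte[]` converter.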
[jira] [Assigned] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn
[ https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9762: --- Assignee: (was: vinoyang) > CoreOptions.TMP_DIRS wrongly managed on Yarn > > > Key: FLINK-9762 > URL: https://issues.apache.org/jira/browse/FLINK-9762 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Oleksandr Nitavskyi >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > The issue on Yarn is that it is impossible to have different LOCAL_DIRS on > JobManager and TaskManager, even though the LOCAL_DIRS value depends on the container. > The issue is that CoreOptions.TMP_DIRS is configured to the default value > during JobManager initialization and added to the configuration object. When > a TaskManager is launched, that configuration object is cloned together with the > LOCAL_DIRS value, which makes sense only for the JobManager container. As a result, from the point of view of the YARN > container running the TaskManager, CoreOptions.TMP_DIRS is > always equal either to the path in flink.yml or to the LOCAL_DIRS of the > JobManager (default behaviour). Since the TaskManager’s container does not have > access to folders other than those allocated to it by YARN, the TaskManager cannot > be started. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
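The core of FLINK-9762 is that a default computed in the JobManager container ends up in the configuration cloned for TaskManagers. The sketch below resolves the directories per container instead. It assumes that an explicit user setting wins and that YARN exposes the container's own directories as a comma-separated `LOCAL_DIRS` environment variable. The class and method are hypothetical. The important property is that the result is computed inside each container and never written back into the shared configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical per-container resolution of temporary directories. */
final class TmpDirsResolver {

    private TmpDirsResolver() {}

    /**
     * @param userConfiguredTmpDirs value set explicitly by the user, or null if unset
     * @param containerEnv          environment of the container this process runs in
     */
    static List<String> resolve(String userConfiguredTmpDirs, Map<String, String> containerEnv) {
        String source = userConfiguredTmpDirs != null
                ? userConfiguredTmpDirs
                : containerEnv.get("LOCAL_DIRS"); // this container's own YARN directories
        if (source == null || source.trim().isEmpty()) {
            // Nothing configured and not running under YARN: use the JVM temp dir.
            return Arrays.asList(System.getProperty("java.io.tmpdir"));
        }
        List<String> dirs = new ArrayList<>();
        for (String dir : source.split(",")) {
            if (!dir.trim().isEmpty()) {
                dirs.add(dir.trim());
            }
        }
        return dirs;
    }
}
```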
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202507057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { -case Some(_: StreamTableSourceTable[_]) => true -case Some(_: BatchTableSourceTable[_]) => false -case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- This information is useful. ---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506906 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +Preconditions.checkNotNull(class
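All `find` overloads in the quoted hunk delegate to `findInternal`, which takes an optional classloader. In the hunk as quoted, the overload that accepts a descriptor and a `ClassLoader` still calls `findInternal(factoryClass, descriptorProperties.asMap, None)`, so its classloader argument goes unused. The Java sketch below shows the same overload pattern with `Optional` and forwards the caller's classloader explicitly. `FactoryLookup` is a hypothetical name, and property matching is deliberately omitted.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;

/** Illustrative overload pattern: every public variant funnels into one internal lookup. */
final class FactoryLookup {

    private FactoryLookup() {}

    static <T> T find(Class<T> factoryClass, Map<String, String> properties) {
        Objects.requireNonNull(factoryClass);
        Objects.requireNonNull(properties);
        return findInternal(factoryClass, properties, Optional.empty());
    }

    static <T> T find(Class<T> factoryClass, Map<String, String> properties, ClassLoader classLoader) {
        Objects.requireNonNull(factoryClass);
        Objects.requireNonNull(properties);
        Objects.requireNonNull(classLoader);
        // Forward the caller's classloader instead of dropping it.
        return findInternal(factoryClass, properties, Optional.of(classLoader));
    }

    private static <T> T findInternal(
            Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) {
        // Fall back to the thread context classloader, which ServiceLoader.load(Class) also uses.
        ClassLoader effective = classLoader.orElseGet(() -> Thread.currentThread().getContextClassLoader());
        Iterator<T> candidates = ServiceLoader.load(factoryClass, effective).iterator();
        // A real implementation would filter candidates against 'properties';
        // this sketch simply returns the first discovered provider.
        if (candidates.hasNext()) {
            return candidates.next();
        }
        throw new IllegalStateException("No factory found for " + factoryClass.getName());
    }
}
```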
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544038#comment-16544038 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506906 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factory
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544037#comment-16544037 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { --- End diff -- using the singular for the class name looks better to me > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has not only supported the JDK's {{SSLEngine}} but > also implemented one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544033#comment-16544033 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration --- End diff -- the descriptions of the param and throws do not need a line feed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has not only supported the JDK's {{SSLEngine}} but > also implemented one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544031#comment-16544031 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505334 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration +* @return The SSLContext object which can be used by the ssl transport server +* Returns null if SSL is disabled +* @throws Exception +* Thrown if there is any misconfiguration +*/ + @Nullable + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + --- End diff -- this empty line is useless and can be removed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has not only supported the JDK's {{SSLEngine}} but > also implemented one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544030#comment-16544030 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505321 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java --- @@ -160,6 +160,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); +// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL"); --- End diff -- if this is dead code, it can be removed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has not only supported the JDK's {{SSLEngine}} but > also implemented one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544036#comment-16544036 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { --- End diff -- providing a constructor like `SSLProvider(String provider)` that gives the enum its string representation looks better than hard-coding it. > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has not only supported the JDK's {{SSLEngine}} but > also implemented one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
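The reviewer's suggestion can be sketched as an enum that receives its configuration string through its constructor. `fromString` then loops over `values()` instead of hard-coding each name. The enum name `SslProviderSketch` and the `configValue` field are hypothetical. The case-insensitive matching mirrors the quoted `fromString`.

```java
import java.util.Objects;

/** Sketch of an SSL provider enum whose string form is supplied through its constructor. */
enum SslProviderSketch {
    JDK("JDK"),
    /** OpenSSL with fallback to JDK if not available (per the quoted Javadoc). */
    OPENSSL("OPENSSL");

    private final String configValue;

    SslProviderSketch(String configValue) {
        this.configValue = configValue;
    }

    String configValue() {
        return configValue;
    }

    /** Case-insensitive lookup, mirroring the quoted fromString behaviour. */
    static SslProviderSketch fromString(String value) {
        Objects.requireNonNull(value);
        for (SslProviderSketch provider : values()) {
            if (provider.configValue.equalsIgnoreCase(value)) {
                return provider;
            }
        }
        throw new IllegalArgumentException("Unknown SSL provider: " + value);
    }
}
```

With this shape, adding a provider only requires a new constant. The lookup code stays the same.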
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544034#comment-16544034 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. 
* * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. * @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
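The try-with-resources change in this diff can be sketched on its own as follows. The class and method names are hypothetical; the body reuses the `KeyStore.getDefaultType()` and `TrustManagerFactory.getDefaultAlgorithm()` calls visible in the diff. The stream is closed on both normal and exceptional exit, which is what the removed `finally` block did by hand.

```java
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper class isolating the trust-store part of the diff.
class TrustStoreLoading {

    static TrustManagerFactory loadTrustManagerFactory(String trustStoreFilePath, char[] trustStorePassword)
            throws Exception {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        // The stream is closed automatically, whether load() succeeds or throws.
        try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) {
            trustStore.load(trustStoreFile, trustStorePassword);
        }
        TrustManagerFactory trustManagerFactory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(trustStore);
        return trustManagerFactory;
    }
}
```

A side benefit over the manual `finally`: if both `load()` and `close()` throw, try-with-resources keeps the original exception and attaches the one from `close()` as suppressed, instead of letting it replace the original.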
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544032#comment-16544032 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. 
* * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. * @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
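The `OPENSSL` javadoc in this diff says "OpenSSL with fallback to JDK if not available". A minimal sketch of that resolution step follows, assuming the availability check is passed in as a `BooleanSupplier`. With Netty on the classpath it would typically be `io.netty.handler.ssl.OpenSsl::isAvailable`; the class and method names here are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of resolving the preferred provider to the one actually used.
class ProviderFallback {

    enum SSLProvider { JDK, OPENSSL }

    /** OPENSSL is honored only when the native library is available; otherwise JDK is used. */
    static SSLProvider resolve(SSLProvider preferred, BooleanSupplier openSslAvailable) {
        if (preferred == SSLProvider.OPENSSL && openSslAvailable.getAsBoolean()) {
            return SSLProvider.OPENSSL;
        }
        return SSLProvider.JDK;
    }

    public static void main(String[] args) {
        // Simulate a machine without the native OpenSSL library.
        System.out.println(resolve(SSLProvider.OPENSSL, () -> false)); // prints JDK
    }
}
```

Injecting the check keeps the sketch free of a Netty dependency and makes both branches easy to test.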
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544035#comment-16544035 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; --- End diff -- Marking these fields as `private` and providing getters/setters looks better to me.
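A version of the holder with `private` fields might look like the sketch below. Because the fields are `final`, only getters make sense here; setters would require dropping `final` and making the holder mutable. The `Objects.requireNonNull` checks and the enclosing class name are assumptions added for the sketch, not part of the pull request.

```java
import java.util.Objects;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical holder class for the sketch.
class SslClientToolsSketch {

    enum SSLProvider { JDK, OPENSSL }

    /** Instances needed to set up an SSL client connection, exposed only through getters. */
    static final class SSLClientTools {
        private final SSLProvider preferredSslProvider;
        private final String sslProtocolVersion;
        private final TrustManagerFactory trustManagerFactory;

        SSLClientTools(
                SSLProvider preferredSslProvider,
                String sslProtocolVersion,
                TrustManagerFactory trustManagerFactory) {
            // Null checks are an assumption of this sketch.
            this.preferredSslProvider = Objects.requireNonNull(preferredSslProvider);
            this.sslProtocolVersion = Objects.requireNonNull(sslProtocolVersion);
            this.trustManagerFactory = Objects.requireNonNull(trustManagerFactory);
        }

        SSLProvider getPreferredSslProvider() {
            return preferredSslProvider;
        }

        String getSslProtocolVersion() {
            return sslProtocolVersion;
        }

        TrustManagerFactory getTrustManagerFactory() {
            return trustManagerFactory;
        }
    }
}
```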
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - clientSSLContext = SSLContext.getInstance(sslProtocolVersion); - clientSSLContext
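This diff moves `SSLContext.getInstance(sslProtocolVersion)` out of the helper, so a caller now has to build the context from the returned protocol version and `TrustManagerFactory`. The quoted `clientSSLContext` line is cut off, so the `init(null, trustManagerFactory.getTrustManagers(), null)` arguments below are an assumption. The class name, the `defaultTrustManagerFactory` helper, and the `"TLSv1.2"` value are illustrative.

```java
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical sketch: turning the protocol version and trust managers into a client SSLContext.
class ClientContextFromTools {

    static SSLContext createClientContext(String sslProtocolVersion, TrustManagerFactory trustManagerFactory)
            throws Exception {
        SSLContext context = SSLContext.getInstance(sslProtocolVersion);
        // No key managers on the client side; a null SecureRandom selects the default one.
        context.init(null, trustManagerFactory.getTrustManagers(), null);
        return context;
    }

    /** Demo-only helper: a factory backed by the JVM's default trust store. */
    static TrustManagerFactory defaultTrustManagerFactory() throws Exception {
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init((KeyStore) null); // null selects the default trust store
        return tmf;
    }

    public static void main(String[] args) throws Exception {
        SSLContext context = createClientContext("TLSv1.2", defaultTrustManagerFactory());
        System.out.println(context.getProtocol()); // prints TLSv1.2
    }
}
```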
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505334 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration +* @return The SSLContext object which can be used by the ssl transport server +* Returns null if SSL is disabled +* @throws Exception +* Thrown if there is any misconfiguration +*/ + @Nullable + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + --- End diff -- This empty line is unnecessary and can be removed. ---
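In this diff, `createSSLServerContext` keeps its `@Nullable` contract while the key-manager setup moves into a tools object. A minimal sketch of that layering follows. `ServerTools` is a hypothetical, reduced stand-in for `SSLServerTools` (the provider and cipher suites are omitted), and `emptyKeyManagerFactory` exists only so the example runs without a real key store.

```java
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

// Hypothetical names throughout; ServerTools stands in for the SSLServerTools holder in the diff.
class ServerContextFromTools {

    static final class ServerTools {
        final String sslProtocolVersion;
        final KeyManagerFactory keyManagerFactory;

        ServerTools(String sslProtocolVersion, KeyManagerFactory keyManagerFactory) {
            this.sslProtocolVersion = sslProtocolVersion;
            this.keyManagerFactory = keyManagerFactory;
        }
    }

    /** Builds the server context from the tools; null tools (SSL disabled) yield null. */
    static SSLContext createServerContext(ServerTools tools) throws Exception {
        if (tools == null) {
            return null;
        }
        SSLContext context = SSLContext.getInstance(tools.sslProtocolVersion);
        // Key managers come from the server key store; null trust managers and a null
        // SecureRandom fall back to the installed provider defaults.
        context.init(tools.keyManagerFactory.getKeyManagers(), null, null);
        return context;
    }

    /** Demo-only helper: a KeyManagerFactory over an empty, in-memory key store. */
    static KeyManagerFactory emptyKeyManagerFactory() throws Exception {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null); // initializes an empty key store
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, "password".toCharArray());
        return kmf;
    }
}
```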
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { --- End diff -- A singular class name looks better to me. ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - clientSSLContext = SSLContext.getInstance(sslProtocolVersion); - clientSSLContext
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration --- End diff -- The descriptions of `@param` and `@throws` do not need a line break. ---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +Preconditions.checkNotNull(class
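The switch from `find(clz: Class[_], ...): TableFactory` to `find[T](factoryClass: Class[T], ...): T` lets callers receive the requested type directly. A Java sketch of the same idea follows: filter candidates with `Class.isInstance` and return them through `Class.cast`, so no unchecked cast is needed. The factory interfaces and classes are hypothetical stand-ins, and candidates are passed as a list instead of being discovered.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical sketch of a type-safe factory lookup.
class TypedFactoryLookup {

    // Hypothetical stand-ins for a factory hierarchy.
    interface TableFactory {}
    interface StreamTableSourceFactory extends TableFactory {}
    static final class CsvSourceFactory implements StreamTableSourceFactory {}
    static final class CsvSinkFactory implements TableFactory {}

    /** Returns the single candidate of the requested type, typed as T. */
    static <T> T find(Class<T> factoryClass, List<? extends TableFactory> candidates) {
        Objects.requireNonNull(factoryClass, "factoryClass");
        Objects.requireNonNull(candidates, "candidates");
        List<T> matches = candidates.stream()
                .filter(factoryClass::isInstance)
                .map(factoryClass::cast)
                .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory of type " + factoryClass.getName() + " but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

Asking for the broader `TableFactory` type in this example matches both candidates and is rejected as ambiguous, which mirrors why the real lookup also needs properties to narrow the result.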
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544028#comment-16544028 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factory
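As quoted, the `find[T](factoryClass, descriptor, classLoader)` overload ends with `findInternal(factoryClass, descriptorProperties.asMap, None)`, so it seems worth confirming that the supplied loader is really forwarded. The Java sketch below shows one way to choose the discovery loader from an optional classloader. `FactoryLoading` and its nested `TableFactory` interface are hypothetical stand-ins; no provider is registered for them, so discovery returns an empty list here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical sketch of classloader-aware service discovery.
class FactoryLoading {

    /** Hypothetical service interface; no META-INF/services entry exists for it. */
    interface TableFactory {}

    /**
     * Uses the caller-supplied classloader when present; otherwise ServiceLoader.load(Class)
     * falls back to the thread context classloader.
     */
    static List<TableFactory> discover(Optional<ClassLoader> classLoader) {
        ServiceLoader<TableFactory> loader = classLoader
                .map(cl -> ServiceLoader.load(TableFactory.class, cl))
                .orElseGet(() -> ServiceLoader.load(TableFactory.class));
        List<TableFactory> factories = new ArrayList<>();
        for (TableFactory factory : loader) {
            factories.add(factory);
        }
        return factories;
    }
}
```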
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544024#comment-16544024 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factory
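The `find[T]` overloads in this diff all funnel into one internal lookup. That lookup loads every `TableFactory` through Java's `ServiceLoader` and narrows the result to the requested factory class. Below is a minimal Java sketch of that shape under stated assumptions; it is not Flink's implementation. `DiscoverableFactory` and `FactoryDiscoverySketch` are hypothetical names. The `Optional<ClassLoader>` parameter mirrors the Scala `Option` argument passed to `findInternal`. Rejecting both zero and multiple matches is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical marker interface standing in for a discoverable factory such as TableFactory. */
interface DiscoverableFactory {}

final class FactoryDiscoverySketch {

    private FactoryDiscoverySketch() {}

    /** Loads every factory registered under META-INF/services, preferring the given class loader. */
    static Iterable<DiscoverableFactory> discover(Optional<ClassLoader> classLoader) {
        return classLoader
            .map(cl -> ServiceLoader.load(DiscoverableFactory.class, cl))
            // Without an explicit loader, ServiceLoader uses the thread context class loader.
            .orElseGet(() -> ServiceLoader.load(DiscoverableFactory.class));
    }

    /** Entry point: discovers all candidates, then narrows them to the requested type. */
    static <T> T find(Class<T> factoryClass, Optional<ClassLoader> classLoader) {
        return select(factoryClass, discover(classLoader));
    }

    /** Returns the single candidate that is an instance of factoryClass, already typed as T. */
    static <T> T select(Class<T> factoryClass, Iterable<? extends DiscoverableFactory> candidates) {
        List<T> matches = new ArrayList<>();
        for (DiscoverableFactory candidate : candidates) {
            if (factoryClass.isInstance(candidate)) {
                matches.add(factoryClass.cast(candidate)); // checked cast, no unchecked warning
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory implements " + factoryClass.getName());
        }
        if (matches.size() > 1) {
            throw new IllegalStateException(
                matches.size() + " factories implement " + factoryClass.getName());
        }
        return matches.get(0);
    }
}
```

`select` is separate from `discover` so the matching logic can be exercised without registration files. The class-loader overloads would pass `Optional.of(classLoader)` so that discovery actually uses the caller's loader. The convenience overloads would pass `Optional.empty()`.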
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +Preconditions.checkNotNull(class
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544023#comment-16544023 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506740 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) --- End diff -- The variable is only passed once, and the internal method does not check for null again. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
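The reply's argument is about where validation lives. The arguments are null-checked once, at the public entry point, and the private internal method relies on that instead of checking again. Below is a minimal Java sketch of that placement. `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`. `ValidateOnceSketch`, its methods, and the returned description string are hypothetical.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical lookup facade showing where argument validation lives. */
final class ValidateOnceSketch {

    private ValidateOnceSketch() {}

    /** Public entry point: the only place that null-checks its arguments. */
    static String find(Class<?> factoryClass, Map<String, String> properties) {
        Objects.requireNonNull(factoryClass, "factoryClass must not be null");
        Objects.requireNonNull(properties, "properties must not be null");
        return findInternal(factoryClass, properties);
    }

    /**
     * Internal helper: private and only reached through validated entry points,
     * so it trusts its arguments instead of checking them a second time.
     */
    private static String findInternal(Class<?> factoryClass, Map<String, String> properties) {
        // Placeholder for the real matching logic.
        return factoryClass.getSimpleName() + " matched against " + properties.size() + " properties";
    }
}
```

Keeping `findInternal` private is what makes skipping the second check safe: no caller can reach it without passing through a validated overload.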
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544019#comment-16544019 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506661 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException( } /** - * Exception for not finding a [[TableFormatFactory]] for the - * given properties. + * Exception for not finding a [[TableFactory]] for the given properties. * * @param message message that indicates the current matching step * @param factoryClass required factory class - * @param formatFactories all found factories - * @param properties properties that describe the table format + * @param factories all found factories + * @param properties properties that describe the configuration * @param cause the cause */ -case class NoMatchingTableFormatException( +case class NoMatchingTableFactoryException( message: String, factoryClass: Class[_], - formatFactories: Seq[TableFormatFactory[_]], + factories: Seq[TableFactory], properties: Map[String, String], cause: Throwable) extends RuntimeException( --- End diff -- So far we don't have an exception hierarchy. Scala does not allow a case class to inherit from another case class, so we would need to convert them to regular classes.
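Scala forbids a case class from extending another case class. That is why the case-class exceptions here cannot share a hierarchy as they stand. If they were converted to regular classes, the result might take the shape of the Java sketch below. Every class name in it is hypothetical and not part of Flink. The only point is that callers could then handle all lookup failures through one base type.

```java
/** Hypothetical common base type for factory lookup failures. */
class FactoryLookupException extends RuntimeException {
    FactoryLookupException(String message) {
        super(message);
    }
}

/** Hypothetical: no discovered factory matched the requested properties. */
class NoMatchingFactoryException extends FactoryLookupException {
    NoMatchingFactoryException(String factoryClass) {
        super("Could not find a suitable factory for " + factoryClass);
    }
}

/** Hypothetical: more than one discovered factory matched. */
class AmbiguousFactoryException extends FactoryLookupException {
    AmbiguousFactoryException(String factoryClass, int matches) {
        super(matches + " factories match " + factoryClass);
    }
}

final class LookupFailures {
    private LookupFailures() {}

    /** Simulates a lookup outcome; every failure can be caught as FactoryLookupException. */
    static void check(String factoryClass, int matches) {
        if (matches == 0) {
            throw new NoMatchingFactoryException(factoryClass);
        }
        if (matches > 1) {
            throw new AmbiguousFactoryException(factoryClass, matches);
        }
    }
}
```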
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544018#comment-16544018 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala --- @@ -16,42 +16,17 @@ * limitations under the License. */ -package org.apache.flink.table.formats +package org.apache.flink.table.factories import java.util -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} - /** - * A factory to create different table format instances. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized - * properties that describe the desired format. The factory allows for matching to the given set of - * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for - * creating configured instances of format classes accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in - * the current classpath to be found. + * A factory to create configured table format instances based on string-based properties. See + * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes */ -trait TableFormatFactory[T] { - - /** -* Specifies the context that this factory has been implemented for. The framework guarantees -* to only use the factory if the specified set of properties and values are met. 
-* -* Typical properties might be: -* - format.type -* - format.version -* -* Specified property versions allow the framework to provide backwards compatible properties -* in case of string format changes: -* - format.property-version -* -* An empty context means that the factory matches for all requests. -*/ - def requiredContext(): util.Map[String, String] +trait TableFormatFactory[T] extends TableFactory { --- End diff -- Because the Java comment explains specifically how supported format properties are handled.
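The doc comment removed from `TableFormatFactory` describes a matching contract. That contract is presumably now inherited from the shared `TableFactory` trait. A factory declares a required context, for example `format.type` and `format.property-version`. The factory is only used when every one of those key/value pairs is present in the requested properties, and an empty context matches all requests. Below is a minimal Java sketch of that contract. `ContextFactory`, `ContextMatcher`, and the `context(...)` helper are hypothetical names. The real service also checks supported properties, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a properties-based factory. */
interface ContextFactory {
    /** Key/value pairs that must all be present, with equal values, for this factory to apply. */
    Map<String, String> requiredContext();
}

final class ContextMatcher {
    private ContextMatcher() {}

    /** Returns the factories whose required context is fully contained in the given properties. */
    static List<ContextFactory> matching(List<ContextFactory> factories, Map<String, String> properties) {
        List<ContextFactory> result = new ArrayList<>();
        for (ContextFactory factory : factories) {
            boolean matches = true;
            for (Map.Entry<String, String> required : factory.requiredContext().entrySet()) {
                // An empty context never enters this loop, so it matches every request.
                if (!required.getValue().equals(properties.get(required.getKey()))) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                result.add(factory);
            }
        }
        return result;
    }

    /** Convenience: builds an immutable map from alternating key/value arguments. */
    static Map<String, String> context(String... keyValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return Collections.unmodifiableMap(map);
    }
}
```

Extra request properties beyond the required context do not prevent a match. In this sketch, deciding whether those extras are acceptable is left to a separate supported-properties check.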
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544017#comment-16544017 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506574 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging { val properties = new DescriptorProperties() externalCatalogTable.addProperties(properties) val javaMap = properties.asMap -val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap) - .asInstanceOf[TableSourceFactory[_]] - .createTableSource(javaMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => -source match { - case bts: BatchTableSource[_] => -new TableSourceSinkTable(Some(new BatchTableSourceTable( - bts, - new FlinkStatistic(externalCatalogTable.getTableStats))), None) - case _ => throw new TableException( -s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + - s"in a batch environment.") -} +val source = TableFactoryService --- End diff -- It is very uncommon to define both a batch and a streaming source in the same factory. Your proposed change would require all future sources to check the environment first, which is unnecessary in 80% of the cases. Separating by environment is a concept found throughout the entire `flink-table` module, because batch and streaming sources/sinks behave quite differently.
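The reply argues for choosing the factory type by environment rather than having every factory inspect the environment itself. `ExternalTableSourceUtil` follows that pattern when it matches on `BatchTableEnvironment` before looking up a source. Below is a minimal Java sketch of the separation. `BatchSourceFactory`, `StreamSourceFactory`, `Environment`, and `EnvironmentAwareLookup` are hypothetical names. Sources are plain strings to keep the example self-contained.

```java
import java.util.List;

/** Hypothetical factory for sources that only make sense in a batch environment. */
interface BatchSourceFactory {
    String createBatchSource();
}

/** Hypothetical factory for sources that only make sense in a streaming environment. */
interface StreamSourceFactory {
    String createStreamSource();
}

enum Environment { BATCH, STREAM }

final class EnvironmentAwareLookup {
    private EnvironmentAwareLookup() {}

    /**
     * Chooses the factory interface from the environment up front, so a factory never has to
     * inspect the environment itself: it simply implements the interface(s) it supports.
     * The list stands in for factories discovered via service loading.
     */
    static String createSource(Environment env, List<Object> factories) {
        for (Object factory : factories) {
            if (env == Environment.BATCH && factory instanceof BatchSourceFactory) {
                return ((BatchSourceFactory) factory).createBatchSource();
            }
            if (env == Environment.STREAM && factory instanceof StreamSourceFactory) {
                return ((StreamSourceFactory) factory).createStreamSource();
            }
        }
        throw new IllegalStateException("No source factory available for " + env);
    }
}
```

A streaming-only factory implements just `StreamSourceFactory`. It is never considered in a batch environment and needs no environment check of its own.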
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544016#comment-16544016 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506512 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java --- @@ -21,8 +21,12 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; /** - * Tests for {@link Kafka08JsonTableSourceFactory}. + * Tests for legacy Kafka08JsonTableSourceFactory. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sources. */ +@Deprecated --- End diff -- No, but we forgot it in the previous commit.
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544015#comment-16544015 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506494 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java --- @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); --- End diff -- The table type is a concept of the SQL Client and should not be part of the table descriptor.
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544007#comment-16544007 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.connectors +package org.apache.flink.table.factories import java.util /** * Common trait for all properties-based discoverable table factories. */ -trait DiscoverableTableFactory { +trait TableFactory { --- End diff -- Ideally, I would use the very generic name `Factory`, but since we need a prefix to make it unique within the project, I named it `TableFactory`, because we prefix everything in this module with `Table`.
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506193

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
@@ -16,14 +16,14 @@
  * limitations under the License.
  */

-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories

 import java.util

 /**
   * Common trait for all properties-based discoverable table factories.
   */
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --

Actually, I would like the very generic name `Factory` but since we have to add some prefix to make it unique in the project, I named it `TableFactory` because we prefix everything in this module with `Table`.

---
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544004#comment-16544004 ]

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506126

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource
-        implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+        StreamTableSource,
+        DefinedProctimeAttribute,
+        DefinedRowtimeAttributes,
+        DefinedFieldMapping {
+
+    // common table source attributes
+    // TODO make all attributes final once we drop support for format-specific table sources

     /** The schema of the table. */
     private final TableSchema schema;

+    /** Field name of the processing time attribute, null if no processing time field is defined. */
+    private String proctimeAttribute;
+
+    /** Descriptor for a rowtime attribute. */
+    private List rowtimeAttributeDescriptors;
+
+    /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+    private Map fieldMapping;
+
+    // Kafka-specific attributes
+
     /** The Kafka topic to consume. */
     private final String topic;

     /** Properties for the Kafka consumer. */
     private final Properties properties;

-    /** Type information describing the result type. */
-    private TypeInformation returnType;
-
-    /** Field name of the processing time attribute, null if no processing time field is defined. */
-    private String proctimeAttribute;
-
-    /** Descriptor for a rowtime attribute. */
-    private List rowtimeAttributeDescriptors;
+    /** Deserialization schema for decoding records from Kafka. */
+    private final DeserializationSchema deserializationSchema;

     /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+    private StartupMode startupMode;

     /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     private Map specificStartupOffsets;

     /**
      * Creates a generic Kafka {@link StreamTableSource}.
      *
-     * @param topic      Kafka topic to consume.
-     * @param properties Properties for the Kafka consumer.
-     * @param schema     Schema of the produced table.
-     * @param returnType Type information of the produced physical DataStream.
+     * @param schema                      Schema of the produced table.
+     * @param proctimeAttribute           Field name of the processing time attribute, null if no
+     *                                    processing time field is defined.
+     * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+     * @param fieldMapping                Mapping for the fields of the table schema to
--- End diff --

Backward compatibility. It could have been null in the past.

> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to target record type.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
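The new `fieldMapping` attribute is documented as mapping table-schema fields to fields of the physical return type "or null", and twalthr's reply cites backward compatibility for a value that "could have been null in the past". A minimal sketch of how calling code might tolerate a null mapping, assuming that null means "match fields by name"; `FieldMappingSketch` and `physicalFieldFor` are hypothetical names, not Flink API.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink's KafkaTableSource.
final class FieldMappingSketch {

    private FieldMappingSketch() {}

    // Resolves the physical field that backs a logical table-schema field.
    // A null mapping is treated as the legacy case (assumption): fields are matched by name.
    static String physicalFieldFor(String schemaField, Map<String, String> fieldMapping) {
        if (fieldMapping == null) {
            return schemaField;
        }
        String physical = fieldMapping.get(schemaField);
        if (physical == null) {
            throw new IllegalArgumentException(
                "No physical field mapped for table field '" + schemaField + "'");
        }
        return physical;
    }
}
```

Handling null at the single point where the mapping is read keeps older sources working without forcing every caller to construct an identity mapping.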
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506126

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -54,51 +56,105 @@
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource
-        implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+        StreamTableSource,
+        DefinedProctimeAttribute,
+        DefinedRowtimeAttributes,
+        DefinedFieldMapping {
+
+    // common table source attributes
+    // TODO make all attributes final once we drop support for format-specific table sources

     /** The schema of the table. */
     private final TableSchema schema;

+    /** Field name of the processing time attribute, null if no processing time field is defined. */
+    private String proctimeAttribute;
+
+    /** Descriptor for a rowtime attribute. */
+    private List rowtimeAttributeDescriptors;
+
+    /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+    private Map fieldMapping;
+
+    // Kafka-specific attributes
+
     /** The Kafka topic to consume. */
     private final String topic;

     /** Properties for the Kafka consumer. */
     private final Properties properties;

-    /** Type information describing the result type. */
-    private TypeInformation returnType;
-
-    /** Field name of the processing time attribute, null if no processing time field is defined. */
-    private String proctimeAttribute;
-
-    /** Descriptor for a rowtime attribute. */
-    private List rowtimeAttributeDescriptors;
+    /** Deserialization schema for decoding records from Kafka. */
+    private final DeserializationSchema deserializationSchema;

     /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+    private StartupMode startupMode;

     /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     private Map specificStartupOffsets;

     /**
      * Creates a generic Kafka {@link StreamTableSource}.
      *
-     * @param topic      Kafka topic to consume.
-     * @param properties Properties for the Kafka consumer.
-     * @param schema     Schema of the produced table.
-     * @param returnType Type information of the produced physical DataStream.
+     * @param schema                      Schema of the produced table.
+     * @param proctimeAttribute           Field name of the processing time attribute, null if no
+     *                                    processing time field is defined.
+     * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+     * @param fieldMapping                Mapping for the fields of the table schema to
--- End diff --

Backward compatibility. It could have been null in the past.

---
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543986#comment-16543986 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982

@StephanEwen Thanks! Looking forward~

> generate the _meta file for checkpoint only when the writing is truly
> successful
>
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Labels: pull-request-available
>
> We should generate the _meta file for checkpoint only when the writing is
> totally successful. We should write the metadata file first to a temp file
> and then atomically rename it (with an equivalent workaround for S3).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
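The issue proposes writing the checkpoint metadata to a temporary file first and then renaming it atomically, so that a metadata file only ever appears once it is complete. A minimal sketch of that pattern with `java.nio.file`, assuming placeholder file names (`_metadata`, `_metadata.inprogress`) and a hypothetical `AtomicMetadataWriter` class; it is not Flink's checkpoint code and does not cover object stores such as S3, which lack atomic rename and therefore need the separate workaround the issue mentions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of "write to temp file, then rename"; not Flink's checkpoint code.
final class AtomicMetadataWriter {

    private AtomicMetadataWriter() {}

    static void writeMetadata(Path checkpointDir, byte[] metadata) throws IOException {
        Path target = checkpointDir.resolve("_metadata");
        // Temporary file in the same directory, so the rename stays within one file system.
        Path temp = checkpointDir.resolve("_metadata.inprogress");
        try {
            Files.write(temp, metadata);
            // ATOMIC_MOVE: readers observe either no metadata file or the complete one.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; removes a partial temp file after a failure.
            Files.deleteIfExists(temp);
        }
    }
}
```

With `ATOMIC_MOVE`, `Files.move` throws `AtomicMoveNotSupportedException` rather than silently falling back to copy-and-delete, so a file system without atomic rename surfaces as an error instead of a half-written metadata file.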
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982

@StephanEwen Thanks! Looking forward~

---
[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector
[ https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangminglei reassigned FLINK-9849:
---

Assignee: zhangminglei

> Upgrade hbase version to 2.0.1 for hbase connector
> --
>
> Key: FLINK-9849
> URL: https://issues.apache.org/jira/browse/FLINK-9849
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
> Assignee: zhangminglei
> Priority: Major
>
> Currently hbase 1.4.3 is used for hbase connector.
> We should upgrade to 2.0.1 which was recently released.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)