[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536531#comment-16536531 ] ASF GitHub Bot commented on FLINK-9407: --- Github user wgtmac commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200878724
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -54,6 +54,14 @@ under the License. true + + org.apache.flink + flink-connector-filesystem_${scala.binary.version} + ${project.version} + + true + + org.apache.orc orc-core
--- End diff --
The current ORC version is 1.5.x. Should we upgrade it as well?
> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
> Issue Type: New Feature
> Components: filesystem-connector
> Reporter: zhangminglei
> Assignee: zhangminglei
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the
> rolling sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL; we can read back
> the records that were written. I will add more tests in the next couple of
> days, including performance under compression with short checkpoint
> intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
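For context, the writer under review plugs into the existing rolling/bucketing sink from flink-connector-filesystem, which is why the pom.xml above adds that dependency. The following is a minimal usage sketch based only on the OrcFileWriter constructors visible in the quoted diff; the schema string, HDFS path, batch size, and the generic parameter are illustrative assumptions, not taken from the PR:

{code:java}
import org.apache.flink.orc.OrcFileWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

import org.apache.orc.CompressionKind;

public class OrcRollingSinkSketch {

    public static void attachOrcSink(DataStream<Row> rows) {
        // ORC schema in TypeDescription string form, matching the columns
        // shown in the Spark verification above (name, age, married).
        String metaSchema = "struct<name:string,age:int,married:boolean>";

        BucketingSink<Row> sink = new BucketingSink<>("hdfs://namenode:9000/data/hive/man");
        // The one-arg constructor writes uncompressed files; the two-arg
        // variant additionally takes a CompressionKind, e.g. ZLIB.
        sink.setWriter(new OrcFileWriter<>(metaSchema, CompressionKind.ZLIB));
        sink.setBatchSize(128 * 1024 * 1024); // roll part files at ~128 MB

        rows.addSink(sink);
    }
}
{code}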
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536530#comment-16536530 ] ASF GitHub Bot commented on FLINK-9407: --- Github user wgtmac commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200879775 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9407: -- Labels: pull-request-available (was: )
> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
> Issue Type: New Feature
> Components: filesystem-connector
> Reporter: zhangminglei
> Assignee: zhangminglei
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the
> rolling sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL; we can read back
> the records that were written. I will add more tests in the next couple of
> days, including performance under compression with short checkpoint
> intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536529#comment-16536529 ] ASF GitHub Bot commented on FLINK-9407: --- Github user wgtmac commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200879624 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536532#comment-16536532 ] ASF GitHub Bot commented on FLINK-9407: --- Github user wgtmac commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200879243 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
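The open() method quoted above parses the schema string into a TypeDescription, creates an org.apache.orc.Writer with the chosen CompressionKind, and allocates a reusable VectorizedRowBatch; rows are then filled into the batch's column vectors and flushed to the writer. Below is a self-contained sketch of that vectorized ORC write path, using the same ORC/Hive classes the file imports (the output path, schema, and values are made up and unrelated to the PR):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.nio.charset.StandardCharsets;

public class OrcVectorizedWriteSketch {

    public static void main(String[] args) throws Exception {
        // The same kind of "meta schema" string the OrcFileWriter constructor takes.
        TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");

        Writer writer = OrcFile.createWriter(
                new Path("/tmp/example.orc"),
                OrcFile.writerOptions(new Configuration())
                        .setSchema(schema)
                        .compress(CompressionKind.ZLIB));

        VectorizedRowBatch batch = schema.createRowBatch();
        BytesColumnVector name = (BytesColumnVector) batch.cols[0];
        LongColumnVector age = (LongColumnVector) batch.cols[1];

        for (int i = 0; i < 10; i++) {
            int row = batch.size++;
            name.setVal(row, ("user-" + i).getBytes(StandardCharsets.UTF_8));
            age.vector[row] = 20 + i;
            // Flush a full batch to the file and reuse it for the next rows.
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size > 0) {
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}
{code}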
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536525#comment-16536525 ] Zhenqiu Huang commented on FLINK-7243: -- [~nssalian] We already have a working version in production. I am going to create a PR next week for people to review. Do you have a specific requirement or functionality in mind for it?
> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Reporter: godfrey he
> Assignee: Zhenqiu Huang
> Priority: Major
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536522#comment-16536522 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 closed the pull request at: https://github.com/apache/flink/pull/6201 > Create unified interfaces to configure and instatiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536523#comment-16536523 ] ASF GitHub Bot commented on FLINK-8866: --- GitHub user suez1224 reopened a pull request: https://github.com/apache/flink/pull/6201 [FLINK-8866][Table API & SQL] Add support for unified table sink instantiation **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Add interfaces to support unified table sink configuration and instantiation. Consolidate table source and table sink configuration and instantiation. ## Brief change log - Consolidate table sink and table source instantiation with TableConnectorFactory{Service}. - Add support to register a Calcite table with both tableSource and tableSink. - Add Insert command support in SQL client. - Add CsvTableSinkFactory. ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests for testing registering table source and sink tables under the same name. - *Added integration tests for testing insert into command in sql client. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/suez1224/flink FLINK-8866-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6201.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6201 commit d436041fda022f405fa3a1d497b95d2ff0934ce5 Author: Shuyi Chen Date: 2018-06-19T19:00:34Z - Add unified table sink instantiation. - Consolidate table sink and table source instantiation. - Add support to register a Calcite table with both tableSource and tableSink. - Add Insert command support in SQL client. - Add CsvTableSinkFactory. commit d70d033abec0806f1723b18f7bcbab1f60ec7280 Author: Shuyi Chen Date: 2018-06-28T18:30:21Z comment fixes commit 0649359106ddfa003c44e3d0221f7f52ac507593 Author: Shuyi Chen Date: 2018-07-06T23:31:05Z refactor: 1) add TableFactoryDiscoverable trait 2) add util for handling rowtime/proctime for table schema and unittests > Create unified interfaces to configure and instatiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
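To make change (1) above concrete, here is a rough sketch of what a unified sink factory could look like, modeled on the existing TableSourceFactory pattern. The interface name, method names, and property keys are illustrative assumptions, not the API the PR ultimately adds:

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.flink.table.sinks.TableSink;

/**
 * Hypothetical factory that instantiates a configured TableSink from string
 * properties (e.g. from a SQL client environment file). It mirrors the shape
 * of TableSourceFactory; names and signatures are illustrative only.
 */
public interface TableSinkFactory<T> {

    /** Fixed properties this factory matches on, e.g. connector.type=filesystem. */
    Map<String, String> requiredContext();

    /** All property keys the factory understands, used for validation. */
    List<String> supportedProperties();

    /** Creates and configures the sink from the normalized property map. */
    TableSink<T> createTableSink(Map<String, String> properties);
}
{code}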
[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536483#comment-16536483 ] ASF GitHub Bot commented on FLINK-9666: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6230 @tillrohrmann, @zentol, cc. > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
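The fix itself is mechanical: in boolean contexts, the short-circuit operators && and || stop evaluating as soon as the result is known, while the non-short-circuit & and | always evaluate both operands. A minimal illustration (the variable names are made up):

{code:java}
public class ShortCircuitExample {

    public static void main(String[] args) {
        String value = null;

        // '&' evaluates both sides, so value.isEmpty() would still run and
        // throw a NullPointerException even though the left side is false:
        // boolean bad = value != null & !value.isEmpty();

        // '&&' short-circuits after the first operand, so the null check
        // really guards the dereference.
        boolean good = value != null && !value.isEmpty();
        System.out.println(good); // prints: false
    }
}
{code}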
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536479#comment-16536479 ] Neelesh Srinivas Salian commented on FLINK-7243: What is the status of this [~godfreyhe] and [~ZhenqiuHuang]? Shall I assign and start working on this? > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-6976) Add STR_TO_DATE supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478 ] Neelesh Srinivas Salian edited comment on FLINK-6976 at 7/9/18 1:40 AM: [~sunjincheng121], could I work on this if no one else has? I realize it needs https://issues.apache.org/jira/browse/FLINK-6895 I could work on both as well. was (Author: nssalian): [~sunjincheng121], could I work on this if no one else has? I realize it needs [https://issues.apache.org/jira/browse/FLINK-6895](FLINK-6895) > Add STR_TO_DATE supported in TableAPI > - > > Key: FLINK-6976 > URL: https://issues.apache.org/jira/browse/FLINK-6976 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: starter > > See FLINK-6895 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-6976) Add STR_TO_DATE supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478 ] Neelesh Srinivas Salian edited comment on FLINK-6976 at 7/9/18 1:40 AM: [~sunjincheng121], could I work on this if no one else has? I realize it needs [https://issues.apache.org/jira/browse/FLINK-6895](FLINK-6895) was (Author: nssalian): [~sunjincheng121], could I work on this if no one else has? > Add STR_TO_DATE supported in TableAPI > - > > Key: FLINK-6976 > URL: https://issues.apache.org/jira/browse/FLINK-6976 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: starter > > See FLINK-6895 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6976) Add STR_TO_DATE supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478 ] Neelesh Srinivas Salian commented on FLINK-6976: [~sunjincheng121], could I work on this if no one else has? > Add STR_TO_DATE supported in TableAPI > - > > Key: FLINK-6976 > URL: https://issues.apache.org/jira/browse/FLINK-6976 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: starter > > See FLINK-6895 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536477#comment-16536477 ] vinoyang commented on FLINK-9777: - [~gjy] I fixed issue FLINK-6469, I will take this ticket. > YARN: JM and TM Memory must be specified with Units > > > Key: FLINK-9777 > URL: https://issues.apache.org/jira/browse/FLINK-9777 > Project: Flink > Issue Type: Bug > Components: Documentation, YARN >Affects Versions: 1.6.0 > Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Blocker > Fix For: 1.6.0 > > > FLINK-6469 breaks backwards compatibility because the JobManager and > TaskManager memory must be specified with units (otherwise bytes are > assumed). The command to start a YARN session as documented > ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) > > |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] > would not work because 1024 bytes and 4096 bytes are not enough for the heap > size. The command finishes with the following exception: > {noformat} > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > ... 2 more > Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum > memory requirements with the provided cluster specification. Please increase > the memory of the cluster. > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) > ... 7 more > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) > ... 9 more > {noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
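The regression comes down to how the JobManager/TaskManager memory values are parsed after FLINK-6469: a number without a unit is now read as bytes, so the documented 1024/4096 values end up far below the minimum heap cutoff checked in validateClusterSpecification. A small sketch of the difference, assuming the MemorySize parsing behaves as in the 1.5/1.6 configuration API:

{code:java}
import org.apache.flink.configuration.MemorySize;

public class MemoryUnitSketch {

    public static void main(String[] args) {
        // No unit: interpreted as 1024 bytes, i.e. 0 MiB, which fails the
        // JobManager/TaskManager minimum-memory validation on YARN.
        MemorySize withoutUnit = MemorySize.parse("1024");
        System.out.println(withoutUnit.getMebiBytes()); // 0

        // An explicit unit restores the old meaning: 1024 MiB of heap.
        MemorySize withUnit = MemorySize.parse("1024m");
        System.out.println(withUnit.getMebiBytes()); // 1024
    }
}
{code}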
[jira] [Assigned] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9777: --- Assignee: vinoyang > YARN: JM and TM Memory must be specified with Units > > > Key: FLINK-9777 > URL: https://issues.apache.org/jira/browse/FLINK-9777 > Project: Flink > Issue Type: Bug > Components: Documentation, YARN >Affects Versions: 1.6.0 > Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Blocker > Fix For: 1.6.0 > > > FLINK-6469 breaks backwards compatibility because the JobManager and > TaskManager memory must be specified with units (otherwise bytes are > assumed). The command to start a YARN session as documented > ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) > > |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] > would not work because 1024 bytes and 4096 bytes are not enough for the heap > size. The command finishes with the following exception: > {noformat} > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > ... 2 more > Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum > memory requirements with the provided cluster specification. Please increase > the memory of the cluster. > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) > ... 7 more > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) > ... 9 more > {noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState
[ https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536455#comment-16536455 ] ASF GitHub Bot commented on FLINK-9588: --- Github user Aitozi closed the pull request at: https://github.com/apache/flink/pull/6168 > Reuse the same conditionContext with in a same computationState > --- > > Key: FLINK-9588 > URL: https://issues.apache.org/jira/browse/FLINK-9588 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Now cep checkFilterCondition with a newly created Conditioncontext for each > edge, which will result in the repeatable getEventsForPattern because of the > different Conditioncontext Object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9750) Create new StreamingFileSink that works on Flink's FileSystem abstraction
[ https://issues.apache.org/jira/browse/FLINK-9750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536289#comment-16536289 ] ASF GitHub Bot commented on FLINK-9750: --- Github user xndai commented on the issue: https://github.com/apache/flink/pull/6281 Hi @kl0u, where can I find the overall design document for the bucketing sink rework (FLINK-9749)? I am a committer of Apache ORC and am interested in helping with the Flink-ORC integration.
> Create new StreamingFileSink that works on Flink's FileSystem abstraction
> -
>
> Key: FLINK-9750
> URL: https://issues.apache.org/jira/browse/FLINK-9750
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> Using Flink's own file system abstraction means that we can add additional
> streaming/checkpointing related behavior. In addition, the new
> StreamingFileSink should only rely on internal checkpointed state to
> determine which files are possibly in progress or need to roll over, and
> never assume it can enumerate files in the file system.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
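For orientation: the sink added in PR #6281 writes through Flink's own Path/FileSystem abstraction and tracks in-progress part files purely in checkpointed state, as the issue description says. A rough usage sketch, assuming the builder-style API the PR appears to introduce (class and method names may differ from the final version):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkSketch {

    public static void attachSink(DataStream<String> lines) {
        // Row-wise encoding; which part files are in progress or need to be
        // rolled over is tracked in checkpointed state, so recovery never
        // has to enumerate the output directory.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/streaming-file-sink"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        lines.addSink(sink);
    }
}
{code}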
[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-9777: Description: FLINK-6469 breaks backwards compatibility because the JobManager and TaskManager memory must be specified with units (otherwise bytes are assumed). The command to start a YARN session as documented ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] would not work because 1024 bytes and 4096 bytes are not enough for the heap size. The command finishes with the following exception: {noformat} java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ... 2 more Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) ... 7 more Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) ... 9 more {noformat} was: FLINK-6469 breaks backwards compatibility because the JobManager and TaskManager memory should be specified with units (otherwise bytes are assumed). The command to start a YARN session as documented ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] would not work because 1024 bytes and 4096 bytes are not enough for the heap size. 
The command finishes with the following exception: {noformat} java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ... 2 more Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at
[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-9777: Description: FLINK-6469 breaks backwards compatibility because the JobManager and TaskManager memory should be specified with units (otherwise bytes are assumed). The command to start a YARN session as documented ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] would not work because 1024 bytes and 4096 bytes are not enough for the heap size. The command finishes with the following exception: {noformat} java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ... 2 more Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) ... 7 more Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) ... 9 more {noformat} was: With FLINK-6469, the JobManager and TaskManager memory must be specified with units (otherwise bytes are assumed). The command to start a YARN session as documented ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] would not work because 1024 bytes and 4096 bytes are not enough for the heap size. 
The command finishes with the following exception: {noformat} java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ... 2 more Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) ... 7 more
[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
[ https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-9777: Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27 > YARN: JM and TM Memory must be specified with Units > > > Key: FLINK-9777 > URL: https://issues.apache.org/jira/browse/FLINK-9777 > Project: Flink > Issue Type: Bug > Components: Documentation, YARN >Affects Versions: 1.6.0 > Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27 >Reporter: Gary Yao >Priority: Blocker > Fix For: 1.6.0 > > > With FLINK-6469, the JobManager and TaskManager memory must be specified with > units (otherwise bytes are assumed). The command to start a YARN session as > documented > ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) > > |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] > would not work because 1024 bytes and 4096 bytes are not enough for the heap > size. The command finishes with the following exception: > {noformat} > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > ... 2 more > Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum > memory requirements with the provided cluster specification. Please increase > the memory of the cluster. > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) > ... 7 more > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) > ... 9 more > {noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9777) YARN: JM and TM Memory must be specified with Units
Gary Yao created FLINK-9777: --- Summary: YARN: JM and TM Memory must be specified with Units Key: FLINK-9777 URL: https://issues.apache.org/jira/browse/FLINK-9777 Project: Flink Issue Type: Bug Components: Documentation, YARN Affects Versions: 1.6.0 Reporter: Gary Yao Fix For: 1.6.0 With FLINK-6469, the JobManager and TaskManager memory must be specified with units (otherwise bytes are assumed). The command to start a YARN session as documented ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md) |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md] would not work because 1024 bytes and 4096 bytes are not enough for the heap size. The command finishes with the following exception: {noformat} java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ... 2 more Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) ... 7 more Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) ... 9 more {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
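For reference, the failure above comes from running the documented session command verbatim. A minimal sketch of the adjusted invocation, assuming the -jm/-tm flags as shown in yarn_setup.md, would be:
{noformat}
# documented form, now read as 1024 / 4096 *bytes* and rejected by the minimum-memory check:
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
# with explicit megabyte units, which passes validateClusterSpecification:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
{noformat}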
[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState
[ https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536132#comment-16536132 ] ASF GitHub Bot commented on FLINK-9588: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6168 Hi @Aitozi, yes please close this PR, as I made a spelling mistake while merging and that's why it wasn't closed automatically. Thanks! > Reuse the same conditionContext with in a same computationState > --- > > Key: FLINK-9588 > URL: https://issues.apache.org/jira/browse/FLINK-9588 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Now cep checkFilterCondition with a newly created Conditioncontext for each > edge, which will result in the repeatable getEventsForPattern because of the > different Conditioncontext Object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6168 Hi @Aitozi, yes please close this PR, as I made a spelling mistake while merging and that's why it wasn't closed automatically. Thanks! ---
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9407: Description: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we had written down before. But I will give more tests in the next couple of days. Including the performance under compression with short checkpoint intervals. And more UTs. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} was: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we had written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UTs. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. > Below, FYI. > I tested the PR and verify the results with spark sql. Obviously, we can get > the results of what we had written down before. But I will give more tests in > the next couple of days. Including the performance under compression with > short checkpoint intervals. And more UTs. > {code:java} > scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") > res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> > scala> res1.registerTempTable("tablerice") > warning: there was one deprecation warning; re-run with -deprecation for > details > scala> spark.sql("select * from tablerice") > res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 
1 more > field] > scala> res3.show(3) > +-+---+---+ > | name|age|married| > +-+---+---+ > |Sagar| 26| false| > |Sagar| 30| false| > |Sagar| 34| false| > +-+---+---+ > only showing top 3 rows > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
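To make the intended usage concrete, here is a minimal sketch of plugging the proposed writer into the existing bucketing sink. The schema string, HDFS path, and generic parameter are assumptions for illustration; the PR's writer appears to be generic over the element type and to accept an ORC schema string plus an optional CompressionKind.
{code:java}
import org.apache.flink.orc.OrcFileWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

import org.apache.orc.CompressionKind;

public class OrcRollingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy input matching the name/age/married schema verified with spark-sql above.
        DataStream<Row> rows = env.fromElements(
                Row.of("Sagar", 26, false),
                Row.of("Sagar", 30, false));

        // Assumed base path; the rolling sink creates the dated bucket directories under it.
        BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/hive/man");
        // ORC schema in TypeDescription syntax, with ZLIB compression as an example.
        sink.setWriter(new OrcFileWriter<>("struct<name:string,age:int,married:boolean>", CompressionKind.ZLIB));

        rows.addSink(sink);
        env.execute("orc rolling sink sketch");
    }
}
{code}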
[jira] [Updated] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-6847: -- Labels: pull-request-available starter (was: starter) > Add TIMESTAMPDIFF supported in TableAPI > --- > > Key: FLINK-6847 > URL: https://issues.apache.org/jira/browse/FLINK-6847 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > see FLINK-6813 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9411) Support parquet rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536070#comment-16536070 ] zhangminglei commented on FLINK-9411: - [~triones] Are you still working on this? > Support parquet rolling sink writer > --- > > Key: FLINK-9411 > URL: https://issues.apache.org/jira/browse/FLINK-9411 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: Triones Deng >Priority: Major > > Like the orc rolling sink writer supported in FLINK-9407, we should also support > a parquet rolling sink writer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536071#comment-16536071 ] ASF GitHub Bot commented on FLINK-6847: --- GitHub user xueyumusic opened a pull request: https://github.com/apache/flink/pull/6282 [FLINK-6847][FLINK-6813][Table Api & Sql] TimestampDiff table api and sql support ## What is the purpose of the change This PR propose to add TimestampDiff sql and table Api. Sql format is like `TIMESTAMP(DAY, DATE '2018-07-03', DATE '2018-07-04')`, was explained and based on https://github.com/apache/flink/pull/4117 Table Api signature is like `timestampDiff(TimeIntervalUnit.DAY, '2018-07-03'.toDate, '2018-07-04'.toDate)` ## Brief change log - *modify `generateTemporalPlusMinus` in `ScalarOperators.scala` to support sql code gen* - *add `timestampDiff` api in `time.scala`* - *add timestampDiff expr string parse in `ExpressionParser.scala`* ## Verifying this change This change added tests and can be verified as follows: *(example:)* - *add tests to verify sql and table api, in various types combine of unit(YEAR, MONTH...SECOND) and datetime(DATE or TIMESTAMP)* - *add validation tests * ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xueyumusic/flink timestampDiff Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6282.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6282 commit 462060f8854d19a355bcf4cc2fd54a2200f3e35b Author: xueyu <278006819@...> Date: 2018-07-08T10:50:06Z timestampDiff table api and sql > Add TIMESTAMPDIFF supported in TableAPI > --- > > Key: FLINK-6847 > URL: https://issues.apache.org/jira/browse/FLINK-6847 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > see FLINK-6813 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...
GitHub user xueyumusic opened a pull request: https://github.com/apache/flink/pull/6282 [FLINK-6847][FLINK-6813][Table Api & Sql] TimestampDiff table api and sql support ## What is the purpose of the change This PR propose to add TimestampDiff sql and table Api. Sql format is like `TIMESTAMP(DAY, DATE '2018-07-03', DATE '2018-07-04')`, was explained and based on https://github.com/apache/flink/pull/4117 Table Api signature is like `timestampDiff(TimeIntervalUnit.DAY, '2018-07-03'.toDate, '2018-07-04'.toDate)` ## Brief change log - *modify `generateTemporalPlusMinus` in `ScalarOperators.scala` to support sql code gen* - *add `timestampDiff` api in `time.scala`* - *add timestampDiff expr string parse in `ExpressionParser.scala`* ## Verifying this change This change added tests and can be verified as follows: *(example:)* - *add tests to verify sql and table api, in various types combine of unit(YEAR, MONTH...SECOND) and datetime(DATE or TIMESTAMP)* - *add validation tests * ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xueyumusic/flink timestampDiff Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6282.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6282 commit 462060f8854d19a355bcf4cc2fd54a2200f3e35b Author: xueyu <278006819@...> Date: 2018-07-08T10:50:06Z timestampDiff table api and sql ---
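For readers skimming the thread: the SQL form is presumably TIMESTAMPDIFF rather than TIMESTAMP, as the PR title indicates. A rough Java sketch of using it through sqlQuery once the PR is merged, assuming a table named Orders with two TIMESTAMP columns has been registered beforehand (names invented for illustration):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TimestampDiffSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes a table "Orders" with TIMESTAMP columns o_created and o_shipped
        // was registered beforehand, e.g. via tEnv.registerTable(...).
        Table daysToShip = tEnv.sqlQuery(
                "SELECT TIMESTAMPDIFF(DAY, o_created, o_shipped) AS days_to_ship FROM Orders");
        daysToShip.printSchema();
    }
}
{code}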
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9407: Labels: (was: patch-available pull-request-available) > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. > Below, FYI. > I tested the PR and verify the results with spark sql. Obviously, we can get > the results of what we had written down before. But I will give more tests in > the next couple of days. Including the performance under compression. And > more UTs. > {code:java} > scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") > res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> > scala> res1.registerTempTable("tablerice") > warning: there was one deprecation warning; re-run with -deprecation for > details > scala> spark.sql("select * from tablerice") > res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> res3.show(3) > +-+---+---+ > | name|age|married| > +-+---+---+ > |Sagar| 26| false| > |Sagar| 30| false| > |Sagar| 34| false| > +-+---+---+ > only showing top 3 rows > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9407: Description: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we had written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UTs. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} was: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UTs. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: patch-available, pull-request-available > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. > Below, FYI. > I tested the PR and verify the results with spark sql. Obviously, we can get > the results of what we had written down before. But I will give more tests in > the next couple of days. Including the performance under compression. And > more UTs. > {code:java} > scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") > res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> > scala> res1.registerTempTable("tablerice") > warning: there was one deprecation warning; re-run with -deprecation for > details > scala> spark.sql("select * from tablerice") > res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 
1 more > field] > scala> res3.show(3) > +-+---+---+ > | name|age|married| > +-+---+---+ > |Sagar| 26| false| > |Sagar| 30| false| > |Sagar| 34| false| > +-+---+---+ > only showing top 3 rows > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9407: Description: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UTs. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} was: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UT tests. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: patch-available, pull-request-available > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. > Below, FYI. > I tested the PR and verify the results with spark sql. Obviously, we can get > the results of what we written down before. But I will give more tests in the > next couple of days. Including the performance under compression. And more > UTs. > {code:java} > scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") > res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> > scala> res1.registerTempTable("tablerice") > warning: there was one deprecation warning; re-run with -deprecation for > details > scala> spark.sql("select * from tablerice") > res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 
1 more > field] > scala> res3.show(3) > +-+---+---+ > | name|age|married| > +-+---+---+ > |Sagar| 26| false| > |Sagar| 30| false| > |Sagar| 34| false| > +-+---+---+ > only showing top 3 rows > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9407: Description: Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. Below, FYI. I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we written down before. But I will give more tests in the next couple of days. Including the performance under compression. And more UT tests. {code:java} scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> scala> res1.registerTempTable("tablerice") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select * from tablerice") res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field] scala> res3.show(3) +-+---+---+ | name|age|married| +-+---+---+ |Sagar| 26| false| |Sagar| 30| false| |Sagar| 34| false| +-+---+---+ only showing top 3 rows {code} was:Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink. > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: patch-available, pull-request-available > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. > Below, FYI. > I tested the PR and verify the results with spark sql. Obviously, we can get > the results of what we written down before. But I will give more tests in the > next couple of days. Including the performance under compression. And more UT > tests. > {code:java} > scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21") > res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> > scala> res1.registerTempTable("tablerice") > warning: there was one deprecation warning; re-run with -deprecation for > details > scala> spark.sql("select * from tablerice") > res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more > field] > scala> res3.show(3) > +-+---+---+ > | name|age|married| > +-+---+---+ > |Sagar| 26| false| > |Sagar| 30| false| > |Sagar| 34| false| > +-+---+---+ > only showing top 3 rows > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536059#comment-16536059 ] vinoyang commented on FLINK-5232: - [~till.rohrmann] this issue has been open for a long time, any opinion? > Add a Thread default uncaught exception handler on the JobManager > - > > Key: FLINK-5232 > URL: https://issues.apache.org/jira/browse/FLINK-5232 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > When some JobManager threads die because of uncaught exceptions, we should > bring down the JobManager. If a thread dies from an uncaught exception, there > is a high chance that the JobManager becomes dysfunctional. > The only safe thing is to rely on the JobManager being restarted by YARN / > Mesos / Kubernetes / etc. > I suggest adding this code to the JobManager launch: > {code} > Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { > @Override > public void uncaughtException(Thread t, Throwable e) { > try { > LOG.error("Thread {} died due to an uncaught exception. Killing > process.", t.getName()); > } finally { > Runtime.getRuntime().halt(-1); > } > } > }); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5231) JobManager should exit on "OutOfMemoryError"
[ https://issues.apache.org/jira/browse/FLINK-5231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-5231: --- Assignee: vinoyang > JobManager should exit on "OutOfMemoryError" > > > Key: FLINK-5231 > URL: https://issues.apache.org/jira/browse/FLINK-5231 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > A JobManager that has encountered an {{OutOfMemoryError}} is, with high > probability, no longer functional. However, by default the {{OutOfMemoryError}} kills > only the thread where it is thrown, not the entire JVM. > We should add {{-XX:OnOutOfMemoryError="kill -9 %p"}} both to the standalone > bash options and to the YARN and Mesos launch options. > On the TaskManager side, the decision is trickier and less critical, so I > suggest adding this only to the JobManager by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
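As a side note, one way to try the flag out without patching the launch scripts is through the JVM options exposed in flink-conf.yaml; a sketch, assuming the env.java.opts.jobmanager option is available in the deployment (the issue itself proposes baking the flag into the default launch options instead):
{noformat}
# conf/flink-conf.yaml (illustration only)
env.java.opts.jobmanager: -XX:OnOutOfMemoryError="kill -9 %p"
{noformat}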
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536052#comment-16536052 ] ASF GitHub Bot commented on FLINK-9377: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6273 conflicts, please update the PR~ > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
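To illustrate the "config snapshot as factory" idea from the description, a purely hypothetical shape (not the actual Flink interface) could look like:
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Hypothetical sketch only: the written config snapshot carries enough schema
 * information to recreate a serializer that can read the previously written state,
 * so the serializer itself no longer needs to be Java-serialized into the checkpoint.
 */
public interface RestoreSerializerFactory<T> {
    TypeSerializer<T> restoreSerializer();
}
{code}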
[GitHub] flink issue #6273: [FLINK-9377] [core] Implement restore serializer factory ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6273 conflicts, please update the PR~ ---
[jira] [Commented] (FLINK-9750) Create new StreamingFileSink that works on Flink's FileSystem abstraction
[ https://issues.apache.org/jira/browse/FLINK-9750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536051#comment-16536051 ] ASF GitHub Bot commented on FLINK-9750: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r200835177 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Javadoc. --- End diff -- wrong Java doc > Create new StreamingFileSink that works on Flink's FileSystem abstraction > - > > Key: FLINK-9750 > URL: https://issues.apache.org/jira/browse/FLINK-9750 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Stephan Ewen >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Using Flink's own file system abstraction means that we can add additional > streaming/checkpointing related behavior. > In addition, the new StreamingFileSink should only rely on internal > checkpointed state what files are possibly in progress or need to roll over, > never assume enumeration of files in the file system. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r200835177 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Javadoc. --- End diff -- wrong Java doc ---
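Following up on the "wrong Java doc" remark, a more descriptive class comment could read roughly as follows; the wording is only a suggestion inferred from the imports visible in the diff:
{code:java}
/**
 * Writes elements of type {@code T} to an {@link org.apache.flink.core.fs.FSDataOutputStream}
 * handed to it by the sink. Implementations must be {@link java.io.Serializable} so that
 * they can be distributed to the tasks running the sink.
 *
 * @param <T> the type of the elements that are written
 */
{code}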
[jira] [Commented] (FLINK-9733) Make location for job graph files configurable
[ https://issues.apache.org/jira/browse/FLINK-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536050#comment-16536050 ] ASF GitHub Bot commented on FLINK-9733: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6280 @zentol thanks for your suggestion, I have done some refactoring and removed the config item from `RestOptions` > Make location for job graph files configurable > -- > > Key: FLINK-9733 > URL: https://issues.apache.org/jira/browse/FLINK-9733 > Project: Flink > Issue Type: Improvement > Components: Client, Job-Submission >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > During job submission by the {{RestClusterClient}}, the {{JobGraph}} is > serialized and written to a file. > Currently we just use {{Files.createTempFile}} for this purpose. > This location should be made configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6280: [FLINK-9733] Make location for job graph files configurab...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6280 @zentol thanks for your suggestion, I have done some refactoring and removed the config item from `RestOptions` ---
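A minimal sketch of the direction discussed here; the helper and its parameter are made up for illustration and are not the shape that ended up in the code:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: place the serialized JobGraph in a configurable directory. */
public final class JobGraphFiles {

    public static Path createJobGraphFile(String configuredDir) throws IOException {
        // Fall back to the JVM temp directory when nothing is configured.
        Path dir = configuredDir != null
                ? Paths.get(configuredDir)
                : Paths.get(System.getProperty("java.io.tmpdir"));
        return Files.createTempFile(dir, "flink-jobgraph", ".bin");
    }

    private JobGraphFiles() {}
}
{code}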
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536025#comment-16536025 ] ASF GitHub Bot commented on FLINK-9407: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200833563 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames =
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200833563 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. +* @param compressionKind The compression kind to use. 
+*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames(); + List typeDescriptions = orcSchema.getChildren(); + List typeInformations = new ArrayList<>(); + + typeDescriptions.forEach(typeDescription -> { +
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200831458 --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java --- @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link OrcFileWriter}. + */ +public class OrcFileWriterTest { + + @Test + public void testDuplicate() { + OrcFileWriter writer = new OrcFileWriter("struct"); --- End diff -- Yes. I will add more UTs for all in the next couple of days. ---
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536007#comment-16536007 ] ASF GitHub Bot commented on FLINK-9407: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200831458 --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java --- @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link OrcFileWriter}. + */ +public class OrcFileWriterTest { + + @Test + public void testDuplicate() { + OrcFileWriter writer = new OrcFileWriter("struct"); --- End diff -- Yes. I will add more UTs for all in the next couple of days. > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: patch-available, pull-request-available > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
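As a starting point for those additional UTs, one check the duplicate test could make (the schema string and the equals/not-same expectations are assumptions about the intended duplicate() semantics):
{code:java}
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.types.Row;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;

/** Sketch of an additional test for {@code OrcFileWriter#duplicate()}. */
public class OrcFileWriterDuplicateTest {

    @Test
    public void duplicateKeepsConfigurationButCreatesNewInstance() {
        OrcFileWriter<Row> writer =
                new OrcFileWriter<>("struct<name:string,age:int,married:boolean>");
        Writer<Row> copy = writer.duplicate();

        // The copy must be a distinct instance with the same schema and compression settings.
        assertNotSame(writer, copy);
        assertEquals(writer, copy);
    }
}
{code}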
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536005#comment-16536005 ] ASF GitHub Bot commented on FLINK-9407: --- Github user xndai commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200831360 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
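For readers following the diff above, here is a minimal standalone sketch of the plain ORC API calls that open() wires together: building a TypeDescription, creating a writer with a compression kind, and flushing one VectorizedRowBatch. The output path, schema string and sample values are hypothetical; this is not code from the PR.
{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriteSketch {

	public static void main(String[] args) throws Exception {
		// Same form of schema string that the OrcFileWriter constructor parses.
		TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");

		// Mirrors the createWriter(...) call in open(): schema plus compression kind.
		Writer writer = OrcFile.createWriter(
				new Path("/tmp/demo.orc"),
				OrcFile.writerOptions(new Configuration())
						.setSchema(schema)
						.compress(CompressionKind.ZLIB));

		// One batch, one row.
		VectorizedRowBatch batch = schema.createRowBatch();
		int row = batch.size++;
		((BytesColumnVector) batch.cols[0]).setVal(row, "alice".getBytes(StandardCharsets.UTF_8));
		((LongColumnVector) batch.cols[1]).vector[row] = 30;

		writer.addRowBatch(batch);
		writer.close();
	}
}
{code}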
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536004#comment-16536004 ] ASF GitHub Bot commented on FLINK-9407: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200830833 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames =
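The quoted hunk cuts off inside orcSchemaToTableSchema(...). As a rough sketch of what that private helper appears to be doing, mapping each child of the ORC struct to a Flink TypeInformation through the statically imported schemaToTypeInfo and pairing the result with the field names, something along these lines would work. The class name, and the assumption that schemaToTypeInfo is reachable from the org.apache.flink.orc package, are mine; this is not the PR's exact code.
{code:java}
package org.apache.flink.orc;

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.orc.TypeDescription;

import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;

final class OrcSchemaConversionSketch {

	static TableSchema toTableSchema(TypeDescription orcSchema) {
		List<String> fieldNames = orcSchema.getFieldNames();
		List<TypeDescription> children = orcSchema.getChildren();

		// One TypeInformation per ORC child type, reusing the existing flink-orc mapping.
		TypeInformation<?>[] fieldTypes = new TypeInformation<?>[children.size()];
		for (int i = 0; i < children.size(); i++) {
			fieldTypes[i] = schemaToTypeInfo(children.get(i));
		}
		return new TableSchema(fieldNames.toArray(new String[0]), fieldTypes);
	}
}
{code}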
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535997#comment-16535997 ] ASF GitHub Bot commented on FLINK-9407: --- Github user xndai commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200830153 --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java --- @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link OrcFileWriter}. + */ +public class OrcFileWriterTest { + + @Test + public void testDuplicate() { + OrcFileWriter writer = new OrcFileWriter("struct"); --- End diff -- Need UTs for writing Orc files with all supported types. Also include negative cases, and cases where OrcBatchWriter.fill() returns false. > Support orc rolling sink writer > --- > > Key: FLINK-9407 > URL: https://issues.apache.org/jira/browse/FLINK-9407 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: patch-available, pull-request-available > > Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and > {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling > sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
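To make the review ask above concrete, here is a hedged sketch of the kind of tests it suggests: a duplicate() check on a schema covering several types, plus a negative case for a malformed schema string. The generic parameter, the equals() comparison and the expected exception type are assumptions about the final class, not the PR's actual tests.
{code:java}
package org.apache.flink.orc;

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.types.Row;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;

/**
 * Sketch of additional coverage for {@link OrcFileWriter}; names and assertions are hypothetical.
 */
public class OrcFileWriterTestSketch {

	@Test
	public void testDuplicateReturnsEqualButIndependentWriter() {
		OrcFileWriter<Row> writer =
			new OrcFileWriter<>("struct<name:string,age:int,married:boolean>");
		Writer<Row> copy = writer.duplicate();

		assertNotSame(writer, copy);  // a fresh instance, safe to use for another bucket
		assertEquals(writer, copy);   // assumes equals() compares schema and compression kind
	}

	@Test(expected = IllegalArgumentException.class)
	public void testMalformedSchemaIsRejected() {
		// Negative case: TypeDescription.fromString should fail fast on an unknown type name.
		new OrcFileWriter<Row>("struct<name:sting>");
	}
}
{code}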
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535999#comment-16535999 ] ASF GitHub Bot commented on FLINK-9407: --- Github user xndai commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200830078 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
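The writer buffers incoming rows into the VectorizedRowBatch seen above before handing it to the ORC writer. As a purely illustrative sketch of that step, and of the fill()-returns-false situation raised earlier in the review, copying one Flink Row with a (string, int) schema into a batch might look like this; OrcBatchWriter itself is not shown in the hunk, so the helper below is hypothetical.
{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

final class RowBatchFillSketch {

	/** Copies one row into the batch; returns false when the batch is already full. */
	static boolean fillOne(VectorizedRowBatch batch, Row row) {
		if (batch.size == batch.getMaxSize()) {
			return false;  // caller has to addRowBatch(...) and reset() before retrying
		}
		int idx = batch.size++;
		((BytesColumnVector) batch.cols[0])
			.setVal(idx, ((String) row.getField(0)).getBytes(StandardCharsets.UTF_8));
		((LongColumnVector) batch.cols[1]).vector[idx] = (Integer) row.getField(1);
		return true;
	}
}
{code}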
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535998#comment-16535998 ] ASF GitHub Bot commented on FLINK-9407: --- Github user xndai commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200830111 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
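For context on how the new writer plugs into the existing rolling sink, a hedged wiring sketch: attach an OrcFileWriter (the class proposed in this PR, assumed generic over Row) to a BucketingSink with ZLIB compression. The base path, schema and batch size are placeholders.
{code:java}
import org.apache.flink.orc.OrcFileWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;
import org.apache.orc.CompressionKind;

final class OrcSinkWiringSketch {

	static void addOrcRollingSink(DataStream<Row> rows) {
		BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/orc-out");  // hypothetical base path
		sink.setWriter(new OrcFileWriter<>("struct<name:string,age:int,married:boolean>", CompressionKind.ZLIB));
		sink.setBatchSize(1024L * 1024L * 128L);  // roll part files at roughly 128 MB
		rows.addSink(sink);
	}
}
{code}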
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535996#comment-16535996 ] ASF GitHub Bot commented on FLINK-9407: --- Github user xndai commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200830013 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. 
+* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames();
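Since every constructor path above funnels the metaSchema string through TypeDescription.fromString(...), a quick illustration of what that string looks like and what it yields may help; the schema here is just an example.
{code:java}
import org.apache.orc.TypeDescription;

public class OrcSchemaStringSketch {

	public static void main(String[] args) {
		// The metaSchema constructor argument is a plain ORC type string.
		TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int,married:boolean>");

		System.out.println(schema.getFieldNames());               // [name, age, married]
		System.out.println(schema.getChildren().size());          // 3 child types: string, int, boolean
		System.out.println(schema.createRowBatch().cols.length);  // 3 column vectors, one per field
	}
}
{code}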