[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536531#comment-16536531
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200878724
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -54,6 +54,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.orc</groupId>
 			<artifactId>orc-core</artifactId>
--- End diff --

Current orc version is 1.5.X. Should we upgrade it as well?


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the 
> rolling sink.
> Below is a quick verification, FYI.
> I tested the PR and verified the results with Spark SQL. As expected, we can 
> read back the data that was written earlier. I will add more tests in the next 
> couple of days, including the performance under compression with short 
> checkpoint intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}
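
For readers of this thread, a rough sketch of how such a writer would be wired into the 
existing bucketing/rolling sink follows. This is only an illustration based on the 
{{OrcFileWriter(String, CompressionKind)}} constructor quoted further down in this digest 
and the public {{BucketingSink}} API; the schema string, path and compression choice are 
assumptions, not taken from the PR.

{code:java}
// Illustrative sketch only: the schema string, HDFS path, and compression kind are assumptions.
import org.apache.flink.orc.OrcFileWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;
import org.apache.orc.CompressionKind;

public class OrcRollingSinkExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Row> rows = env.fromElements(
				Row.of("Sagar", 26, false),
				Row.of("Sagar", 30, false),
				Row.of("Sagar", 34, false));

		BucketingSink<Row> sink = new BucketingSink<>("hdfs://10.199.196.0:9000/data/hive/man");
		// The schema string follows org.apache.orc.TypeDescription.fromString() syntax.
		sink.setWriter(new OrcFileWriter<>("struct<name:string,age:int,married:boolean>",
				CompressionKind.ZLIB));
		rows.addSink(sink);

		env.execute("ORC rolling sink example");
	}
}
{code}

Reading the bucket directory back with {{spark.read.orc(...)}}, as in the verification above, 
should then yield the same (name, age, married) rows.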



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536530#comment-16536530
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879775
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
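
(The quoted diff is truncated at this point.) For context, the core ORC write path that a 
writer like this wraps looks roughly as follows; this is a plain {{org.apache.orc}} usage 
sketch, not code from the PR, and the schema and output path are made up for the example.

{code:java}
// Plain org.apache.orc usage sketch (not from the PR): create a writer for a schema,
// fill a VectorizedRowBatch column by column, and flush full batches.
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriteSketch {
	public static void main(String[] args) throws Exception {
		TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");
		Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
				OrcFile.writerOptions(new Configuration())
						.setSchema(schema)
						.compress(CompressionKind.ZLIB));

		VectorizedRowBatch batch = schema.createRowBatch();
		BytesColumnVector nameCol = (BytesColumnVector) batch.cols[0];
		LongColumnVector ageCol = (LongColumnVector) batch.cols[1];

		for (int i = 0; i < 3; i++) {
			int row = batch.size++;
			nameCol.setVal(row, "Sagar".getBytes(StandardCharsets.UTF_8));
			ageCol.vector[row] = 26 + 4 * i;
			if (batch.size == batch.getMaxSize()) {
				writer.addRowBatch(batch);   // flush a full batch
				batch.reset();
			}
		}
		if (batch.size != 0) {
			writer.addRowBatch(batch);       // flush the remainder
		}
		writer.close();
	}
}
{code}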
   

[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9407:
--
Labels: pull-request-available  (was: )

> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the 
> rolling sink.
> Below is a quick verification, FYI.
> I tested the PR and verified the results with Spark SQL. As expected, we can 
> read back the data that was written earlier. I will add more tests in the next 
> couple of days, including the performance under compression with short 
> checkpoint intervals, and more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-08 Thread wgtmac
Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879243
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-08 Thread wgtmac
Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879624
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-08 Thread wgtmac
Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200878724
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -54,6 +54,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.orc</groupId>
 			<artifactId>orc-core</artifactId>
--- End diff --

Current orc version is 1.5.X. Should we upgrade it as well?


---


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536529#comment-16536529
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879624
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
   

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536532#comment-16536532
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879243
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
   

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-08 Thread wgtmac
Github user wgtmac commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200879775
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-07-08 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536525#comment-16536525
 ] 

Zhenqiu Huang commented on FLINK-7243:
--

[~nssalian] 

We already have a working version in production. I am going to create a PR next 
week for people to review. Do you have specific requirements or functionality 
in mind for it?

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536522#comment-16536522
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/6201


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, so 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind: 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether it's a source or a sink.
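
To make point 1) concrete, such a factory would presumably mirror the shape of the existing 
{{TableSourceFactory}}; a minimal sketch by analogy (method names and signatures are 
assumptions, not the final API introduced by the PR) could look like this:

{code:java}
// Minimal sketch by analogy with TableSourceFactory; names and signatures are assumptions.
import java.util.List;
import java.util.Map;
import org.apache.flink.table.sinks.TableSink;

public interface TableSinkFactory<T> {

	/** Properties (e.g. connector.type, format.type) that identify this factory. */
	Map<String, String> requiredContext();

	/** All property keys this factory understands, used for validation. */
	List<String> supportedProperties();

	/** Creates and configures a TableSink from the normalized properties. */
	TableSink<T> createTableSink(Map<String, String> properties);
}
{code}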



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536523#comment-16536523
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink 
instantiation

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Add interfaces to support unified table sink configuration and 
instantiation. Consolidate table source and table sink configuration and 
instantiation.


## Brief change log

- Consolidate table sink and table source instantiation with 
TableConnectorFactory{Service}.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering table source and sink tables 
under the same name. 
  - Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit d436041fda022f405fa3a1d497b95d2ff0934ce5
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

commit d70d033abec0806f1723b18f7bcbab1f60ec7280
Author: Shuyi Chen 
Date:   2018-06-28T18:30:21Z

comment fixes

commit 0649359106ddfa003c44e3d0221f7f52ac507593
Author: Shuyi Chen 
Date:   2018-07-06T23:31:05Z

refactor:
1) add TableFactoryDiscoverable trait
2) add util for handling rowtime/proctime for table schema and unittests




> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, so 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind: 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether it's a source or a sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-08 Thread suez1224
GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink 
instantiation

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Add interfaces to support unified table sink configuration and 
instantiation. Consolidate table source and table sink configuration and 
instantiation.


## Brief change log

- Consolidate table sink and table source instantiation with 
TableConnectorFactory{Service}.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering table source and sink tables 
under the same name. 
  - Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit d436041fda022f405fa3a1d497b95d2ff0934ce5
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

commit d70d033abec0806f1723b18f7bcbab1f60ec7280
Author: Shuyi Chen 
Date:   2018-06-28T18:30:21Z

comment fixes

commit 0649359106ddfa003c44e3d0221f7f52ac507593
Author: Shuyi Chen 
Date:   2018-07-06T23:31:05Z

refactor:
1) add TableFactoryDiscoverable trait
2) add util for handling rowtime/proctime for table schema and unittests




---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-08 Thread suez1224
Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/6201


---


[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536483#comment-16536483
 ] 

ASF GitHub Bot commented on FLINK-9666:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
@tillrohrmann, @zentol, cc.


> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts
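
For readers skimming the digest, the change is about replacing the non-short-circuit 
operators {{&}}/{{|}} with {{&&}}/{{||}} in boolean expressions. A generic illustration 
(not code from the PR):

{code:java}
// Generic illustration (not code from the PR): '&' and '|' always evaluate both operands,
// while '&&' and '||' stop as soon as the result is known.
import java.util.Arrays;
import java.util.List;

public class ShortCircuitExample {

	static boolean hasElements(List<String> list) {
		// Non-short-circuit version: list.size() is evaluated even when list == null,
		// which throws a NullPointerException:
		//     return list != null & list.size() > 0;

		// Short-circuit version: the right-hand side is skipped when list == null.
		return list != null && list.size() > 0;
	}

	public static void main(String[] args) {
		System.out.println(hasElements(null));                 // false, no exception
		System.out.println(hasElements(Arrays.asList("a")));   // true
	}
}
{code}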



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...

2018-07-08 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
@tillrohrmann, @zentol, cc.


---


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-07-08 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536479#comment-16536479
 ] 

Neelesh Srinivas Salian commented on FLINK-7243:


What is the status of this, [~godfreyhe] and [~ZhenqiuHuang]?

Shall I assign this to myself and start working on it?

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6976) Add STR_TO_DATE supported in TableAPI

2018-07-08 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478
 ] 

Neelesh Srinivas Salian edited comment on FLINK-6976 at 7/9/18 1:40 AM:


[~sunjincheng121], could I work on this if no one else has? I realize it needs 
https://issues.apache.org/jira/browse/FLINK-6895


I could work on both as well.


was (Author: nssalian):
[~sunjincheng121], could I work on this if no one else has? I realize it needs 
[https://issues.apache.org/jira/browse/FLINK-6895](FLINK-6895)

> Add STR_TO_DATE supported in TableAPI
> -
>
> Key: FLINK-6976
> URL: https://issues.apache.org/jira/browse/FLINK-6976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6895 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6976) Add STR_TO_DATE supported in TableAPI

2018-07-08 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478
 ] 

Neelesh Srinivas Salian edited comment on FLINK-6976 at 7/9/18 1:40 AM:


[~sunjincheng121], could I work on this if no one else has? I realize it needs 
[https://issues.apache.org/jira/browse/FLINK-6895](FLINK-6895)


was (Author: nssalian):
[~sunjincheng121], could I work on this if no one else has?

> Add STR_TO_DATE supported in TableAPI
> -
>
> Key: FLINK-6976
> URL: https://issues.apache.org/jira/browse/FLINK-6976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6895 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6976) Add STR_TO_DATE supported in TableAPI

2018-07-08 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536478#comment-16536478
 ] 

Neelesh Srinivas Salian commented on FLINK-6976:


[~sunjincheng121], could I work on this if no one else has?

> Add STR_TO_DATE supported in TableAPI
> -
>
> Key: FLINK-6976
> URL: https://issues.apache.org/jira/browse/FLINK-6976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6895 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536477#comment-16536477
 ] 

vinoyang commented on FLINK-9777:
-

[~gjy] I fixed issue FLINK-6469, so I will take this ticket.

> YARN: JM and TM Memory must be specified with Units 
> 
>
> Key: FLINK-9777
> URL: https://issues.apache.org/jira/browse/FLINK-9777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, YARN
>Affects Versions: 1.6.0
> Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> FLINK-6469 breaks backwards compatibility because the JobManager and 
> TaskManager memory must be specified with units (otherwise bytes are 
> assumed). The command to start a YARN session as documented 
> ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
>  
> |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
>  would not work because 1024 bytes and 4096 bytes are not enough for the heap 
> size. The command finishes with the following exception:
> {noformat}
> java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   ... 2 more
> Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
> memory requirements with the provided cluster specification. Please increase 
> the memory of the cluster.
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
>   ... 7 more
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>   at 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
>   ... 9 more
> {noformat}
>  
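
The behavior change behind this comes from the memory-size parsing introduced by 
FLINK-6469: a bare number is now interpreted as bytes. A small sketch of those semantics, 
assuming the {{org.apache.flink.configuration.MemorySize}} API (an assumption; the exact 
method names should be checked against the release):

{code:java}
// Sketch of the parsing semantics; assumes org.apache.flink.configuration.MemorySize from
// FLINK-6469 (method names may differ slightly between versions).
import org.apache.flink.configuration.MemorySize;

public class MemoryUnitsExample {
	public static void main(String[] args) {
		MemorySize withUnit = MemorySize.parse("1024m"); // 1024 mebibytes
		MemorySize bare = MemorySize.parse("1024");      // interpreted as 1024 bytes

		System.out.println(withUnit.getBytes()); // 1073741824
		System.out.println(bare.getBytes());     // 1024 -- far too small for a JVM heap
	}
}
{code}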



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9777:
---

Assignee: vinoyang

> YARN: JM and TM Memory must be specified with Units 
> 
>
> Key: FLINK-9777
> URL: https://issues.apache.org/jira/browse/FLINK-9777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, YARN
>Affects Versions: 1.6.0
> Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> FLINK-6469 breaks backwards compatibility because the JobManager and 
> TaskManager memory must be specified with units (otherwise bytes are 
> assumed). The command to start a YARN session as documented 
> ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
>  
> |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
>  would not work because 1024 bytes and 4096 bytes are not enough for the heap 
> size. The command finishes with the following exception:
> {noformat}
> java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   ... 2 more
> Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
> memory requirements with the provided cluster specification. Please increase 
> the memory of the cluster.
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
>   ... 7 more
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>   at 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
>   ... 9 more
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536455#comment-16536455
 ] 

ASF GitHub Bot commented on FLINK-9588:
---

Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/6168


> Reuse the same conditionContext with in a same computationState
> ---
>
> Key: FLINK-9588
> URL: https://issues.apache.org/jira/browse/FLINK-9588
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently, CEP calls checkFilterCondition with a newly created ConditionContext for 
> each edge, which results in repeated getEventsForPattern calls because each edge 
> sees a different ConditionContext object.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-07-08 Thread Aitozi
Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/6168


---


[jira] [Commented] (FLINK-9750) Create new StreamingFileSink that works on Flink's FileSystem abstraction

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536289#comment-16536289
 ] 

ASF GitHub Bot commented on FLINK-9750:
---

Github user xndai commented on the issue:

https://github.com/apache/flink/pull/6281
  
Hi @kl0u, where can I find the overall document/design of the bucketing sink 
rework (FLINK-9749)? I am a committer of Apache ORC and am interested in 
helping with the Flink-ORC integration.


> Create new StreamingFileSink that works on Flink's FileSystem abstraction
> -
>
> Key: FLINK-9750
> URL: https://issues.apache.org/jira/browse/FLINK-9750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Using Flink's own file system abstraction means that we can add additional 
> streaming/checkpointing related behavior.
> In addition, the new StreamingFileSink should rely only on internal 
> checkpointed state to determine which files are possibly in progress or need 
> to roll over, and never assume it can enumerate files in the file system.
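
For orientation, usage of the new sink is roughly of the following shape. This sketch is 
based on the {{StreamingFileSink}} API as it later shipped in Flink 1.6 and may differ 
from the PR under review; the path and encoder are made up for the example.

{code:java}
// Rough usage sketch based on the StreamingFileSink API as it later shipped in Flink 1.6;
// the PR under review may still differ. Path and encoder are made up for the example.
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Part files are moved from in-progress/pending to finished on checkpoints,
		// so checkpointing must be enabled.
		env.enableCheckpointing(10_000L);

		DataStream<String> lines = env.fromElements("a", "b", "c");

		// Uses Flink's own FileSystem abstraction (org.apache.flink.core.fs.Path) and tracks
		// in-progress files purely in checkpointed state, never by listing the directory.
		StreamingFileSink<String> sink = StreamingFileSink
				.forRowFormat(new Path("hdfs:///tmp/streaming-out"),
						new SimpleStringEncoder<String>("UTF-8"))
				.build();

		lines.addSink(sink);
		env.execute("StreamingFileSink sketch");
	}
}
{code}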



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...

2018-07-08 Thread xndai
Github user xndai commented on the issue:

https://github.com/apache/flink/pull/6281
  
Hi @kl0u, where can I find the overall document/design of the bucketing sink 
rework (FLINK-9749)? I am a committer of Apache ORC and am interested in 
helping with the Flink-ORC integration.


---


[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9777:

Description: 
FLINK-6469 breaks backwards compatibility because the JobManager and 
TaskManager memory must be specified with units (otherwise bytes are assumed). 
The command to start a YARN session as documented 
([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
 
|https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
 would not work because 1024 bytes and 4096 bytes are not enough for the heap 
size. The command finishes with the following exception:
{noformat}
java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
... 2 more
Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
memory requirements with the provided cluster specification. Please increase 
the memory of the cluster.
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
... 7 more
Caused by: java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
... 9 more
{noformat}
 

  was:
FLINK-6469 breaks backwards compatibility because the JobManager and 
TaskManager memory should be specified with units (otherwise bytes are 
assumed). The command to start a YARN session as documented 
([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
 
|https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
 would not work because 1024 bytes and 4096 bytes are not enough for the heap 
size. The command finishes with the following exception:
{noformat}
java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
... 2 more
Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
memory requirements with the provided cluster specification. Please increase 
the memory of the cluster.
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
at 

[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9777:

Description: 
FLINK-6469 breaks backwards compatibility because the JobManager and 
TaskManager memory should be specified with units (otherwise bytes are 
assumed). The command to start a YARN session as documented 
([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
 
|https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
 would not work because 1024 bytes and 4096 bytes are not enough for the heap 
size. The command finishes with the following exception:
{noformat}
java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
... 2 more
Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
memory requirements with the provided cluster specification. Please increase 
the memory of the cluster.
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
... 7 more
Caused by: java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
... 9 more
{noformat}
 

  was:
With FLINK-6469, the JobManager and TaskManager memory must be specified with 
units (otherwise bytes are assumed). The command to start a YARN session as 
documented 
([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
 
|https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
 would not work because 1024 bytes and 4096 bytes are not enough for the heap 
size. The command finishes with the following exception: 

{noformat}
java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
... 2 more
Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
memory requirements with the provided cluster specification. Please increase 
the memory of the cluster.
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
... 7 more

[jira] [Updated] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9777:

Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27

> YARN: JM and TM Memory must be specified with Units 
> 
>
> Key: FLINK-9777
> URL: https://issues.apache.org/jira/browse/FLINK-9777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, YARN
>Affects Versions: 1.6.0
> Environment: commit 9f736d1927c62d220a82931c4f5ffa4955910f27
>Reporter: Gary Yao
>Priority: Blocker
> Fix For: 1.6.0
>
>
> With FLINK-6469, the JobManager and TaskManager memory must be specified with 
> units (otherwise bytes are assumed). The command to start a YARN session as 
> documented 
> ([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
>  
> |https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
>  would not work because 1024 bytes and 4096 bytes are not enough for the heap 
> size. The command finishes with the following exception: 
> {noformat}
> java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   ... 2 more
> Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
> memory requirements with the provided cluster specification. Please increase 
> the memory of the cluster.
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
>   ... 7 more
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>   at 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
>   ... 9 more
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9777) YARN: JM and TM Memory must be specified with Units

2018-07-08 Thread Gary Yao (JIRA)
Gary Yao created FLINK-9777:
---

 Summary: YARN: JM and TM Memory must be specified with Units 
 Key: FLINK-9777
 URL: https://issues.apache.org/jira/browse/FLINK-9777
 Project: Flink
  Issue Type: Bug
  Components: Documentation, YARN
Affects Versions: 1.6.0
Reporter: Gary Yao
 Fix For: 1.6.0


With FLINK-6469, the JobManager and TaskManager memory must be specified with 
units (otherwise bytes are assumed). The command to start a YARN session as 
documented 
([https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md)
 
|https://github.com/apache/flink/blob/9f736d1927c62d220a82931c4f5ffa4955910f27/docs/ops/deployment/yarn_setup.md]
 would not work because 1024 bytes and 4096 bytes are not enough for the heap 
size. The command finishes with the following exception: 

{noformat}
java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:802)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
Couldn't deploy Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:599)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:802)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
... 2 more
Caused by: org.apache.flink.util.FlinkException: Cannot fulfill the minimum 
memory requirements with the provided cluster specification. Please increase 
the memory of the cluster.
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413)
... 7 more
Caused by: java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:115)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450)
... 9 more
{noformat}

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
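
A short sketch of the unit handling behind this issue, assuming the 
{{org.apache.flink.configuration.MemorySize}} utility introduced around 
FLINK-6469: a bare {{1024}} is now parsed as 1024 bytes, while {{1024m}} means 
1024 mebibytes. That is presumably why the documented {{-jm 1024 -tm 4096}} 
values fall below the minimum heap cutoff and fail validation, and why the 
documented command would need explicit units such as {{-jm 1024m -tm 4096m}}.

{code:java}
import org.apache.flink.configuration.MemorySize;

public class MemoryUnitSketch {

    public static void main(String[] args) {
        // Without a unit suffix, the value is interpreted as bytes.
        MemorySize bare = MemorySize.parse("1024");

        // With a unit suffix, the value is interpreted accordingly (m = mebibytes).
        MemorySize withUnit = MemorySize.parse("1024m");

        System.out.println(bare.getBytes());      // 1024
        System.out.println(withUnit.getBytes());  // 1073741824
    }
}
{code}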


[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536132#comment-16536132
 ] 

ASF GitHub Bot commented on FLINK-9588:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6168
  
Hi @Aitozi, yes please close this PR, as I made a spelling mistake while 
merging and that's why it wasn't closed automatically. Thanks!


> Reuse the same conditionContext with in a same computationState
> ---
>
> Key: FLINK-9588
> URL: https://issues.apache.org/jira/browse/FLINK-9588
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently, CEP calls checkFilterCondition with a newly created ConditionContext 
> for each edge, which causes getEventsForPattern to be evaluated repeatedly 
> because every call sees a different ConditionContext object.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
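
The change described above boils down to sharing one context object per 
computation state so that the potentially expensive {{getEventsForPattern}} 
lookup runs once and is cached, instead of once per outgoing edge. A simplified, 
hypothetical sketch of that idea (not the actual CEP/NFA code; all names below 
are made up):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConditionContextReuseSketch {

    /** Simplified stand-in for CEP's ConditionContext that caches looked-up events. */
    static class CachingConditionContext {
        private final Map<String, List<String>> cache = new HashMap<>();

        List<String> getEventsForPattern(String patternName) {
            // The expensive lookup runs only once per pattern name and context instance.
            return cache.computeIfAbsent(patternName, CachingConditionContext::expensiveLookup);
        }

        private static List<String> expensiveLookup(String patternName) {
            System.out.println("looking up events for " + patternName);
            return new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        // One context per computation state, reused across all outgoing edges.
        CachingConditionContext context = new CachingConditionContext();
        for (int edge = 0; edge < 3; edge++) {
            context.getEventsForPattern("start"); // the lookup message prints only once
        }
    }
}
{code}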


[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

2018-07-08 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6168
  
Hi @Aitozi, yes please close this PR, as I made a spelling mistake while 
merging and that's why it wasn't closed automatically. Thanks!


---


[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9407:

Description: 
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the rolling 
sink.

Below, FYI.

I tested the PR and verified the results with Spark SQL; we can read back 
exactly what was written before. I will add more tests in the next couple of 
days, including performance tests under compression with short checkpoint 
intervals, and more UTs.
{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}

  was:
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we had written down before. But I will give more tests in 
the next couple of days. Including the performance under compression. And more 
UTs.
{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest adding an ORC writer for the 
> rolling sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL; we can read back 
> exactly what was written before. I will add more tests in the next couple of 
> days, including performance tests under compression with short checkpoint 
> intervals, and more UTs.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
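
For readers who want to see what the writer in this PR ultimately produces, the 
sketch below writes an ORC file with the plain {{orc-core}} API, using the same 
name/age/married columns that the Spark session above reads back. It is not the 
PR's {{OrcFileWriter}}; the path and schema string are made up.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.nio.charset.StandardCharsets;

public class OrcWriteSketch {

    public static void main(String[] args) throws Exception {
        TypeDescription schema =
            TypeDescription.fromString("struct<name:string,age:int,married:boolean>");

        Writer writer = OrcFile.createWriter(
            new Path("/tmp/people.orc"),
            OrcFile.writerOptions(new Configuration())
                .setSchema(schema)
                .compress(CompressionKind.ZLIB));

        VectorizedRowBatch batch = schema.createRowBatch();
        BytesColumnVector name = (BytesColumnVector) batch.cols[0];
        LongColumnVector age = (LongColumnVector) batch.cols[1];
        LongColumnVector married = (LongColumnVector) batch.cols[2];

        int row = batch.size++;
        name.setVal(row, "Sagar".getBytes(StandardCharsets.UTF_8));
        age.vector[row] = 26;
        married.vector[row] = 0; // booleans are encoded as 0/1 in a LongColumnVector

        writer.addRowBatch(batch);
        writer.close();
    }
}
{code}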


[jira] [Updated] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-07-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-6847:
--
Labels: pull-request-available starter  (was: starter)

> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9411) Support parquet rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536070#comment-16536070
 ] 

zhangminglei commented on FLINK-9411:
-

[~triones] Are you still working on this?

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: Triones Deng
>Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support 
> parquet rolling sink writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
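
For comparison with the ORC case, a minimal sketch of the kind of writer a 
Parquet rolling sink would wrap, using {{parquet-avro}}. This is an illustration 
only, not the writer proposed in this ticket; the schema and path are made up.

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteSketch {

    public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("Person").fields()
            .requiredString("name")
            .requiredInt("age")
            .endRecord();

        try (ParquetWriter<GenericRecord> writer =
                 AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/people.parquet"))
                     .withSchema(schema)
                     .withCompressionCodec(CompressionCodecName.SNAPPY)
                     .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", "Sagar");
            record.put("age", 26);
            writer.write(record);
        }
    }
}
{code}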


[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536071#comment-16536071
 ] 

ASF GitHub Bot commented on FLINK-6847:
---

GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6282

[FLINK-6847][FLINK-6813][Table Api & Sql] TimestampDiff table api and sql 
support


## What is the purpose of the change
This PR proposes to add TIMESTAMPDIFF support to SQL and the Table API.
The SQL form is `TIMESTAMPDIFF(DAY, DATE '2018-07-03', DATE '2018-07-04')`, 
as explained in and based on https://github.com/apache/flink/pull/4117
The Table API signature is `timestampDiff(TimeIntervalUnit.DAY, 
'2018-07-03'.toDate, '2018-07-04'.toDate)`

## Brief change log
  - *modify `generateTemporalPlusMinus`  in `ScalarOperators.scala` to 
support sql code gen*
  - *add `timestampDiff` api in `time.scala`*
  - *add timestampDiff expr string parse in `ExpressionParser.scala`*

## Verifying this change
This change added tests and can be verified as follows:

*(example:)*
  - *add tests to verify the SQL and Table API behavior for various combinations 
of units (YEAR, MONTH, ..., SECOND) and datetime types (DATE or TIMESTAMP)*
  - *add validation tests*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink timestampDiff

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6282


commit 462060f8854d19a355bcf4cc2fd54a2200f3e35b
Author: xueyu <278006819@...>
Date:   2018-07-08T10:50:06Z

timestampDiff table api and sql




> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-08 Thread xueyumusic
GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6282

[FLINK-6847][FLINK-6813][Table Api & Sql] TimestampDiff table api and sql 
support


## What is the purpose of the change
This PR proposes to add TIMESTAMPDIFF support to SQL and the Table API.
The SQL form is `TIMESTAMPDIFF(DAY, DATE '2018-07-03', DATE '2018-07-04')`, 
as explained in and based on https://github.com/apache/flink/pull/4117
The Table API signature is `timestampDiff(TimeIntervalUnit.DAY, 
'2018-07-03'.toDate, '2018-07-04'.toDate)`

## Brief change log
  - *modify `generateTemporalPlusMinus`  in `ScalarOperators.scala` to 
support sql code gen*
  - *add `timestampDiff` api in `time.scala`*
  - *add timestampDiff expr string parse in `ExpressionParser.scala`*

## Verifying this change
This change added tests and can be verified as follows:

*(example:)*
  - *add tests to verify the SQL and Table API behavior for various combinations 
of units (YEAR, MONTH, ..., SECOND) and datetime types (DATE or TIMESTAMP)*
  - *add validation tests*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink timestampDiff

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6282


commit 462060f8854d19a355bcf4cc2fd54a2200f3e35b
Author: xueyu <278006819@...>
Date:   2018-07-08T10:50:06Z

timestampDiff table api and sql




---


[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9407:

Labels:   (was: patch-available pull-request-available)

> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.
> Below, FYI.
> I tested the PR and verify the results with spark sql. Obviously, we can get 
> the results of what we had written down before. But I will give more tests in 
> the next couple of days. Including the performance under compression. And 
> more UTs.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9407:

Description: 
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we had written down before. But I will give more tests in 
the next couple of days. Including the performance under compression. And more 
UTs.
{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}

  was:
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we written down before. But I will give more tests in the 
next couple of days. Including the performance under compression. And more UTs.

{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}



> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: patch-available, pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.
> Below, FYI.
> I tested the PR and verify the results with spark sql. Obviously, we can get 
> the results of what we had written down before. But I will give more tests in 
> the next couple of days. Including the performance under compression. And 
> more UTs.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9407:

Description: 
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we written down before. But I will give more tests in the 
next couple of days. Including the performance under compression. And more UTs.

{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}


  was:
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we written down before. But I will give more tests in the 
next couple of days. Including the performance under compression. And more UT 
tests.

{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}



> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: patch-available, pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.
> Below, FYI.
> I tested the PR and verify the results with spark sql. Obviously, we can get 
> the results of what we written down before. But I will give more tests in the 
> next couple of days. Including the performance under compression. And more 
> UTs.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9407:

Description: 
Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.

Below, FYI.

I tested the PR and verify the results with spark sql. Obviously, we can get 
the results of what we written down before. But I will give more tests in the 
next couple of days. Including the performance under compression. And more UT 
tests.

{code:java}
scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala>

scala> res1.registerTempTable("tablerice")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select * from tablerice")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> res3.show(3)
+-+---+---+
| name|age|married|
+-+---+---+
|Sagar| 26|  false|
|Sagar| 30|  false|
|Sagar| 34|  false|
+-+---+---+
only showing top 3 rows
{code}


  was:Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
{{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: patch-available, pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.
> Below, FYI.
> I tested the PR and verify the results with spark sql. Obviously, we can get 
> the results of what we written down before. But I will give more tests in the 
> next couple of days. Including the performance under compression. And more UT 
> tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-+---+---+
> | name|age|married|
> +-+---+---+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-+---+---+
> only showing top 3 rows
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager

2018-07-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536059#comment-16536059
 ] 

vinoyang commented on FLINK-5232:
-

[~till.rohrmann] this issue has been open for a long time; any opinion?

> Add a Thread default uncaught exception handler on the JobManager
> -
>
> Key: FLINK-5232
> URL: https://issues.apache.org/jira/browse/FLINK-5232
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> When some JobManager threads die because of uncaught exceptions, we should 
> bring down the JobManager. If a thread dies from an uncaught exception, there 
> is a high chance that the JobManager becomes dysfunctional.
> The only safe thing is to rely on the JobManager being restarted by YARN / 
> Mesos / Kubernetes / etc.
> I suggest to add this code to the JobManager launch:
> {code}
> Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
> @Override
> public void uncaughtException(Thread t, Throwable e) {
> try {
> LOG.error("Thread {} died due to an uncaught exception. Killing 
> process.", t.getName());
> } finally {
> Runtime.getRuntime().halt(-1);
> }
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-5231) JobManager should exit on "OutOfMemoryError"

2018-07-08 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-5231:
---

Assignee: vinoyang

> JobManager should exit on "OutOfMemoryError"
> 
>
> Key: FLINK-5231
> URL: https://issues.apache.org/jira/browse/FLINK-5231
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> A JobManager that has encountered an {{OutOfMemoryError}} is, with high 
> probability, no longer functional. However, by default the {{OutOfMemoryError}} 
> kills only the thread in which it is thrown, not the entire JVM.
> We should add {{-XX:OnOutOfMemoryError="kill -9 %p"}} both to the standalone 
> bash options and to the YARN and Mesos launch options.
> On the TaskManager side, the decision is more tricky and less critical, so I 
> suggest to add this only to the JobManager by default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
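
A small, self-contained demonstration of the premise above: by default an 
{{OutOfMemoryError}} terminates only the thread in which it is thrown, while 
the rest of the JVM keeps running, which is why a flag such as 
{{-XX:OnOutOfMemoryError="kill -9 %p"}} is needed to bring the whole process 
down.

{code:java}
public class OomThreadDemo {

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            // Requesting an absurdly large array fails fast with an OutOfMemoryError.
            long[] tooBig = new long[Integer.MAX_VALUE];
            System.out.println(tooBig.length); // never reached
        }, "worker");

        worker.start();
        worker.join();

        // The JVM is still alive even though the worker thread died from an Error.
        System.out.println("main thread is still running");
    }
}
{code}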


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536052#comment-16536052
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6273
  
conflicts, please update the PR~


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer, 
> and rely on it being re-readable on the restore run, already makes it hard for 
> serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) to 
> perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
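
A hypothetical sketch of the "config snapshot as a factory" idea described 
above. The interfaces and names below are simplified stand-ins, not the actual 
Flink classes whose exact shape is being worked out in this task.

{code:java}
import java.io.Serializable;

/** Simplified stand-in for a serializer: turns values into bytes and back. */
interface SimpleSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

/**
 * Simplified stand-in for a serializer config snapshot: the only thing written
 * into the checkpoint meta information, and the single source of truth about
 * the schema of the serialized state.
 */
interface SimpleSerializerSnapshot<T> extends Serializable {

    /** Acts as a factory for a serializer that can read state written with the old schema. */
    SimpleSerializer<T> restoreSerializer();

    /** Checks whether a new serializer can read data written under this snapshot. */
    boolean isCompatibleWith(SimpleSerializer<T> newSerializer);
}
{code}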


[GitHub] flink issue #6273: [FLINK-9377] [core] Implement restore serializer factory ...

2018-07-08 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6273
  
conflicts, please update the PR~


---


[jira] [Commented] (FLINK-9750) Create new StreamingFileSink that works on Flink's FileSystem abstraction

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536051#comment-16536051
 ] 

ASF GitHub Bot commented on FLINK-9750:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r200835177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Javadoc.
--- End diff --

wrong Java doc


> Create new StreamingFileSink that works on Flink's FileSystem abstraction
> -
>
> Key: FLINK-9750
> URL: https://issues.apache.org/jira/browse/FLINK-9750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Using Flink's own file system abstraction means that we can add additional 
> streaming/checkpointing related behavior.
> In addition, the new StreamingFileSink should rely only on internal 
> checkpointed state to determine which files are in progress or need to roll 
> over, and never assume it can enumerate the files in the file system.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-08 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r200835177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Javadoc.
--- End diff --

wrong Java doc


---


[jira] [Commented] (FLINK-9733) Make location for job graph files configurable

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536050#comment-16536050
 ] 

ASF GitHub Bot commented on FLINK-9733:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6280
  
@zentol thanks for your suggestion, I have done some refactoring and removed 
the config item from `RestOptions`.


> Make location for job graph files configurable
> --
>
> Key: FLINK-9733
> URL: https://issues.apache.org/jira/browse/FLINK-9733
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Job-Submission
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> During the job-submission by the {{RestClusterClient}} the {{JobGraph}} is 
> serialized and written to a file.
> Currently we just use {{Files.createTempFile}} for this purpose.
> This location should be made configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
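
A minimal sketch of the change being discussed: rooting the serialized 
{{JobGraph}} file in a configurable directory instead of the JVM default temp 
location. The configuration key used here is hypothetical.

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JobGraphFileSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical setting; falls back to the JVM temp directory if unset.
        String configuredDir =
            System.getProperty("jobgraph.tmp.dir", System.getProperty("java.io.tmpdir"));

        Path dir = Paths.get(configuredDir);
        Files.createDirectories(dir);

        // Same effect as the current Files.createTempFile call, but in the configured directory.
        Path jobGraphFile = Files.createTempFile(dir, "flink-jobgraph", ".bin");
        System.out.println("would serialize the JobGraph to " + jobGraphFile);
    }
}
{code}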


[GitHub] flink issue #6280: [FLINK-9733] Make location for job graph files configurab...

2018-07-08 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6280
  
@zentol thanks for your suggestion, I have done some refactoring and removed 
the config item from `RestOptions`.


---


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536025#comment-16536025
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200833563
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = 

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-08 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200833563
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = orcSchema.getFieldNames();
+   List typeDescriptions = 
orcSchema.getChildren();
+   List typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536007#comment-16536007
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200831458
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java
 ---
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link OrcFileWriter}.
+ */
+public class OrcFileWriterTest {
+
+   @Test
+   public void testDuplicate() {
+   OrcFileWriter writer = new 
OrcFileWriter("struct");
--- End diff --

Yes. I will add more UTs for all in the next couple of days.
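
As a starting point, a duplicate() test can at least check that the copy is an independent OrcFileWriter instance; a sketch that uses only the imports already present in the quoted test class (the schema string is a placeholder, and it assumes Writer#duplicate() is the contract under test, as the test name suggests):

{code:java}
@Test
public void testDuplicate() {
	// Placeholder schema; the real test's schema is not shown in this excerpt.
	OrcFileWriter<Row> writer = new OrcFileWriter<>("struct<id:bigint,name:string>");
	Writer<Row> copy = writer.duplicate();

	// The bucketing sink clones its writer per bucket, so the copy must be
	// an OrcFileWriter of its own rather than the same object.
	assertTrue(copy instanceof OrcFileWriter);
	assertTrue(copy != writer);
}
{code}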


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: patch-available, pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536005#comment-16536005
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user xndai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200831360
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fills the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes ORC files with the given
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
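
The open() method quoted above wraps the plain ORC core writer API: it builds the writer from the schema and compression kind and prepares a reusable VectorizedRowBatch. For context, the underlying ORC pattern looks roughly like this standalone example (upstream ORC API only; the path, schema, and values are made up and are not taken from the PR):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OrcWritePatternExample {

	public static void main(String[] args) throws IOException {
		TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");

		// Same builder calls that open() uses: schema plus compression kind.
		Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
				OrcFile.writerOptions(new Configuration())
						.setSchema(schema)
						.compress(CompressionKind.ZLIB));

		VectorizedRowBatch batch = schema.createRowBatch();
		BytesColumnVector nameCol = (BytesColumnVector) batch.cols[0];
		LongColumnVector ageCol = (LongColumnVector) batch.cols[1];

		for (int i = 0; i < 5; i++) {
			int row = batch.size++;
			nameCol.setVal(row, ("user-" + i).getBytes(StandardCharsets.UTF_8));
			ageCol.vector[row] = 20 + i;

			// Hand a full batch to the ORC writer and reuse it.
			if (batch.size == batch.getMaxSize()) {
				writer.addRowBatch(batch);
				batch.reset();
			}
		}

		// Flush the partially filled tail batch before closing.
		if (batch.size != 0) {
			writer.addRowBatch(batch);
		}
		writer.close();
	}
}
{code}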

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536004#comment-16536004
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200830833
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fills the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes ORC files with the given
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames =

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535997#comment-16535997
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user xndai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200830153
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java
 ---
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link OrcFileWriter}.
+ */
+public class OrcFileWriterTest {
+
+   @Test
+   public void testDuplicate() {
+   OrcFileWriter writer = new 
OrcFileWriter("struct");
--- End diff --

Need UTs for writing Orc files with all supported types. Also include 
negative cases, and cases where OrcBatchWriter.fill() returns false.
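
A cheap negative case follows from the constructor quoted earlier, which parses the schema eagerly with TypeDescription.fromString: a malformed schema string should fail at construction time. Sketch only, assuming that eager parsing is kept in the final version:

{code:java}
@Test(expected = IllegalArgumentException.class)
public void testMalformedSchemaIsRejected() {
	// TypeDescription.fromString() rejects an unbalanced struct definition,
	// so construction should throw before any file is opened.
	new OrcFileWriter<Row>("struct<name:string");
}
{code}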


> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: patch-available, pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling 
> sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
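
For context on how the proposed writer would be used, a rough wiring of OrcFileWriter into the existing bucketing (rolling) sink from flink-connector-filesystem; the base path, schema string, and batch size below are placeholders, and the sketch only assumes the standard BucketingSink setters:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.types.Row;

import org.apache.orc.CompressionKind;

public class OrcRollingSinkExample {

	public static void addOrcSink(DataStream<Row> rows) {
		BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/orc-output");
		sink.setBucketer(new DateTimeBucketer<Row>("yyyy-MM-dd--HH"));

		// The writer proposed in this PR, with a placeholder schema.
		sink.setWriter(new OrcFileWriter<Row>(
				"struct<id:bigint,name:string>", CompressionKind.ZLIB));

		// Roll to a new part file once roughly 128 MB have been written.
		sink.setBatchSize(128L * 1024 * 1024);

		rows.addSink(sink);
	}
}
{code}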


[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535999#comment-16535999
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user xndai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200830078
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fills the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes ORC files with the given
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
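
Since the review asks for tests that check what actually lands in the written files, a read-back helper built on the plain ORC reader API can back such assertions. A sketch, assuming the writer under test has already been flushed and closed and that column 0 is a long column (the file path is illustrative):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

import java.io.IOException;

public class OrcReadBackCheck {

	/** Counts the rows of an ORC file and sums its first (long) column. */
	public static long[] countAndSum(String file) throws IOException {
		Reader reader = OrcFile.createReader(new Path(file),
				OrcFile.readerOptions(new Configuration()));
		RecordReader rows = reader.rows();
		VectorizedRowBatch batch = reader.getSchema().createRowBatch();

		long count = 0;
		long sum = 0;
		while (rows.nextBatch(batch)) {
			LongColumnVector col = (LongColumnVector) batch.cols[0];
			for (int i = 0; i < batch.size; i++) {
				// isRepeating means every row in the batch shares vector[0].
				sum += col.vector[col.isRepeating ? 0 : i];
			}
			count += batch.size;
		}
		rows.close();
		return new long[] {count, sum};
	}
}
{code}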

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535998#comment-16535998
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user xndai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200830111
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fills the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes ORC files with the given
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();

[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535996#comment-16535996
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

Github user xndai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200830013
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fills the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes ORC files with the given
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
