[GitHub] ifndef-SleePy commented on issue #6808: [FLINK-10361] [test] Harden instable elastic search end-to-end test case

2018-10-09 Thread GitBox
ifndef-SleePy commented on issue #6808: [FLINK-10361] [test] Harden instable 
elastic search end-to-end test case
URL: https://github.com/apache/flink/pull/6808#issuecomment-428459801
 
 
   @tzulitai 
   Thank you for reviewing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10361) Elasticsearch (v6.3.1) sink end-to-end test instable

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644536#comment-16644536
 ] 

ASF GitHub Bot commented on FLINK-10361:


ifndef-SleePy commented on issue #6808: [FLINK-10361] [test] Harden instable 
elastic search end-to-end test case
URL: https://github.com/apache/flink/pull/6808#issuecomment-428459801
 
 
   @tzulitai 
   Thank you for reviewing.




> Elasticsearch (v6.3.1) sink end-to-end test instable
> 
>
> Key: FLINK-10361
> URL: https://issues.apache.org/jira/browse/FLINK-10361
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Biao Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
> Attachments: flink-elasticsearch-logs.tgz
>
>
> The Elasticsearch (v6.3.1) sink end-to-end test is unstable. When run on an 
> Amazon instance, it failed with the following exception in the logs:
> {code}
> 2018-09-17 20:46:04,856 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Sequence Source -> Flat Map -> Sink: Unnamed (1/1) 
> (cb23fdd9df0d4e09270b2ae9970efbac) switched from RUNNING to FAILED.
> java.io.IOException: Connection refused
>   at 
> org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:728)
>   at 
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
>   at 
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:198)
>   at 
> org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522)
>   at 
> org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:275)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:81)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:47)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
>   at 
> org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
>   at 
> org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
>   at 
> org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
>   at 
> org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
>   ... 1 more
> {code}
> I assume that we should harden the test against connection problems a little 
> bit better.
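
One way to read "harden the test against connection problems" is to wait until the Elasticsearch port accepts connections before the job is submitted. The following is only a minimal sketch of that idea in plain Java. `ConnectionProbe`, `waitForPort`, and the attempt and backoff parameters are hypothetical names chosen for illustration; they are not part of Flink, the Elasticsearch client, or the actual fix in PR #6808.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration; not part of Flink or Elasticsearch. */
final class ConnectionProbe {

    private ConnectionProbe() {}

    /**
     * Tries to open a TCP connection to host:port up to maxAttempts times,
     * sleeping backoffMillis between failed attempts.
     *
     * @return true as soon as one attempt connects, false if all attempts fail
     */
    static boolean waitForPort(String host, int port, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (Socket socket = new Socket()) {
                // One-second connect timeout per attempt (an arbitrary choice).
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true;
            } catch (IOException e) {
                // "Connection refused" or a timeout: the service may still be starting.
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        return false;
    }
}
```

A test harness could call something like `ConnectionProbe.waitForPort("localhost", 9200, 30, 1000)` before starting the job and fail fast with a clear message if it returns false. Port 9200 is the usual Elasticsearch HTTP default, assumed here.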



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9845) Make InternalTimerService's timer processing interruptible/abortable

2018-10-09 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644530#comment-16644530
 ] 

Biao Liu commented on FLINK-9845:
-

[~till.rohrmann] This proposal sounds great. We have encountered the problem. 
If nobody is working on this, I'd like to take it.

> Make InternalTimerService's timer processing interruptible/abortable
> 
>
> Key: FLINK-9845
> URL: https://issues.apache.org/jira/browse/FLINK-9845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> When cancelling a {{Task}}, the task thread might currently process the 
> timers registered at the {{InternalTimerService}}. Depending on the timer 
> action, this might take a while and, thus, blocks the cancellation of the 
> {{Task}}. In the most extreme case, the {{TaskCancelerWatchDog}} kicks in and 
> kills the whole {{TaskManager}} process.
> In order to alleviate the problem (speed up the cancellation reaction), we 
> should make the processing of the timers interruptible/abortable. This means 
> that instead of processing all timers we should check in between timers 
> whether the {{Task}} is currently being cancelled or not. If this is the 
> case, then we should directly stop processing the remaining timers and return.
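
The proposal above can be sketched as a loop that checks a cancellation flag between timers. This is a simplified illustration, not the actual `InternalTimerService` code: `AbortableTimerLoop` and all of its members are hypothetical, and timers are modelled as plain `Runnable`s in a queue that only the task thread touches.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a timer service; for illustration only. */
final class AbortableTimerLoop {

    // Timers that are due; accessed only by the processing (task) thread.
    private final Queue<Runnable> dueTimers = new ArrayDeque<>();

    // May be set from another thread, for example by the canceller.
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void register(Runnable timerAction) {
        dueTimers.add(timerAction);
    }

    void cancel() {
        cancelled.set(true);
    }

    /**
     * Fires due timers one at a time and re-checks the cancellation flag
     * before each one, so remaining timers are skipped once cancel() was called.
     *
     * @return the number of timers that were actually fired
     */
    int processDueTimers() {
        int fired = 0;
        while (!cancelled.get()) {
            Runnable timer = dueTimers.poll();
            if (timer == null) {
                break; // nothing left to process
            }
            timer.run();
            fired++;
        }
        return fired;
    }

    int remaining() {
        return dueTimers.size();
    }
}
```

A timer that is already running still finishes, so the cancellation delay is bounded by the longest single timer action rather than by the sum of all pending ones.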





[GitHub] aljoscha commented on a change in pull request #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-09 Thread GitBox
aljoscha commented on a change in pull request #6703: [FLINK-9697] Provide 
connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#discussion_r223951120
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-2.0/pom.xml
 ##
 @@ -0,0 +1,312 @@
+
+
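+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-2.0</artifactId>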
 
 Review comment:
   Agreed, but we still have to add the Scala suffix, i.e. it should be 
`flink-connector-kafka_${scala.binary.version}` because we depend on 
`flink-streaming`.




[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644517#comment-16644517
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

aljoscha commented on a change in pull request #6703: [FLINK-9697] Provide 
connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#discussion_r223951120
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-2.0/pom.xml
 ##
 @@ -0,0 +1,312 @@
+
+
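+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-2.0</artifactId>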
 
 Review comment:
   Agreed, but we still have to add the Scala suffix, i.e. it should be 
`flink-connector-kafka_${scala.binary.version}` because we depend on 
`flink-streaming`.




> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0





[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947861
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
 
 Review comment:
   They are reset in the `nextRecord` function.
   if (parquetRecordReader.getCurrentBlock() != lastSyncedBlock) {
   ...
   }
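
   For readers following along, the reset described in this comment might look roughly like the sketch below. It is an assumption about the intent, not the code from the pull request: `BlockPositionTracker`, `onRecord`, and `currentState` are hypothetical names, and only the two field names are taken from the quoted class.

```java
/** Hypothetical illustration of the checkpoint bookkeeping; not the PR's code. */
final class BlockPositionTracker {

    // Index of the block in which the last "sync" happened; -1 means none yet.
    private long lastSyncedBlock = -1L;

    // Number of records read since the reader entered lastSyncedBlock.
    private long recordsReadSinceLastSync = 0L;

    /** Call once per record, passing the index of the block it was read from. */
    void onRecord(long currentBlock) {
        if (currentBlock != lastSyncedBlock) {
            // The reader moved to a new block: remember it and restart the count.
            lastSyncedBlock = currentBlock;
            recordsReadSinceLastSync = 0L;
        }
        recordsReadSinceLastSync++;
    }

    /** Returns {lastSyncedBlock, recordsReadSinceLastSync}, the pair a checkpoint would store. */
    long[] currentState() {
        return new long[] {lastSyncedBlock, recordsReadSinceLastSync};
    }
}
```

   With this bookkeeping, restoring from a checkpoint would mean seeking to the stored block and skipping the stored number of records, which is why the counters do not need to be reset in `open` itself.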




[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644505#comment-16644505
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947861
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
 
 Review comment:
   They are reset in the `nextRecord` function.
   if (parquetRecordReader.getCurrentBlock() != lastSyncedBlock) {
   ...
   }




> Add ParquetInputFormat
> --
>
> Key: FLINK-

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644504#comment-16644504
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947437
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   Me

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947437
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new 
RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+  

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644488#comment-16644488
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942389
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   Me

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644487#comment-16644487
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942328
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   Me

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942389
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new 
RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+  
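
For readers following the checkpointing part of this diff: getCurrentState() returns the pair (lastSyncedBlock, recordsReadSinceLastSync). The sketch below illustrates one way such a pair could be used to resume reading after a restore. It is not code from the pull request. The class BlockCheckpointSketch, its restore() and next() methods, and the in-memory list of "blocks" are all hypothetical stand-ins, and it assumes that lastSyncedBlock is the index of the block currently being read.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, self-contained illustration; not part of the PR and not a Flink API. */
final class BlockCheckpointSketch {

    // Assumption: index of the block currently being read, -1 before any block is entered.
    private long lastSyncedBlock = -1L;
    // Records handed out since the reader entered that block.
    private long recordsReadSinceLastSync = 0L;

    private final List<List<String>> blocks;
    private int blockIndex = 0;
    private int offsetInBlock = 0;

    BlockCheckpointSketch(List<List<String>> blocks) {
        this.blocks = blocks;
    }

    /** Snapshot of the read position as {lastSyncedBlock, recordsReadSinceLastSync}. */
    long[] getCurrentState() {
        return new long[] {lastSyncedBlock, recordsReadSinceLastSync};
    }

    /** Repositions the reader: jump to the saved block, then skip the records already read. */
    void restore(long[] state) {
        lastSyncedBlock = state[0];
        recordsReadSinceLastSync = state[1];
        blockIndex = state[0] < 0 ? 0 : (int) state[0];
        offsetInBlock = state[0] < 0 ? 0 : (int) state[1];
    }

    /** Returns the next record, or null once every block is exhausted. */
    String next() {
        // Move past exhausted (or empty) blocks.
        while (blockIndex < blocks.size() && offsetInBlock >= blocks.get(blockIndex).size()) {
            blockIndex++;
            offsetInBlock = 0;
        }
        if (blockIndex >= blocks.size()) {
            return null;
        }
        // Entering a new block resets the per-block record counter.
        if (lastSyncedBlock != blockIndex) {
            lastSyncedBlock = blockIndex;
            recordsReadSinceLastSync = 0L;
        }
        String record = blocks.get(blockIndex).get(offsetInBlock++);
        recordsReadSinceLastSync++;
        return record;
    }

    public static void main(String[] args) {
        List<List<String>> blocks =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c", "d", "e"));
        BlockCheckpointSketch first = new BlockCheckpointSketch(blocks);
        first.next();
        first.next();
        first.next(); // "c" has been read, so the state is (1, 1)
        BlockCheckpointSketch second = new BlockCheckpointSketch(blocks);
        second.restore(first.getCurrentState());
        System.out.println(second.next()); // prints d
    }
}
```

Keeping the state as a block index plus a record count means a restored reader only needs to seek to a block boundary and skip a few records, rather than track a byte offset inside a compressed block.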

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942328
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new 
RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+  

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223941595
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new 
RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+  

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644486#comment-16644486
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223941595
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   Me

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223941595
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new 
RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+  

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644481#comment-16644481
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940956
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
 
 Review comment:
   Done




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644480#comment-16644480
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940936
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
 
 Review comment:
   Agree.




[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644479#comment-16644479
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940919
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
 
 Review comment:
   It should be private. Changed accordingly.




[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644476#comment-16644476
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940551
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
 
 Review comment:
   1) Added Javadoc for the two constructors.
   2) Given that Path is required and multiple file paths are not supported, I 
would prefer to leave it there, as in AvroInputFormat. Otherwise, users need to 
call the constructor and then setPath, which is not that convenient.
   3) Agree. I added another constructor that takes a MessageType as the return type.
   4) I think we need it. getProducedType will be needed by the table source at 
an early stage. If we rely on the schema in the Parquet file for the field types, 
we can only get it after opening the split for now. That is too late for the SQL 
magic, right?
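
   The reasoning in point 4 can be sketched in plain Java. This is only an illustration under stated assumptions: `ProducedTypeSketch`, `EagerTypeFormat`, `producedType()`, and the example path are hypothetical stand-ins, not classes or methods from Flink or from this pull request. The sketch shows that when field names and types are passed to the constructor, the row type can be reported before any split is opened.

```java
import java.util.StringJoiner;

/**
 * Hypothetical stand-in classes; this is NOT Flink's ParquetInputFormat.
 * It only illustrates why field types are passed to the constructor.
 */
public class ProducedTypeSketch {

    /** Simplified format whose row type is known before any file is opened. */
    public static class EagerTypeFormat {
        private final String path;
        private final String[] fieldNames;
        private final String[] fieldTypes;
        private boolean opened = false;

        public EagerTypeFormat(String path, String[] fieldNames, String[] fieldTypes) {
            if (path == null) {
                throw new IllegalArgumentException("path is required");
            }
            if (fieldNames.length != fieldTypes.length) {
                throw new IllegalArgumentException("names and types must align");
            }
            this.path = path;
            this.fieldNames = fieldNames.clone();
            this.fieldTypes = fieldTypes.clone();
        }

        /** Plays the role of getProducedType(): no I/O, so a planner can call it early. */
        public String producedType() {
            StringJoiner joiner = new StringJoiner(", ", "Row(", ")");
            for (int i = 0; i < fieldNames.length; i++) {
                joiner.add(fieldNames[i] + ": " + fieldTypes[i]);
            }
            return joiner.toString();
        }

        /** Plays the role of open(split); a real format would read the file footer here. */
        public void open() {
            opened = true;
        }

        public boolean isOpened() {
            return opened;
        }
    }

    public static void main(String[] args) {
        EagerTypeFormat format = new EagerTypeFormat(
            "/tmp/example.parquet",            // hypothetical path
            new String[] {"id", "name"},
            new String[] {"Long", "String"});
        // The row type is available although the file has not been opened yet.
        System.out.println(format.producedType() + " opened=" + format.isOpened());
    }
}
```

   A format that derived its types from the file schema instead would have nothing to return from such a method until `open()` had run, which is the ordering problem described in point 4.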
   
   
   



[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940551
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
 
 Review comment:
   1) Added Javadoc for the two constructors.
   2) Given that Path is required and multiple file paths are not supported, I would 
prefer to keep it in the constructor, as AvroInputFormat does; otherwise, users need to call 
the constructor and then setPath, which is less convenient.
   3) Agreed. I added another constructor that takes a MessageType as the return type.
   4) I think we need it. getProducerType will be needed by the table source at an 
early stage. If we rely on the schema in the Parquet file for the field types, we can only 
get it after opening the split for now. That is too late for the SQL magic, right? 
   
   
   



With regards,
Apache Git Services


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644465#comment-16644465
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

tzulitai commented on issue #6711: [FLINK-9377] [core, state backends] Remove 
serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#issuecomment-428440460
 
 
   Thanks @dawidwys.
   I'll address your last comments, rebase on master and after Travis gives 
green, proceed to merge this :)




> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.
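
The proposal above can be pictured with a minimal sketch. Everything in it is an assumption made for illustration: `Serializer`, `ConfigSnapshot`, `StringSerializer`, `StringSerializerSnapshot` and the method names are hypothetical and are not Flink classes or the API from the pull request. It only shows the idea that the persisted snapshot holds plain configuration and acts as the factory for a serializer that can read old state, so the serializer object itself never has to be Java-serialized.

```java
import java.nio.charset.Charset;

/** Hypothetical sketch; none of these types are Flink classes. */
class SnapshotFactorySketch {

    /** Minimal serializer contract used only for this illustration. */
    interface Serializer<T> {
        byte[] serialize(T value);

        T deserialize(byte[] bytes);

        ConfigSnapshot<T> snapshotConfiguration();
    }

    /** The only meta information that would be written next to the state. */
    interface ConfigSnapshot<T> {
        /** Acts as a factory for a serializer that can read the old state. */
        Serializer<T> restoreSerializer();
    }

    static final class StringSerializer implements Serializer<String> {
        private final Charset charset;

        StringSerializer(String charsetName) {
            this.charset = Charset.forName(charsetName);
        }

        @Override
        public byte[] serialize(String value) {
            return value.getBytes(charset);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, charset);
        }

        @Override
        public ConfigSnapshot<String> snapshotConfiguration() {
            // Only plain configuration is captured, not the serializer object.
            return new StringSerializerSnapshot(charset.name());
        }
    }

    static final class StringSerializerSnapshot implements ConfigSnapshot<String> {
        private final String charsetName;

        StringSerializerSnapshot(String charsetName) {
            this.charsetName = charsetName;
        }

        @Override
        public Serializer<String> restoreSerializer() {
            return new StringSerializer(charsetName);
        }
    }

    /** Writes a value, keeps only the snapshot, and reads the value back. */
    static String writeThenRestore(String value) {
        Serializer<String> writer = new StringSerializer("UTF-16LE");
        byte[] state = writer.serialize(value);
        ConfigSnapshot<String> snapshot = writer.snapshotConfiguration();

        // On restore, the snapshot is the single source of truth.
        Serializer<String> reader = snapshot.restoreSerializer();
        return reader.deserialize(state);
    }

    public static void main(String[] args) {
        System.out.println(writeThenRestore("state"));
    }
}
```

In this sketch the restore path never touches the original `StringSerializer` instance, which is the property the issue description asks for.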



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai commented on issue #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-10-09 Thread GitBox
tzulitai commented on issue #6711: [FLINK-9377] [core, state backends] Remove 
serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#issuecomment-428440460
 
 
   Thanks @dawidwys.
   I'll address your last comments, rebase on master and after Travis gives 
green, proceed to merge this :)




[jira] [Commented] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644460#comment-16644460
 ] 

huangjiatian commented on FLINK-10464:
--

[~twalthr] I have added the pictures as a supplement.

> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png, 2.png, 3.png
>
>
> I ran into a strange problem when using the Flink SQL API.
> Two different types are considered equal.
> !1.png!
> After debugging, I found that the hashCode and equals methods in RelRecordType 
> only consider "digest".
>  !2.png!
>  But sometimes "digest" cannot describe a type completely; more information 
> is in the "fieldList".
>  !3.png!
> The "digest" in TimeIndicatorRelDataType should contain the time type 
> information (event time or processing time).
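
The behaviour described above can be reproduced with a small stand-alone sketch. It is not Calcite or Flink code: `TimeIndicatorType`, the `kindInDigest` flag and the ` ROWTIME` / ` PROCTIME` suffixes are hypothetical names chosen for illustration. The sketch assumes, as the report states, that `equals` and `hashCode` look only at the digest string, and it shows that two types collide unless the digest also carries the time kind.

```java
/** Hypothetical sketch of digest-only equality; not Calcite or Flink code. */
class DigestEqualitySketch {

    static final class TimeIndicatorType {
        final boolean isEventTime;
        final String digest;

        TimeIndicatorType(boolean isEventTime, boolean kindInDigest) {
            this.isEventTime = isEventTime;
            String base = "TIMESTAMP(3) NOT NULL";
            // The suffix is an assumed way of encoding the time kind in the digest.
            this.digest = kindInDigest
                ? base + (isEventTime ? " ROWTIME" : " PROCTIME")
                : base;
        }

        // Equality and hashing consider the digest only, as in the report.
        @Override
        public boolean equals(Object other) {
            return other instanceof TimeIndicatorType
                && digest.equals(((TimeIndicatorType) other).digest);
        }

        @Override
        public int hashCode() {
            return digest.hashCode();
        }
    }

    /** Returns true if an event-time and a processing-time type compare equal. */
    static boolean collides(boolean kindInDigest) {
        TimeIndicatorType eventTime = new TimeIndicatorType(true, kindInDigest);
        TimeIndicatorType procTime = new TimeIndicatorType(false, kindInDigest);
        return eventTime.equals(procTime);
    }

    public static void main(String[] args) {
        System.out.println(collides(false)); // prints "true": digests are identical
        System.out.println(collides(true));  // prints "false": digests now differ
    }
}
```

Putting the time kind into the digest keeps `equals` and `hashCode` unchanged while making the two types distinguishable, which matches the direction the report suggests.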





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I ran into a strange problem when using the Flink SQL API.

Two different types are considered equal.

!1.png!

After debugging, I found that the hashCode and equals methods in RelRecordType 
only consider "digest".
 !2.png!
 But sometimes "digest" cannot describe a type completely; more information 
is in the "fieldList".
 !3.png!

The "digest" in TimeIndicatorRelDataType should contain the time type 
information (event time or processing time).

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

!1.png!

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
!2.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
!3.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png, 2.png, 3.png
>
>
> I ran into a strange problem when using the Flink SQL API.
> Two different types are considered equal.
> !1.png!
> After debugging, I found that the hashCode and equals methods in RelRecordType 
> only consider "digest".
>  !2.png!
>  But sometimes "digest" cannot describe a type completely; more information 
> is in the "fieldList".
>  !3.png!
> The "digest" in TimeIndicatorRelDataType should contain the time type 
> information (event time or processing time).





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Attachment: 3.png

> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png, 2.png, 3.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
> !1.png!
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
> !2.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
> !3.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

!1.png!

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
!2.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
!3.png!

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

 

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png, 2.png, 3.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
> !1.png!
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
> !2.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
> !3.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Attachment: 2.png

> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png, 2.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
>  
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

  !image-2018-09-28-13-11-43-515.png!

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

 

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
>   !image-2018-09-28-13-11-43-515.png!
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

 

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

  !image-2018-09-28-13-11-43-515.png!

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
>  
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.

 

After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.
!image-2018-09-28-13-11-43-515.png! After debug, i found the hashCode method 
and equals method in RelRecordType just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
>  
> After debug, i found the hashCode method and equals method in RelRecordType 
> just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Attachment: 1.png

> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
> Attachments: 1.png
>
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
> !image-2018-09-28-13-11-43-515.png! After debug, i found the hashCode method 
> and equals method in RelRecordType just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644450#comment-16644450
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223935440
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
 
 Review comment:
   Agree. I removed the duplicated readType here.




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 





[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.
!image-2018-09-28-13-11-43-515.png! After debug, i found the hashCode method 
and equals method in RelRecordType just consider "digest".
 !image-2018-09-28-13-15-19-711.png!
 But some time, "digest" can not describe a type completely, More information 
in the "fieldList".
 !image-2018-09-28-13-17-19-369.png!

  was:
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.
!image-2018-09-28-13-11-43-515.png!
After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
!image-2018-09-28-13-15-19-711.png!
But some time, "digest" can not describe a type completely, More information in 
the "fieldList".
!image-2018-09-28-13-17-19-369.png!


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
>
> I met a strange question when i use Flink SQL API.
> I found two difference type are considered equal.
> !image-2018-09-28-13-11-43-515.png! After debug, i found the hashCode method 
> and equals method in RelRecordType just consider "digest".
>  !image-2018-09-28-13-15-19-711.png!
>  But some time, "digest" can not describe a type completely, More information 
> in the "fieldList".
>  !image-2018-09-28-13-17-19-369.png!





[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223935440
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
 
 Review comment:
   Agree. I removed the duplicated readType here.




[jira] [Updated] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-09 Thread huangjiatian (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjiatian updated FLINK-10464:
-
Description: 
I met a strange question when i use Flink SQL API.

I found two difference type are considered equal.
!image-2018-09-28-13-11-43-515.png!
After debug, i found the hashCode method and equals method in RelRecordType 
just consider "digest".
!image-2018-09-28-13-15-19-711.png!
But some time, "digest" can not describe a type completely, More information in 
the "fieldList".
!image-2018-09-28-13-17-19-369.png!

  was:
I met a strange question when i use Flink SQL API.

The error message like that: 

java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:

validated type:

RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL

converted type:

RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL

rel:

LogicalProject(rowtime=[$3])

  LogicalTableScan(table=[[hjtsrc]])

    

I found that two different types are considered equal.
!image-2018-09-28-13-11-43-515.png!

That means "select deviceid, rowtime.rowtime" is treated as equal to "select 
deviceid, rowtime.proctime".

The "digest" in TimeIndicatorRelDataType carries no event-time information, so it 
cannot describe a TimeIndicatorRelDataType completely.
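
The effect described above can be reproduced in isolation. The following is a minimal sketch, not Calcite or Flink code: DigestOnlyType and FullyDescribedType are hypothetical stand-ins, and the isEventTime flag is an assumed placeholder for whatever information the digest leaves out. It only illustrates that when equals and hashCode look at the digest string alone, two types that differ in another field compare as equal.

```java
import java.util.Objects;

class DigestEqualitySketch {

    /** Hypothetical type whose identity is its digest string only. */
    static final class DigestOnlyType {
        final String digest;
        final boolean isEventTime; // assumed extra information, not part of the digest

        DigestOnlyType(String digest, boolean isEventTime) {
            this.digest = digest;
            this.isEventTime = isEventTime;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof DigestOnlyType && digest.equals(((DigestOnlyType) o).digest);
        }

        @Override
        public int hashCode() {
            return digest.hashCode();
        }
    }

    /** Hypothetical variant that also compares the extra field. */
    static final class FullyDescribedType {
        final String digest;
        final boolean isEventTime;

        FullyDescribedType(String digest, boolean isEventTime) {
            this.digest = digest;
            this.isEventTime = isEventTime;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FullyDescribedType)) {
                return false;
            }
            FullyDescribedType other = (FullyDescribedType) o;
            return digest.equals(other.digest) && isEventTime == other.isEventTime;
        }

        @Override
        public int hashCode() {
            return Objects.hash(digest, isEventTime);
        }
    }

    /** Two digest-only types with different flags: compared by digest alone. */
    static boolean digestOnlyTypesEqual() {
        String digest = "TIMESTAMP(3) NOT NULL";
        return new DigestOnlyType(digest, true).equals(new DigestOnlyType(digest, false));
    }

    /** The same pair, compared with the flag included. */
    static boolean fullyDescribedTypesEqual() {
        String digest = "TIMESTAMP(3) NOT NULL";
        return new FullyDescribedType(digest, true).equals(new FullyDescribedType(digest, false));
    }

    public static void main(String[] args) {
        System.out.println(digestOnlyTypesEqual());     // true
        System.out.println(fullyDescribedTypesEqual()); // false
    }
}
```

Whether the right fix is to extend the digest or to change the comparison is not something this sketch can decide; it only shows why identical digests hide the difference.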


> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
>
> I ran into a strange problem while using the Flink SQL API.
> I found that two different types are considered equal.
> !image-2018-09-28-13-11-43-515.png!
> After debugging, I found that the hashCode and equals methods in RelRecordType 
> only consider "digest".
> !image-2018-09-28-13-15-19-711.png!
> But sometimes "digest" cannot describe a type completely; more information is 
> in the "fieldList".
> !image-2018-09-28-13-17-19-369.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1663#comment-1663
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223934091
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
 
 Review comment:
   Yes, it was originally added to distinguish the backward-compatible schema. It 
is not needed here.




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 





[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223934091
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
 
 Review comment:
   Yes, it was originally added to distinguish the backward-compatible schema. It 
is not needed here.




[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644432#comment-16644432
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner 
in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#issuecomment-428432627
 
 
   Thanks for the efficient reviews! @NicoK @pnowojski 
   
   I have already updated the code based on the comments above, except for the 
additional missing tests. Currently there is only one test related to 
`BufferPoolOwner`, whose behavior is to throw an exception when release memory 
is called, so tests for the normal behavior of `BufferPoolOwner` are also 
missing, in addition to tests with different `ResultPartitionType`s. 
   
   But I think that is a separate topic, and I want to add more tests for it in a 
separate JIRA, which I will create later. This PR only moves the 
`BufferPoolOwner` into the constructor of `LocalBufferPool` and does not 
affect the previous behavior. What do you think?




> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.1, 1.5.4
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
> lock on <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
>

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644433#comment-16644433
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223932565
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
 
 Review comment:
   As a Parquet file is composed of row groups with separate metadata in the 
footer, it is definitely splittable. It requires reading the footer of each file 
in advance and processing each file split with the offset info of the original 
file. It could be a separate improvement done in a separate PR. What do you 
think? 
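
   To make the idea concrete, here is a minimal sketch of how row groups might be assigned to file splits once the footer has been read. It does not use the Parquet or Flink APIs: RowGroup and rowGroupsForSplit are hypothetical names, and the midpoint rule (a split owns every row group whose midpoint falls inside its byte range) is one common convention assumed here for illustration, not something this PR implements.

```java
import java.util.ArrayList;
import java.util.List;

class RowGroupSplitSketch {

    /** Hypothetical footer entry: the byte range of one row group in the file. */
    static final class RowGroup {
        final long start;
        final long length;

        RowGroup(long start, long length) {
            this.start = start;
            this.length = length;
        }

        long midpoint() {
            return start + length / 2;
        }
    }

    /**
     * Returns the indices of the row groups whose midpoint lies in
     * [splitStart, splitStart + splitLength), so each row group is read by exactly one split.
     */
    static List<Integer> rowGroupsForSplit(List<RowGroup> footer, long splitStart, long splitLength) {
        long splitEnd = splitStart + splitLength;
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < footer.size(); i++) {
            long mid = footer.get(i).midpoint();
            if (mid >= splitStart && mid < splitEnd) {
                owned.add(i);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<RowGroup> footer = new ArrayList<>();
        footer.add(new RowGroup(4, 100));   // midpoint 54
        footer.add(new RowGroup(104, 100)); // midpoint 154
        footer.add(new RowGroup(204, 60));  // midpoint 234

        System.out.println(rowGroupsForSplit(footer, 0, 128));   // [0]
        System.out.println(rowGroupsForSplit(footer, 128, 128)); // [1, 2]
    }
}
```

   Because every midpoint belongs to exactly one byte range, no row group is read twice or skipped, even when a row group straddles a split boundary.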




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>   

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223932565
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
 
 Review comment:
   As a Parquet file is composed of row groups with separate metadata in the 
footer, it is definitely splittable. It requires reading the footer of each file 
in advance and processing each file split with the offset info of the original 
file. It could be a separate improvement done in a separate PR. What do you 
think? 




[GitHub] zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-09 Thread GitBox
zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner 
in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#issuecomment-428432627
 
 
   Thanks for the efficient reviews! @NicoK @pnowojski 
   
   I have already updated the code based on the comments above, except for the 
additional missing tests. Currently there is only one test related to 
`BufferPoolOwner`, whose behavior is to throw an exception when release memory 
is called, so tests for the normal behavior of `BufferPoolOwner` are also 
missing, in addition to tests with different `ResultPartitionType`s. 
   
   But I think that is a separate topic, and I want to add more tests for it in a 
separate JIRA, which I will create later. This PR only moves the 
`BufferPoolOwner` into the constructor of `LocalBufferPool` and does not 
affect the previous behavior. What do you think?




[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644425#comment-16644425
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223931343
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -349,11 +346,13 @@ public void setNumBuffers(int numBuffers) throws 
IOException {
 
returnExcessMemorySegments();
 
-   // If there is a registered owner and we have still 
requested more buffers than our
-   // size, trigger a recycle via the owner.
-   if (owner != null && numberOfRequestedMemorySegments > 
currentPoolSize) {
-   
owner.releaseMemory(numberOfRequestedMemorySegments - currentPoolSize);
-   }
+   numReleased = numberOfRequestedMemorySegments - 
currentPoolSize;
+   }
+
+   // If there is a registered owner and we have still requested 
more buffers than our
+   // size, trigger a recycle via the owner.
+   if (owner != null && numReleased > 0) {
+   owner.releaseMemory(numReleased);
 
 Review comment:
   Yes, I also cannot think of any potential problems at the moment.
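
   For readers following the diff, the pattern it applies can be sketched in isolation: compute the number of excess buffers while holding the pool's lock, then call the owner only after the lock has been released, so the callback can never take a second lock while the pool's lock is held. The sketch below is not the Flink implementation; Pool, Owner and the field names are hypothetical, and Thread.holdsLock is used only to observe whether the callback runs under the pool's lock.

```java
class ReleaseOutsideLockSketch {

    /** Hypothetical callback, loosely modelled on the owner in the diff above. */
    interface Owner {
        void releaseMemory(int numBuffers);
    }

    /** Hypothetical pool that notifies its owner after leaving the synchronized block. */
    static final class Pool {
        final Object lock = new Object();
        private final Owner owner;
        private final int requested;
        private int poolSize;

        Pool(Owner owner, int requested, int poolSize) {
            this.owner = owner;
            this.requested = requested;
            this.poolSize = poolSize;
        }

        void setNumBuffers(int newSize) {
            int numExcess;
            synchronized (lock) {
                // State is read and updated under the lock.
                poolSize = newSize;
                numExcess = requested - poolSize;
            }
            // The callback runs without the pool's lock, so it cannot be part of a
            // lock-order cycle that involves this lock.
            if (owner != null && numExcess > 0) {
                owner.releaseMemory(numExcess);
            }
        }
    }

    /** Returns whether the owner callback observed the pool's lock being held. */
    static boolean callbackRunsUnderPoolLock() {
        final Pool[] poolRef = new Pool[1];
        final boolean[] heldLock = new boolean[1];
        Owner owner = n -> heldLock[0] = Thread.holdsLock(poolRef[0].lock);
        poolRef[0] = new Pool(owner, 10, 10);
        poolRef[0].setNumBuffers(6); // 4 excess buffers, so the owner is called
        return heldLock[0];
    }

    public static void main(String[] args) {
        System.out.println(callbackRunsUnderPoolLock()); // false
    }
}
```

   The trade-off is that the owner sees a snapshot: by the time releaseMemory runs, the pool state may have changed again, which is presumably acceptable here since the callback only asks for memory to be freed.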




> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.1, 1.5.4
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-09 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223931343
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -349,11 +346,13 @@ public void setNumBuffers(int numBuffers) throws 
IOException {
 
returnExcessMemorySegments();
 
-   // If there is a registered owner and we have still 
requested more buffers than our
-   // size, trigger a recycle via the owner.
-   if (owner != null && numberOfRequestedMemorySegments > 
currentPoolSize) {
-   
owner.releaseMemory(numberOfRequestedMemorySegments - currentPoolSize);
-   }
+   numReleased = numberOfRequestedMemorySegments - 
currentPoolSize;
+   }
+
+   // If there is a registered owner and we have still requested 
more buffers than our
+   // size, trigger a recycle via the owner.
+   if (owner != null && numReleased > 0) {
+   owner.releaseMemory(numReleased);
 
 Review comment:
   Yes, I also cannot think of any potential problems at the moment.




[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644421#comment-16644421
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223930910
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat implements
 
 Review comment:
   Done.




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.





[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223930910
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat implements
 
 Review comment:
   Done.




[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644407#comment-16644407
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929542
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -209,8 +209,11 @@ public void setupPartition(ResultPartition partition) throws IOException {
int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+   // If the partition type is back pressure-free, we register with the buffer pool for
+   // callbacks to release memory.
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
-   maxNumberOfMemorySegments);
+   maxNumberOfMemorySegments, partition.getPartitionType().hasBackPressure() ? partition : null);
 
 Review comment:
   That was careless of me, and it also revealed that tests were missing here. :)
   I will add a related test to cover it.
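
The comment in the diff says the partition should register as the buffer pool owner when its type is back pressure-free, whereas the ternary as quoted passes the partition when `hasBackPressure()` is true. Below is a minimal Java sketch of the selection that the comment describes, assuming the comment states the intended behaviour. `PartitionType`, `Owner`, and `selectOwner` are hypothetical stand-ins rather than Flink classes; only the name `hasBackPressure()` is taken from the diff.

```java
import java.util.Optional;

// Hypothetical stand-ins; not the real Flink ResultPartitionType / BufferPoolOwner.
final class OwnerSelectionSketch {

    enum PartitionType {
        BLOCKING(false),   // assumed: no back pressure
        PIPELINED(true);   // assumed: back pressure

        private final boolean hasBackPressure;

        PartitionType(boolean hasBackPressure) {
            this.hasBackPressure = hasBackPressure;
        }

        boolean hasBackPressure() {
            return hasBackPressure;
        }
    }

    interface Owner {
        void releaseMemory(int numBuffers);
    }

    // Register the partition as owner only when the type is back pressure-free.
    static Optional<Owner> selectOwner(PartitionType type, Owner partition) {
        return type.hasBackPressure() ? Optional.empty() : Optional.of(partition);
    }

    public static void main(String[] args) {
        Owner partition = n -> { };
        System.out.println(selectOwner(PartitionType.BLOCKING, partition).isPresent());
        System.out.println(selectOwner(PartitionType.PIPELINED, partition).isPresent());
    }
}
```

A test along these lines, one case per partition type, would catch an inverted condition.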




> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.1, 1.5.4
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(T

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-09 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929542
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -209,8 +209,11 @@ public void setupPartition(ResultPartition partition) throws IOException {
int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+   // If the partition type is back pressure-free, we register with the buffer pool for
+   // callbacks to release memory.
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
-   maxNumberOfMemorySegments);
+   maxNumberOfMemorySegments, partition.getPartitionType().hasBackPressure() ? partition : null);
 
 Review comment:
   That was careless of me, and it also revealed that tests were missing here. :)
   I will add a related test to cover it.




[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644406#comment-16644406
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] 
UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r223929236
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
+   String bomCharsetName = getBomCharset(file);
 
 Review comment:
   Whether or not we try to detect the encoding of a file or byte stream that
has no BOM, the following scenario remains possible:
   The file is encoded as UTF-8, but the user specifies UTF-16 or UTF-32 as the
encoding, which still produces garbled output. On the other hand, reliably
determining the encoding of a file or byte stream without a BOM is difficult.
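
For reference, the BOM-based part of the check can be sketched in plain Java as follows. This is only an illustration of sniffing the first bytes of a file, not the `getBomCharset` method from the pull request; the class and method names are hypothetical. When no BOM is present it returns the caller's fallback, which is exactly the case where a wrong user-specified encoding goes unnoticed.

```java
// Hypothetical helper; not the getBomCharset implementation from the PR.
final class BomSniffSketch {

    // Returns the charset name implied by a leading BOM, or the fallback if there is none.
    static String detectBomCharset(byte[] head, String fallback) {
        // UTF-32 is checked first because the UTF-32LE BOM (FF FE 00 00)
        // begins with the UTF-16LE BOM (FF FE). A UTF-16LE file whose first
        // character is U+0000 would be misread here; that ambiguity is inherent.
        if (startsWith(head, 0x00, 0x00, 0xFE, 0xFF)) {
            return "UTF-32BE";
        }
        if (startsWith(head, 0xFF, 0xFE, 0x00, 0x00)) {
            return "UTF-32LE";
        }
        if (startsWith(head, 0xEF, 0xBB, 0xBF)) {
            return "UTF-8";
        }
        if (startsWith(head, 0xFE, 0xFF)) {
            return "UTF-16BE";
        }
        if (startsWith(head, 0xFF, 0xFE)) {
            return "UTF-16LE";
        }
        return fallback;
    }

    private static boolean startsWith(byte[] data, int... prefix) {
        if (data.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if ((data[i] & 0xFF) != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] utf16le = {(byte) 0xFF, (byte) 0xFE, 'a', 0};
        System.out.println(detectBomCharset(utf16le, "UTF-8"));
        System.out.println(detectBomCharset(new byte[] {'a', 'b'}, "UTF-8"));
    }
}
```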




> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
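
The first problem described above can be reproduced with the JDK alone. The sketch below is independent of Flink (`delimiterBytes` is a hypothetical helper) and shows what `String.getBytes` produces for a newline delimiter under the three UTF-16 charsets: the generic "UTF-16" encoder prepends a big-endian BOM, while the endian-specific charsets do not.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; mirrors delimiterString.getBytes(getCharset()) in spirit only.
final class DelimiterBytesSketch {

    static byte[] delimiterBytes(String delimiter, Charset charset) {
        return delimiter.getBytes(charset);
    }

    public static void main(String[] args) {
        // "UTF-16" encodes big-endian and writes the BOM FE FF first.
        System.out.println(Arrays.toString(delimiterBytes("\n", StandardCharsets.UTF_16)));   // [-2, -1, 0, 10]
        // The endian-specific charsets write no BOM.
        System.out.println(Arrays.toString(delimiterBytes("\n", StandardCharsets.UTF_16BE))); // [0, 10]
        System.out.println(Arrays.toString(delimiterBytes("\n", StandardCharsets.UTF_16LE))); // [10, 0]
    }
}
```

A four-byte delimiter that starts with FE FF will not match a line ending inside the file, which is consistent with the report that the delimiter is never found.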





[GitHub] XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-10-09 Thread GitBox
XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] 
UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r223929236
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
+   String bomCharsetName = getBomCharset(file);
 
 Review comment:
   Whether or not we try to detect the encoding of a file or byte stream that
has no BOM, the following scenario remains possible:
   The file is encoded as UTF-8, but the user specifies UTF-16 or UTF-32 as the
encoding, which still produces garbled output. On the other hand, reliably
determining the encoding of a file or byte stream without a BOM is difficult.




[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644401#comment-16644401
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929018
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -349,11 +346,13 @@ public void setNumBuffers(int numBuffers) throws IOException {
 
returnExcessMemorySegments();
 
-   // If there is a registered owner and we have still requested more buffers than our
-   // size, trigger a recycle via the owner.
-   if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
-   owner.releaseMemory(numberOfRequestedMemorySegments - currentPoolSize);
-   }
+   numReleased = numberOfRequestedMemorySegments - currentPoolSize;
 
 Review comment:
   Yes, I will adjust this variable.
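
The thread dump quoted in the issue description shows `LocalBufferPool.setNumBuffers` calling into the owner's `releaseMemory` while still holding the pool's lock. The change under review computes the excess inside the lock and invokes the owner afterwards. A simplified sketch of that pattern, assuming a single lock object and integer counters, is given here; it is not the actual Flink class, and every name other than `setNumBuffers` and `releaseMemory` is hypothetical.

```java
// Simplified illustration of "compute under the lock, call back outside it".
final class ReleaseOutsideLockSketch {

    interface Owner {
        void releaseMemory(int numBuffers);
    }

    private final Object lock = new Object();
    private final Owner owner; // may be null when no owner is registered
    private final int requested;
    private int poolSize;

    ReleaseOutsideLockSketch(Owner owner, int requested, int poolSize) {
        this.owner = owner;
        this.requested = requested;
        this.poolSize = poolSize;
    }

    void setNumBuffers(int newSize) {
        int excess;
        synchronized (lock) {
            poolSize = newSize;
            excess = requested - poolSize;
        }
        // The callback runs without the pool lock, so the owner is free to
        // take its own locks without creating a lock-order cycle with this one.
        if (owner != null && excess > 0) {
            owner.releaseMemory(excess);
        }
    }

    Object lockForTesting() {
        return lock;
    }

    public static void main(String[] args) {
        ReleaseOutsideLockSketch[] ref = new ReleaseOutsideLockSketch[1];
        ref[0] = new ReleaseOutsideLockSketch(
            n -> System.out.println("release " + n + ", lock held: "
                + Thread.holdsLock(ref[0].lockForTesting())),
            10, 10);
        ref[0].setNumBuffers(6); // prints "release 4, lock held: false"
    }
}
```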




> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.1, 1.5.4
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to re

[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644403#comment-16644403
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929096
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -86,7 +84,7 @@
 
private boolean isDestroyed;
 
-   private BufferPoolOwner owner;
+   private final BufferPoolOwner owner;
 
 Review comment:
   Yes, the optional way is better, and I will take the first option as you 
suggested.




> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.1, 1.5.4
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
> lock on <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
> at 
> org.apache.flink.runtime.io.network.buffer.Netwo

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-09 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929096
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -86,7 +84,7 @@
 
private boolean isDestroyed;
 
-   private BufferPoolOwner owner;
+   private final BufferPoolOwner owner;
 
 Review comment:
   Yes, the optional way is better, and I will take the first option as you 
suggested.




[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-09 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r223929018
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##
 @@ -349,11 +346,13 @@ public void setNumBuffers(int numBuffers) throws IOException {
 
returnExcessMemorySegments();
 
-   // If there is a registered owner and we have still requested more buffers than our
-   // size, trigger a recycle via the owner.
-   if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
-   owner.releaseMemory(numberOfRequestedMemorySegments - currentPoolSize);
-   }
+   numReleased = numberOfRequestedMemorySegments - currentPoolSize;
 
 Review comment:
   Yes, I will adjust this variable.




[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries

2018-10-09 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644347#comment-16644347
 ] 

Hequn Cheng commented on FLINK-10474:
-

Hi all,
Thanks for your comments. I think there are two things here:
1. Don't translate IN with literals to JOIN.
2. Convert a cascade of predicates into a HashSet lookup.

For the first problem, I increased the threshold to avoid translating IN to 
JOIN. For the second problem, I added a rule that converts the predicates into 
an IN. The first problem also benefits from the solution to the second.
I think using rules to optimize the query during optimization is a good choice. 
It is quite similar to the Calc rules: there are tons of them, but I think that 
is fine as long as they are orthogonal to each other.
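
One way to picture the second point, as a sketch only: a cascade of equality predicates is evaluated one comparison at a time, while the HashSet form does a single lookup. This is plain Java, not the Flink or Calcite rule itself; `orChain` and `hashSetLookup` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

// Illustration of "x = 1 OR x = 2 OR ..." versus "x IN (1, 2, ...)" backed by a HashSet.
final class InPredicateSketch {

    // Cascade of comparisons: cost grows with the number of literals.
    static IntPredicate orChain(int... literals) {
        return value -> {
            for (int literal : literals) {
                if (value == literal) {
                    return true;
                }
            }
            return false;
        };
    }

    // Set membership: the set is built once, then each check is one hash lookup.
    static IntPredicate hashSetLookup(int... literals) {
        Set<Integer> set = new HashSet<>();
        for (int literal : literals) {
            set.add(literal);
        }
        return value -> set.contains(value);
    }

    public static void main(String[] args) {
        IntPredicate chain = orChain(1, 2, 3);
        IntPredicate lookup = hashSetLookup(1, 2, 3);
        System.out.println(chain.test(2) + " " + lookup.test(2)); // true true
        System.out.println(chain.test(7) + " " + lookup.test(7)); // false false
    }
}
```

Both forms return the same result for every input; only the evaluation cost differs.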

> Don't translate IN with Literals to JOIN with VALUES for streaming queries
> --
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> IN predicates with literals are translated to JOIN with VALUES if the number 
> of elements in the IN clause exceeds a certain threshold. This should not be 
> done, because a streaming join is very heavy and materializes both inputs 
> (which is fine for the VALUES input but not for the other).
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is 
> bound and final (non-updating)
> Option 1. should be easy to do, option 2. requires much more effort.





[jira] [Commented] (FLINK-9990) Add regexp_extract supported in TableAPI and SQL

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644337#comment-16644337
 ] 

ASF GitHub Bot commented on FLINK-9990:
---

yanghua commented on issue #6448: [FLINK-9990] [table] Add regexp_extract 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#issuecomment-428410089
 
 
   @xccui Thank you very much for your time and effort. I also have a PR #6432  
about `ASCII/CHR`. When you have time, can you help with the review?




> Add regexp_extract supported in TableAPI and SQL
> 
>
> Key: FLINK-9990
> URL: https://issues.apache.org/jira/browse/FLINK-9990
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
> Labels: pull-request-available
>
> regexp_extract is a very useful function: it returns a string based on a regex 
> pattern and an index.
> For example: 
> {code:java}
> regexp_extract('foothebar', 'foo(.*?)(bar)', 2) // returns 'bar'
> {code}
> It is provided as a UDF in Hive; for more details, please see [1].
> [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
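
The semantics described above can be sketched with `java.util.regex`. This is only an illustration, not the implementation from the pull request: the class and method names are hypothetical, and returning `null` when nothing matches is an assumption of this sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the described behaviour; not the PR's code.
public class RegexpExtractSketch {

    // Returns capture group `index` of the first match of `regex` in `input`.
    // Index 0 is the whole match. Returning null on "no match" or an
    // out-of-range index is an assumption of this sketch.
    static String regexpExtract(String input, String regex, int index) {
        Matcher matcher = Pattern.compile(regex).matcher(input);
        if (index >= 0 && matcher.find() && index <= matcher.groupCount()) {
            return matcher.group(index);
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(regexpExtract("foothebar", "foo(.*?)(bar)", 2));
    }
}
```

With the example input from the issue, group 1 is `the` and group 2 is `bar`.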





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644320#comment-16644320
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223912317
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ##
 @@ -250,7 +251,10 @@ public void testCorrectMergeOperatorSet() throws IOException {
enableIncrementalCheckpointing,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
-   TtlTimeProvider.DEFAULT);
+   TtlTimeProvider.DEFAULT,
+   new RocksDBNativeMetricOptions(),
+   Optional.empty()
 
 Review comment:
   It is optional because I needed a default value for overriding the previous 
version of `StateBackend#createKeyedStateBackend`[1]. I was not aware of 
`UnregisteredOperatorMetricGroup` but that would make a better fallback, will 
update. 
   
   [1] 
https://github.com/apache/flink/blob/8859febcb3cbb592d7a80d5609f9b26925dc2d45/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java#L165
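
The trade-off between an `Optional` argument and a no-op fallback can be sketched as follows. This is a hedged illustration in plain Java: `MetricSink`, `NoOpMetricSink` and `RecordingMetricSink` are hypothetical stand-ins, not Flink classes, and the sketch does not reproduce the signature of `createKeyedStateBackend`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration; these types are not part of Flink.
public class FallbackSketch {

    interface MetricSink {
        void register(String name);
    }

    // Safe default: accepts every registration and does nothing.
    static final class NoOpMetricSink implements MetricSink {
        @Override
        public void register(String name) { }
    }

    // Test helper that remembers what was registered.
    static final class RecordingMetricSink implements MetricSink {
        final List<String> names = new ArrayList<>();

        @Override
        public void register(String name) {
            names.add(name);
        }
    }

    // Variant A: every call site has to deal with the absent case.
    static void registerWithOptional(Optional<MetricSink> sink, String name) {
        sink.ifPresent(s -> s.register(name));
    }

    // Variant B: callers that do not care pass the no-op default instead.
    static void registerWithFallback(MetricSink sink, String name) {
        sink.register(name);
    }

    public static void main(String[] args) {
        registerWithOptional(Optional.empty(), "ignored");
        registerWithFallback(new NoOpMetricSink(), "ignored");

        RecordingMetricSink recording = new RecordingMetricSink();
        registerWithFallback(recording, "num-immutable-mem-table");
        System.out.println(recording.names);
    }
}
```

Variant B keeps the production code free of presence checks, which is the motivation for preferring a default implementation over an `Optional` parameter.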




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-10-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10520:
--

 Summary: Job save points REST API fails unless parameters are specified
 Key: FLINK-10520
 URL: https://issues.apache.org/jira/browse/FLINK-10520
 Project: Flink
 Issue Type: Bug
 Components: REST
 Affects Versions: 1.6.1
 Reporter: Elias Levy


The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
unless the request includes a body with all parameters ({{target-directory}} 
and {{cancel-job}}), even though the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
 suggests these are optional.

If a POST request with no data is made, the response is a 400 status code with 
the error message "Bad request received."

If the POST request submits an empty JSON object ( {} ), the response is a 400 
status code with the error message "Request did not match expected format 
SavepointTriggerRequestBody."  The same is true if only one of the 
{{target-directory}} and {{cancel-job}} parameters is included.

As the system is configured with a default savepoint location, there shouldn't 
be a need to include the parameter in the request.
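
Until this is fixed, the report implies that a request body naming both parameters is accepted. A minimal sketch of building such a body in plain Java follows. The class and method names are hypothetical and the directory is a placeholder; only the two field names and the endpoint path come from the report above.

```java
// Hypothetical helper; only the JSON field names come from the issue report.
public class SavepointRequestSketch {

    // Builds a body that names both parameters explicitly.
    static String requestBody(String targetDirectory, boolean cancelJob) {
        // Minimal escaping of backslashes and quotes for the JSON string value.
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"target-directory\": \"" + escaped + "\", \"cancel-job\": " + cancelJob + "}";
    }

    public static void main(String[] args) {
        // "/tmp/savepoints" is a placeholder path chosen for this example.
        System.out.println(requestBody("/tmp/savepoints", false));
    }
}
```

The resulting string would be sent as the body of the POST to {{/jobs/:jobid/savepoints}}.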





[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-09 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644150#comment-16644150
 ] 

Shuyi Chen commented on FLINK-10516:


Hi [~till.rohrmann], how do you want to proceed? I can send a PR to fix 
it on the current master, and then we can cherry-pick it for releases 1.5.5 and 
1.6.2. Or do you have other suggestions? Thanks a lot.

> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
> Configuration during setup
> ---
>
> Key: FLINK-10516
> URL: https://issues.apache.org/jira/browse/FLINK-10516
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Major
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Will add a fix, and refactor YarnApplicationMasterRunner to add a unit test to 
> prevent future regressions.





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644138#comment-16644138
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223872647
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricTest.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that native metrics can be pulled from RocksDB.
+ */
+public class RocksDBNativeMetricTest {
+
+   private static final String COLUMN_FAMILY_NAME = "test-cf";
+
+   private static final String PROPERTY = "property";
+
+   @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
+
+   @Test
+   public void testMessageCollector() throws IOException {
+   ConcurrentLinkedQueue<Tuple2<String, ColumnFamilyHandle>> messageQueue = new ConcurrentLinkedQueue<>();
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricTask task = new RocksDBNativeMetricMonitor.RocksDBNativeMetricTask(
+   mock(RocksDB.class),
+   new ResourceGuard(),
+   new ArrayList<>(),
+   mock(MetricGroup.class),
+   messageQueue
+   );
+
+   messageQueue.add(Tuple2.of("", mock(ColumnFamilyHandle.class)));
+   task.collectNewMessages();
+
+   Assert.assertEquals("failed to removed pending messages from queue", 0, messageQueue.size());
+   Assert.assertEquals("failed to add message to kv state", 1, task.numberOfColumnFamilies());
+   }
+
+   @Test
+   public void testGaugeRegistration() throws IOException {
+   MetricGroup metricGroup = mock(MetricGroup.class);
+   MetricGroup innerGroup = mock(MetricGroup.class);
 
 Review comment:
   There are various utility classes that make mocking unnecessary for metric 
group related code. `UnregisteredMetricGroups` can be used to create safe default 
implementations in which specific methods can be overridden.
   Alternatively, you can pass a custom MetricRegistry to check for specific metrics.
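
The idea of overriding single methods of a safe default instead of mocking can be sketched in plain Java. `NoOpGroup` below is a hypothetical stand-in and deliberately not Flink's `UnregisteredMetricGroups`; the method names are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration; NoOpGroup is not a Flink class.
public class TestDoubleSketch {

    // Safe default: every method is a no-op, so tests never hit a null.
    static class NoOpGroup {
        void gauge(String name, Supplier<Long> value) { }

        NoOpGroup addGroup(String name) {
            return this;
        }
    }

    // Code under test: registers one gauge per property under a sub-group.
    static void registerGauges(NoOpGroup root, String columnFamily, List<String> properties) {
        NoOpGroup group = root.addGroup(columnFamily);
        for (String property : properties) {
            group.gauge(property, () -> 0L);
        }
    }

    // Overrides only the method the test cares about and records its calls.
    static List<String> recordRegistrations(String columnFamily, List<String> properties) {
        List<String> registered = new ArrayList<>();
        NoOpGroup recording = new NoOpGroup() {
            @Override
            void gauge(String name, Supplier<Long> value) {
                registered.add(name);
            }
        };
        registerGauges(recording, columnFamily, properties);
        return registered;
    }

    public static void main(String[] args) {
        System.out.println(recordRegistrations("test-cf", Arrays.asList("property")));
    }
}
```

Compared with `mock(...)` plus `verify(...)`, the test asserts on recorded state and needs no stubbing of intermediate calls.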




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb

[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644136#comment-16644136
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223873851
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Enable which RocksDB metrics to forward to Flink's metrics reporter.
+ * All metrics report at the column family level and return unsigned long values.
+ *
+ * Properties and doc comments are taken from RocksDB documentation. See
+ * <a href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429">
+ * db.h</a> for more information.
+ */
+public class RocksDBNativeMetricOptions {
+
+   private static final long TEN_SECONDS = 10 * 1000;
+
+   private Set<String> properties;
+
+   private long frequency = TEN_SECONDS;
+
+   public RocksDBNativeMetricOptions() {
+   this.properties = new HashSet<>();
+   }
+
+   /**
+* Returns number of immutable memtables that have not yet been flushed.
+*/
+   public void enableNumImmutableMemTable() {
+   this.properties.add("rocksdb.num-immutable-mem-table");
 
 Review comment:
   All of this must be configurable via `flink-conf.yaml`.
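
One way to read this request is that each `enable...()` method gets a boolean configuration key. The following plain-Java sketch maps such keys to RocksDB property names. The key prefix `state.backend.rocksdb.metrics.` and the class name are assumptions made for illustration; only the property `rocksdb.num-immutable-mem-table` is taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration; the key prefix is an assumption, not an existing option.
public class MetricOptionsFromConfigSketch {

    static final String PREFIX = "state.backend.rocksdb.metrics.";

    // Collects the RocksDB properties whose configuration key is set to "true".
    static Set<String> enabledProperties(Map<String, String> config) {
        Set<String> properties = new TreeSet<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX) && Boolean.parseBoolean(entry.getValue())) {
                properties.add("rocksdb." + key.substring(PREFIX.length()));
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(PREFIX + "num-immutable-mem-table", "true");
        System.out.println(enabledProperties(config));
    }
}
```

Keys that are absent or set to anything other than `true` leave the corresponding metric disabled, so the default stays "no native metrics".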




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644137#comment-16644137
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223872847
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ##
 @@ -250,7 +251,10 @@ public void testCorrectMergeOperatorSet() throws IOException {
enableIncrementalCheckpointing,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
-   TtlTimeProvider.DEFAULT);
+   TtlTimeProvider.DEFAULT,
+   new RocksDBNativeMetricOptions(),
+   Optional.empty()
 
 Review comment:
   Is this argument an `Optional` purely to simplify tests? If so, please just 
use an `UnregisteredOperatorMetricGroup` instead.




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644139#comment-16644139
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223873328
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A monitor which pull {{@link RocksDB}} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final TimerTask task;
+
+   private final Timer timer;
+
+   private final ConcurrentLinkedQueue<Tuple2<String, ColumnFamilyHandle>> messageQueue;
+
+   RocksDBNativeMetricMonitor(
 
 Review comment:
   Metric polling can be implemented using a `View`, which is called 
periodically to update the current value. This should simplify this class quite a bit.
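
The polling pattern suggested here can be sketched without a dedicated timer thread. The sketch below is plain Java with hypothetical names (`PeriodicView`, `PolledGauge`) and is only loosely modeled on the idea of a view that the metric system updates on a fixed schedule; it is not Flink's interface.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical illustration; PeriodicView and PolledGauge are not Flink classes.
public class ViewGaugeSketch {

    // Something the metric system would call at a fixed interval.
    interface PeriodicView {
        void update();
    }

    // A gauge that reports a cached value and refreshes it only in update().
    static final class PolledGauge implements PeriodicView {
        private final LongSupplier source;
        private volatile long value;

        PolledGauge(LongSupplier source) {
            this.source = source;
        }

        @Override
        public void update() {
            // The potentially expensive native call happens here, not on every read.
            value = source.getAsLong();
        }

        long getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        AtomicLong nativeProperty = new AtomicLong(3);
        PolledGauge gauge = new PolledGauge(nativeProperty::get);
        gauge.update();
        System.out.println(gauge.getValue());
    }
}
```

With this shape the monitor class would no longer need its own `Timer`, `TimerTask` and message queue, because scheduling is left to whoever owns the views.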




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-09 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223873328
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A monitor which pull {{@link RocksDB}} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final TimerTask task;
+
+   private final Timer timer;
+
+   private final ConcurrentLinkedQueue> 
messageQueue;
+
+   RocksDBNativeMetricMonitor(
 
 Review comment:
   metric polling can be implemented using a `View`, which is periodically 
called to update the current value. This should simplify this class quite a bit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-09 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223872847
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ##
 @@ -250,7 +251,10 @@ public void testCorrectMergeOperatorSet() throws 
IOException {
enableIncrementalCheckpointing,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
-   TtlTimeProvider.DEFAULT);
+   TtlTimeProvider.DEFAULT,
+   new RocksDBNativeMetricOptions(),
+   Optional.empty()
 
 Review comment:
   If this argument an optional purely to simplify tests? If so, please just 
use an `UnregisteredOperatorMetricGroup` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-09 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223873851
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Enable which RocksDB metrics to forward to Flink's metrics reporter.
+ * All metrics report at the column family level and return unsigned long 
values.
+ *
+ * Properties and doc comments are taken from RocksDB documentation. See
+ * https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429";>
+ * db.h for more information.
+ */
+public class RocksDBNativeMetricOptions {
+
+   private static final long TEN_SECONDS = 10 * 1000;
+
+   private Set properties;
+
+   private long frequency = TEN_SECONDS;
+
+   public RocksDBNativeMetricOptions() {
+   this.properties = new HashSet<>();
+   }
+
+   /**
+* Returns number of immutable memtables that have not yet been flushed.
+*/
+   public void enableNumImmutableMemTable() {
+   this.properties.add("rocksdb.num-immutable-mem-table");
 
 Review comment:
   all of this must be configurable via the `flink-conf.yaml` configuration.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-09 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r223872647
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricTest.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that native metrics can be pulled from RocksDB.
+ */
+public class RocksDBNativeMetricTest {
+
+   private static final String COLUMN_FAMILY_NAME = "test-cf";
+
+   private static final String PROPERTY = "property";
+
+   @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
+
+   @Test
+   public void testMessageCollector() throws IOException {
+   ConcurrentLinkedQueue<Tuple2<String, ColumnFamilyHandle>> 
messageQueue = new ConcurrentLinkedQueue<>();
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricTask task = new 
RocksDBNativeMetricMonitor.RocksDBNativeMetricTask(
+   mock(RocksDB.class),
+   new ResourceGuard(),
+   new ArrayList<>(),
+   mock(MetricGroup.class),
+   messageQueue
+   );
+
+   messageQueue.add(Tuple2.of("", mock(ColumnFamilyHandle.class)));
+   task.collectNewMessages();
+
+   Assert.assertEquals("failed to removed pending messages from 
queue", 0, messageQueue.size());
+   Assert.assertEquals("failed to add message to kv state", 1, 
task.numberOfColumnFamilies());
+   }
+
+   @Test
+   public void testGaugeRegistration() throws IOException {
+   MetricGroup metricGroup = mock(MetricGroup.class);
+   MetricGroup innerGroup = mock(MetricGroup.class);
 
 Review comment:
   There are various utility classes that make mocking unnecessary for 
metric-group-related code. `UnregisteredMetricGroups` can be used to create 
safe default implementations in which specific methods can be overridden.
   Alternatively, you can pass a custom MetricRegistry to check for specific metrics.
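
The pattern suggested in this review comment (a safe default implementation with one method overridden, instead of a mock) can be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchMetricGroup`, `NoOpMetricGroup` and `RecordingMetricGroup` are hypothetical stand-ins written for this example and are not Flink's `MetricGroup` or `UnregisteredMetricGroups` classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a metric group interface (NOT Flink's MetricGroup).
interface SketchMetricGroup {
    <T> void gauge(String name, Supplier<T> valueSupplier);

    SketchMetricGroup addGroup(String name);
}

// Safe default implementation: every method is a harmless no-op.
class NoOpMetricGroup implements SketchMetricGroup {
    @Override
    public <T> void gauge(String name, Supplier<T> valueSupplier) {
        // intentionally empty
    }

    @Override
    public SketchMetricGroup addGroup(String name) {
        return this;
    }
}

// Test double that overrides only the method the test cares about.
class RecordingMetricGroup extends NoOpMetricGroup {
    final List<String> registeredGauges = new ArrayList<>();

    @Override
    public <T> void gauge(String name, Supplier<T> valueSupplier) {
        registeredGauges.add(name);
    }
}

class MetricGroupSketchDemo {
    public static void main(String[] args) {
        RecordingMetricGroup group = new RecordingMetricGroup();
        // addGroup is inherited from the no-op default and returns the same instance.
        group.addGroup("test-cf").gauge("property", () -> 1L);
        System.out.println(group.registeredGauges);
    }
}
```

Compared with a mock, the test then asserts on recorded state rather than on verified interactions, which tends to survive refactoring of the code under test.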




[jira] [Commented] (FLINK-9990) Add regexp_extract supported in TableAPI and SQL

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644127#comment-16644127
 ] 

ASF GitHub Bot commented on FLINK-9990:
---

asfgit closed pull request #6448: [FLINK-9990] [table] Add regexp_extract 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448
 
 
   


[GitHub] asfgit closed pull request #6448: [FLINK-9990] [table] Add regexp_extract supported in TableAPI and SQL

2018-10-09 Thread GitBox
asfgit closed pull request #6448: [FLINK-9990] [table] Add regexp_extract 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index f888d790deb..6491c81b32e 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2496,6 +2496,19 @@ REPLACE(string1, string2, string3)
   
 
 
+
+  
+{% highlight text %}
+REGEXP_EXTRACT(string1, string2[, integer])
+{% endhighlight %}
+  
+  
+Returns a string from string1 which extracted with a 
specified regular expression string2 and a regex match group index 
integer. 
+Note: The regex match group index starts from 1 and 0 means 
matching the whole regex. In addition, The regex match group index should not 
exceed the number of the defined groups. 
+E.g. REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" 
returns "bar".
+  
+
+
 
   
 {% highlight text %}
@@ -2748,6 +2761,19 @@ STRING1.replace(STRING2, STRING3)
   
 
 
+
+  
+{% highlight java %}
+STRING1.regexpExtract(STRING2[, INTEGER1])
+{% endhighlight %}
+  
+  
+Returns a string from STRING1 which extracted with a 
specified regular expression STRING2 and a regex match group index 
INTEGER1.
+Note: The regex match group index starts from 1 and 0 means 
matching the whole regex. In addition, The regex match group index should not 
exceed the number of the defined groups. 
+E.g. 'foothebar'.regexpExtract('foo(.*?)(bar)', 2)" 
returns "bar".
+  
+
+
 
   
 {% highlight java %}
@@ -2999,6 +3025,19 @@ STRING1.replace(STRING2, STRING3)
   
 
 
+
+  
+{% highlight scala %}
+STRING1.regexpExtract(STRING2[, INTEGER1])
+{% endhighlight %}
+  
+  
+Returns a string from STRING1 which extracted with a 
specified regular expression STRING2 and a regex match group index 
INTEGER1.
+Note: The regex match group index starts from 1 and 0 means 
matching the whole regex. In addition, The regex match group index should not 
exceed the number of the defined groups.
+E.g. "foothebar".regexpExtract("foo(.*?)(bar)", 2)" 
returns "bar".
+  
+
+
 
   
 {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index e638051a2bd..e1947c37acf 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -584,6 +584,18 @@ trait ImplicitExpressionOperations {
   def regexpReplace(regex: Expression, replacement: Expression) =
 RegexpReplace(expr, regex, replacement)
 
+  /**
+* Returns a string extracted with a specified regular expression and a 
regex match group index.
+*/
+  def regexpExtract(regex: Expression, extractIndex: Expression) =
+RegexpExtract(expr, regex, extractIndex)
+
+  /**
+* Returns a string extracted with a specified regular expression.
+*/
+  def regexpExtract(regex: Expression) =
+RegexpExtract(expr, regex, null)
+
   /**
 * Returns the base string decoded with base64.
 */
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 34be86e0400..7781b57825e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -143,6 +143,19 @@ object BuiltInMethods {
 classOf[String],
 classOf[String])
 
+  val REGEXP_EXTRACT = Types.lookupMethod(
+classOf[ScalarFunctions],
+"regexp_extract",
+classOf[String],
+classOf[String],
+classOf[Integer])
+
+  val REGEXP_EXTRACT_WITHOUT_INDEX = Types.lookupMethod(
+classOf[ScalarFunctions],
+"regexp_extract",
+classOf[String],
+classOf[String])
+
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", 
classOf[String])
 
   val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", 
classOf[String])
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGene
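
The group-index semantics described in the documentation hunks above (index 0 means the whole match, indexes from 1 select capture groups, and the index should not exceed the number of defined groups) can be illustrated with plain `java.util.regex`. This is a minimal sketch, not the code from the pull request: the class name is hypothetical, and returning null on no match or on an out-of-range index, as well as defaulting to index 0 when no index is given, are assumptions made for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the documented semantics; not Flink's ScalarFunctions.
final class RegexpExtractSketch {

    // Returns the text of the requested group from the first match, or null (assumed behavior)
    // when an input is null, nothing matches, or the index is out of range.
    static String regexpExtract(String str, String regex, int groupIndex) {
        if (str == null || regex == null) {
            return null;
        }
        Matcher matcher = Pattern.compile(regex).matcher(str);
        if (matcher.find() && groupIndex >= 0 && groupIndex <= matcher.groupCount()) {
            return matcher.group(groupIndex);
        }
        return null;
    }

    // Variant without an index: assumed here to mean group 0, the whole match.
    static String regexpExtract(String str, String regex) {
        return regexpExtract(str, regex, 0);
    }

    public static void main(String[] args) {
        // Same inputs as the example in the documentation hunk.
        System.out.println(regexpExtract("foothebar", "foo(.*?)(bar)", 2)); // prints "bar"
    }
}
```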

[jira] [Comment Edited] (FLINK-10465) Jepsen: runit supervised sshd is stopped on tear down

2018-10-09 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643266#comment-16643266
 ] 

Gary Yao edited comment on FLINK-10465 at 10/9/18 7:46 PM:
---

fixed via

1.6: 
f3865aa6e61aaed27cfbb7ff89cd29d945981247
34430be7458d70be3916441698b8fc466899064b

1.7: 
02d1057794de9f4868886c08f1abeeb7fd31f9cf
d376f1e1f9880c18d137db414099ffb6833e07f



was (Author: gjy):
1.6: f3865aa6e61aaed27cfbb7ff89cd29d945981247
1.7: 02d1057794de9f4868886c08f1abeeb7fd31f9cf

 

> Jepsen: runit supervised sshd is stopped on tear down
> -
>
> Key: FLINK-10465
> URL: https://issues.apache.org/jira/browse/FLINK-10465
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0, 1.6.2
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> When tearing down the _DB_, we tear down all services supervised by runit. 
> However when running the tests in Docker, sshd is under supervision by runit. 
> When sshd is stopped, the tests cannot be continued because the control node 
> cannot interact with the DB nodes anymore.
> *How to reproduce*
> Run command below in control-node container:
> {code}
> ./docker/run-tests.sh 1 
> [...]/flink/flink-1.6.1/flink-1.6.1-bin-hadoop28-scala_2.11.tgz
> {code}
> *Expected behavior*
> sshd should never be stopped



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10465) Jepsen: runit supervised sshd is stopped on tear down

2018-10-09 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-10465.

Resolution: Fixed






[jira] [Closed] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity

2018-10-09 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-5542.
---
Resolution: Fixed

fixed via

1.5: 5fb67abd51aca277d1140c274001e89f4bd7cf9a
1.6: 5b3211c6f02cf935308e3ff9ebf9cd1deff24d73
1.7: e959e6d0cd42f0c5b21c0f03ce547f2025ac58d5




> YARN client incorrectly uses local YARN config to check vcore capacity
> --
>
> Key: FLINK-5542
> URL: https://issues.apache.org/jira/browse/FLINK-5542
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.4, 1.5.3, 1.6.0, 1.7.0
>Reporter: Shannon Carey
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html
> When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 
> 1.1.4 is comparing the user's selected number of vcores to the vcores 
> configured in the local node's YARN config (from YarnConfiguration eg. 
> yarn-site.xml and yarn-default.xml). It incorrectly prevents Flink from 
> launching even if there is sufficient vcore capacity on the cluster.
> That is not correct, because the application will not necessarily run on the 
> local node. For example, if running the yarn-session.sh client from the AWS 
> EMR master node, the vcore count there may be different from the vcore count 
> on the core nodes where Flink will actually run.
> A reasonable way to fix this would probably be to reuse the logic from 
> "yarn-session.sh -q" (FlinkYarnSessionCli line 550) which knows how to get 
> vcore information from the real worker nodes.  Alternatively, perhaps we 
> could remove the check entirely and rely on YARN's Scheduler to determine 
> whether sufficient resources exist.





[jira] [Updated] (FLINK-10519) flink-parent:1.6.1 artifact can't be found on maven central

2018-10-09 Thread Florian Schmidt (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt updated FLINK-10519:

Description: 
The flink-parent:1.6.1 artifact can't be found on maven central:

*Stacktrace from maven*
{code:java}
...
Caused by: org.eclipse.aether.transfer.ArtifactNotFoundException: Could not 
find artifact org.apache.flink:flink-parent:pom:1.6.1 in central 
(https://repo.maven.apache.org/maven2)
...
{code}
 

Also when browsing the repository in the browser 
([https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/]) 
it will show the flink-parent artifact in the list, but return 404 when trying 
to download it. This does only seem to happen from some networks, as I was able 
to successfully run the following on a server that I ssh'd into, but not on my 
local device
{code:java}
curl 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/flink-parent-1.6.1.pom{code}

The artifact can't be found locally, where repo.maven.apache.org resolves to
{code}
> host repo.maven.apache.org
repo.maven.apache.org is an alias for repo.apache.maven.org.
repo.apache.maven.org is an alias for maven.map.fastly.net.
maven.map.fastly.net has address 151.101.112.215
{code}

On my server repo.maven.apache.org resolves to 151.101.132.215 where the 
artifact is present.

  was:
The flink-parent:1.6.1 artifact can't be found on maven central:

*Stacktrace from maven*
{code:java}
...
Caused by: org.eclipse.aether.transfer.ArtifactNotFoundException: Could not 
find artifact org.apache.flink:flink-parent:pom:1.6.1 in central 
(https://repo.maven.apache.org/maven2)
...
{code}
 

Also when browsing the repository in the browser 
([https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/]) 
it will show the flink-parent artifact in the list, but return 404 when trying 
to download it. This does only seem to happen from some networks, as I was able 
to successfully run the following on a server that I ssh'd into, but not on my 
local device
{code:java}
curl 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/flink-parent-1.6.1.pom{code}


> flink-parent:1.6.1 artifact can't be found on maven central
> ---
>
> Key: FLINK-10519
> URL: https://issues.apache.org/jira/browse/FLINK-10519
> Project: Flink
>  Issue Type: Bug
>Reporter: Florian Schmidt
>Priority: Critical
>
> The flink-parent:1.6.1 artifact can't be found on maven central:
> *Stacktrace from maven*
> {code:java}
> ...
> Caused by: org.eclipse.aether.transfer.ArtifactNotFoundException: Could not 
> find artifact org.apache.flink:flink-parent:pom:1.6.1 in central 
> (https://repo.maven.apache.org/maven2)
> ...
> {code}
>  
> Also when browsing the repository in the browser 
> ([https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/]) 
> it will show the flink-parent artifact in the list, but return 404 when 
> trying to download it. This does only seem to happen from some networks, as I 
> was able to successfully run the following on a server that I ssh'd into, but 
> not on my local device
> {code:java}
> curl 
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/flink-parent-1.6.1.pom{code}
> The artifact can't be found locally, where repo.maven.apache.org resolves to
> {code}
> > host repo.maven.apache.org
> repo.maven.apache.org is an alias for repo.apache.maven.org.
> repo.apache.maven.org is an alias for maven.map.fastly.net.
> maven.map.fastly.net has address 151.101.112.215
> {code}
> On my server repo.maven.apache.org resolves to 151.101.132.215 where the 
> artifact is present.





[jira] [Updated] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity

2018-10-09 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-5542:

Fix Version/s: 1.5.5
   1.6.2






[jira] [Commented] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643990#comment-16643990
 ] 

ASF GitHub Bot commented on FLINK-5542:
---

asfgit closed pull request #6775: [FLINK-5542] use YarnCluster vcores setting 
to do MaxVCore validation
URL: https://github.com/apache/flink/pull/6775
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c3ad9f7f42c..c161e227577 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -282,18 +282,27 @@ private void isReadyForDeployment(ClusterSpecification 
clusterSpecification) thr
}
 
// Check if we don't exceed YARN's maximum virtual cores.
-   // The number of cores can be configured in the config.
-   // If not configured, it is set to the number of task slots
-   int numYarnVcores = 
yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, 
YarnConfiguration.DEFAULT_NM_VCORES);
+   // Fetch numYarnMaxVcores from all the RUNNING nodes via 
yarnClient
+   final int numYarnMaxVcores;
+   try {
+   numYarnMaxVcores = 
yarnClient.getNodeReports(NodeState.RUNNING)
+   .stream()
+   .mapToInt(report -> 
report.getCapability().getVirtualCores())
+   .max()
+   .orElse(0);
+   } catch (Exception e) {
+   throw new YarnDeploymentException("Couldn't get cluster 
description, please check on the YarnConfiguration", e);
+   }
+
int configuredVcores = 
flinkConfiguration.getInteger(YarnConfigOptions.VCORES, 
clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of 
vcores
-   if (configuredVcores > numYarnVcores) {
+   if (configuredVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
-   String.format("The number of virtual cores per 
node were configured with %d" +
-   " but Yarn only has %d virtual 
cores available. Please note that the number" +
-   " of virtual cores is set to 
the number of task slots by default unless configured" +
-   " in the Flink config with 
'%s.'",
-   configuredVcores, numYarnVcores, 
YarnConfigOptions.VCORES.key()));
+   String.format("The number of requested virtual 
cores per node %d" +
+   " exceeds the maximum number of 
virtual cores %d available in the Yarn Cluster." +
+   " Please note that the number 
of virtual cores is set to the number of task slots by default" +
+   " unless configured in the 
Flink config with '%s.'",
+   configuredVcores, numYarnMaxVcores, 
YarnConfigOptions.VCORES.key()));
}
 
// check if required Hadoop environment variables are set. If 
not, warn user
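
The check introduced by this diff (take the largest vcore capacity among the running nodes and reject a request that exceeds it) can be sketched without any YARN dependency. This is a simplified illustration under stated assumptions: the class and method names are hypothetical, a plain list of integers stands in for the node reports returned by `yarnClient.getNodeReports(NodeState.RUNNING)`, and `IllegalArgumentException` stands in for Flink's `IllegalConfigurationException`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free sketch of the validation in the diff above.
final class VcoreCheckSketch {

    // Largest vcore capacity among the reported nodes, or 0 when no node is reported.
    static int maxVcores(List<Integer> vcoresPerRunningNode) {
        return vcoresPerRunningNode.stream()
            .mapToInt(Integer::intValue)
            .max()
            .orElse(0);
    }

    // Throws if the requested vcores per node exceed what any single running node offers.
    static void validate(int requestedVcores, List<Integer> vcoresPerRunningNode) {
        int numYarnMaxVcores = maxVcores(vcoresPerRunningNode);
        if (requestedVcores > numYarnMaxVcores) {
            throw new IllegalArgumentException(String.format(
                "The number of requested virtual cores per node %d exceeds the maximum"
                    + " number of virtual cores %d available in the cluster.",
                requestedVcores, numYarnMaxVcores));
        }
    }

    public static void main(String[] args) {
        List<Integer> cluster = Arrays.asList(4, 8, 2);
        validate(8, cluster); // passes: one node offers 8 vcores
        System.out.println(maxVcores(cluster)); // prints 8
    }
}
```

Note that with an empty node list the maximum is 0, so any positive request is rejected; that mirrors the `.orElse(0)` in the diff.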


 



> YARN client incorrectly uses local YARN config to check vcore capacity
> --
>
> Key: FLINK-5542
> URL: https://issues.apache.org/jira/browse/FLINK-5542
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.4, 1.5.3, 1.6.0, 1.7.0
>Reporter: Shannon Carey
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html
> When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 
> 1.1.4 is comparing the user's selected number 

[GitHub] asfgit closed pull request #6775: [FLINK-5542] use YarnCluster vcores setting to do MaxVCore validation

2018-10-09 Thread GitBox
asfgit closed pull request #6775: [FLINK-5542] use YarnCluster vcores setting 
to do MaxVCore validation
URL: https://github.com/apache/flink/pull/6775
 
 
   



[jira] [Comment Edited] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread Seth Wiesman (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16629155#comment-16629155
 ] 

Seth Wiesman edited comment on FLINK-10423 at 10/9/18 7:34 PM:
---

-I have a question about the best way to handle resource management of the 
RocksDB native handler.-

-The basic implementation uses a TimerTask to periodically pull metrics from 
RocksDB. This works fine until it is time to release the resource. My question 
is how to be a good citizen with the ResourceGuard: manually calling cancel on 
the TimerTask before calling the blocking cancel on ResourceGuard seems brittle 
in the face of refactoring, and there does not seem to be any way to access a 
CancellableRegistry from within RocksDBStateBackend.-

 
 -The simplest solution I've been able to come up with is to add a new method 
to ResourceGuard that effectively acts as a way to acquire a weak reference.-
{code:java}
public boolean runIfResourceAvailable(Runnable function) {
    synchronized (lock) {
        if (closed) {
            // resource is closing or closed: signal the caller to shut down
            return false;
        }

        function.run();
        return true;
    }
}
{code}
-The function runs atomically, and only if the resource is available; the method 
returns whether the function was run. When this method returns false, the 
calling class knows the resource is being or has been closed and that it is 
time to shut down. My concern here is the potential for putting too much work 
inside the synchronized block.-
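
The proposal in this comment can be sketched as a small standalone class. This is only an illustration under stated assumptions: `GuardSketch` is a hypothetical stand-in and not Flink's `ResourceGuard`, and the `close()` method and the `lock` and `closed` fields are assumed from the snippet above.

```java
// Hypothetical stand-in for the proposed guard method; NOT Flink's ResourceGuard.
final class GuardSketch {

    private final Object lock = new Object();
    private boolean closed = false;

    // Runs the function under the lock only while the guard is open.
    // Returns true if the function ran, false if the guard is already closed.
    boolean runIfResourceAvailable(Runnable function) {
        synchronized (lock) {
            if (closed) {
                return false;
            }
            function.run();
            return true;
        }
    }

    // Marks the guard as closed; later calls to runIfResourceAvailable do nothing.
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }

    public static void main(String[] args) {
        GuardSketch guard = new GuardSketch();
        System.out.println(guard.runIfResourceAvailable(() -> { })); // prints true
        guard.close();
        System.out.println(guard.runIfResourceAvailable(() -> { })); // prints false
    }
}
```

Because the function runs while the lock is held, `close()` blocks until it finishes, which is the concern about long-running work raised in the comment.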

-I'm curious what you think is the best path forward.-

 

Nevermind
  


was (Author: sjwiesman):
[~srichter] I have a question about the best way to handle resource management 
of the RocksDB native handler.

The basic implementation uses a TimerTask to periodically pull metrics from 
RocksDB. This works fine until it is time to release the resource. My question 
is how to be a good citizen with the ResourceGuard: manually calling cancel on 
the TimerTask before calling the blocking cancel on ResourceGuard seems brittle 
in the face of refactoring, and there does not seem to be any way to access a 
CancellableRegistry from within RocksDBStateBackend. 

 
The simplest solution I've been able to come up with is to add a new method to 
ResourceGuard that effectively acts as a way to acquire a weak reference. 

{code:java}
public boolean runIfResourceAvailable(Runnable function) {
    synchronized (lock) {
        if (closed) {
            // resource is closing or closed: signal the caller to shut down
            return false;
        }

        function.run();
        return true;
    }
}
{code}

The function runs atomically, and only if the resource is available; the method 
returns whether the function was run. When this method returns false, the 
calling class knows the resource is being or has been closed and that it is 
time to shut down. My concern here is the potential for putting too much work 
inside the synchronized block. 

I'm curious what you think is the best path forward.
 

> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-09 Thread Seth Wiesman (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-10423:
-
Summary: Forward RocksDB native metrics to Flink metrics reporter   (was: 
Forward RocksDB memory metrics to Flink metrics reporter )

> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643978#comment-16643978
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman opened a new pull request #6814: [FLINK-10423][rocksdb][metrics] 
rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814
 
 
   ## What is the purpose of the change
   Optionally expose RocksDB native metrics to the Flink metrics reporter. All 
metrics are optional and configured through `OptionsFactory`; they are reported 
at the column family level and return unsigned long values. Metrics reporting 
is disabled by default to prevent unexpected performance regressions. 
   
   ## Brief change log
 - Implement RocksDBNativeMetricMonitor to periodically pull native metrics 
from RocksDB
 - Make metrics configurable through OptionsFactory 
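
   A rough sketch of what periodically pulling unsigned long metrics could look 
like, under stated assumptions: `NativeMetricPollerSketch` and its methods are 
hypothetical and are not taken from this pull request, and the native 
property read is replaced by a plain `LongSupplier`. Since Java's `long` is 
signed, the sketch renders values with `Long.toUnsignedString`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical poller; the monitor in the pull request may look quite different. */
class NativeMetricPollerSketch {
    private final Map<String, LongSupplier> sources = new ConcurrentHashMap<>();
    private final Map<String, Long> latest = new ConcurrentHashMap<>();

    /** Registers a named source, e.g. a lambda that reads one native property. */
    void register(String name, LongSupplier source) {
        sources.put(name, source);
    }

    /** Pulls every registered source once; intended to be called periodically. */
    void pollOnce() {
        sources.forEach((name, source) -> latest.put(name, source.getAsLong()));
    }

    /** Renders the last polled value as an unsigned decimal string. */
    String report(String name) {
        Long value = latest.get(name);
        return value == null ? "n/a" : Long.toUnsignedString(value);
    }
}
```

   In such a design, `pollOnce` could be driven by a `TimerTask` or by 
`ScheduledExecutorService.scheduleAtFixedRate`; leaving the poller unregistered 
by default mirrors the "disabled by default" behaviour described above.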
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 -  Added RocksDBNativeMetricTest
 - Manually verified with a test cluster
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? javadoc
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[jira] [Updated] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-10-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10423:
---
Labels: pull-request-available  (was: )

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[GitHub] sjwiesman opened a new pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-09 Thread GitBox
sjwiesman opened a new pull request #6814: [FLINK-10423][rocksdb][metrics] 
rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814
 
 
   ## What is the purpose of the change
   Optionally expose RocksDB native metrics to the Flink metrics reporter. All 
metrics are optional and configured through `OptionsFactory`; they are reported 
at the column family level and return unsigned long values. Metrics reporting 
is disabled by default to prevent unexpected performance regressions. 
   
   ## Brief change log
 - Implement RocksDBNativeMetricMonitor to periodically pull native metrics 
from RocksDB
 - Make metrics configurable through OptionsFactory 
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 -  Added RocksDBNativeMetricTest
 - Manually verified with a test cluster
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? javadoc
   




With regards,
Apache Git Services


[jira] [Created] (FLINK-10519) flink-parent:1.6.1 artifact can't be found on maven central

2018-10-09 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-10519:
---

 Summary: flink-parent:1.6.1 artifact can't be found on maven 
central
 Key: FLINK-10519
 URL: https://issues.apache.org/jira/browse/FLINK-10519
 Project: Flink
  Issue Type: Bug
Reporter: Florian Schmidt


The flink-parent:1.6.1 artifact can't be found on Maven Central:

*Stack trace from Maven*
{code:java}
...
Caused by: org.eclipse.aether.transfer.ArtifactNotFoundException: Could not 
find artifact org.apache.flink:flink-parent:pom:1.6.1 in central 
(https://repo.maven.apache.org/maven2)
...
{code}
 

Also, when browsing the repository in the browser 
([https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/]), 
the flink-parent artifact is shown in the list, but trying to download it 
returns 404. This seems to happen only from some networks: I was able to 
successfully run the following on a server that I ssh'd into, but not on my 
local device:
{code:java}
curl https://repo.maven.apache.org/maven2/org/apache/flink/flink-parent/1.6.1/flink-parent-1.6.1.pom
{code}
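
For reference, the URL requested above follows the standard Maven repository 
layout. The sketch below shows how the POM URL is derived from the 
coordinates; `PomUrlSketch` and `pomUrl` are hypothetical helper names and 
are not part of Maven or Flink.

```java
/** Builds the URL of a POM from Maven coordinates (standard repository layout). */
class PomUrlSketch {
    static String pomUrl(String repoBase, String groupId, String artifactId, String version) {
        // Normalize the base so exactly one slash separates it from the path.
        String base = repoBase.endsWith("/") ? repoBase : repoBase + "/";
        return base
                + groupId.replace('.', '/') + "/"   // org.apache.flink -> org/apache/flink
                + artifactId + "/"
                + version + "/"
                + artifactId + "-" + version + ".pom";
    }
}
```

Calling `pomUrl("https://repo.maven.apache.org/maven2", "org.apache.flink", 
"flink-parent", "1.6.1")` yields the URL used in the curl command, which can 
help when comparing what different networks return for the same file.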





[jira] [Commented] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643912#comment-16643912
 ] 

ASF GitHub Bot commented on FLINK-10440:


bmeriaux edited a comment on issue #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813#issuecomment-428299042
 
 
   Hi,
   I need your advice on the inheritance from `CassandraOutputFormatBase`, 
because the `CassandraOutputFormatBase` constructor needs an insert query, 
which I do not have in `CassandraPojoOutputFormat`.
   I chose the duplication option.
   What do you think about moving the Row/Tuple-specific code into their own 
classes and refactoring `CassandraOutputFormatBase` to hold only the common 
code? If any user has subclassed the base class, it will break. Is that 
considered a breaking change?
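
   To make the proposed split concrete, here is a simplified sketch. All class 
names below are hypothetical and the Cassandra session is replaced by an 
in-memory list; this is not the code of this pull request or of Flink's 
connector. The base class keeps only what every variant shares, the 
query-based variant owns the insert query, and the POJO variant needs none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical base: only common code, no insert query in the constructor. */
abstract class OutputFormatBaseSketch<T> {
    // Stands in for the statements that would be sent to a session.
    final List<String> sent = new ArrayList<>();

    /** Common entry point; subclasses decide how a record becomes a write. */
    final void writeRecord(T record) {
        sent.add(toStatement(record));
    }

    protected abstract String toStatement(T record);
}

/** Row/Tuple-style variant: the insert query lives here, not in the base. */
class QueryOutputFormatSketch extends OutputFormatBaseSketch<Object[]> {
    private final String insertQuery;

    QueryOutputFormatSketch(String insertQuery) {
        this.insertQuery = insertQuery;
    }

    @Override
    protected String toStatement(Object[] fields) {
        return insertQuery + " <- " + Arrays.toString(fields);
    }
}

/** POJO-style variant: no query; a mapper would derive the write from the class. */
class PojoOutputFormatSketch<P> extends OutputFormatBaseSketch<P> {
    @Override
    protected String toStatement(P pojo) {
        return "mapped:" + pojo;
    }
}
```

   With this shape, an existing subclass that calls a base constructor taking an 
insert query would no longer compile, which is the compatibility concern 
raised above.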




> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[jira] [Commented] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643911#comment-16643911
 ] 

ASF GitHub Bot commented on FLINK-10440:


bmeriaux commented on issue #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813#issuecomment-428299042
 
 
   Hi,
   I need your advice on the inheritance from `CassandraOutputFormatBase`, 
because the `CassandraOutputFormatBase` constructor needs an insert query, 
which I do not have in `CassandraPojoOutputFormat`.
   I chose the duplication option.
   What do you think about moving the Row/Tuple-specific code into their own 
classes and refactoring `CassandraOutputFormatBase`? If any user has 
subclassed the base class, it will break.




> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[GitHub] bmeriaux edited a comment on issue #6813: [FLINK-10440] Add a new CassandraPojoOutputFormat

2018-10-09 Thread GitBox
bmeriaux edited a comment on issue #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813#issuecomment-428299042
 
 
   Hi,
   I need your advice on the inheritance from `CassandraOutputFormatBase`, 
because the `CassandraOutputFormatBase` constructor needs an insert query, 
which I do not have in `CassandraPojoOutputFormat`.
   I chose the duplication option.
   What do you think about moving the Row/Tuple-specific code into their own 
classes and refactoring `CassandraOutputFormatBase` to hold only the common 
code? If any user has subclassed the base class, it will break. Is that 
considered a breaking change?




[GitHub] bmeriaux commented on issue #6813: [FLINK-10440] Add a new CassandraPojoOutputFormat

2018-10-09 Thread GitBox
bmeriaux commented on issue #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813#issuecomment-428299042
 
 
   Hi,
   I need your advice on the inheritance from `CassandraOutputFormatBase`, 
because the `CassandraOutputFormatBase` constructor needs an insert query, 
which I do not have in `CassandraPojoOutputFormat`.
   I chose the duplication option.
   What do you think about moving the Row/Tuple-specific code into their own 
classes and refactoring `CassandraOutputFormatBase`? If any user has 
subclassed the base class, it will break.




[jira] [Updated] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10440:
---
Labels: pull-request-available  (was: )

> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[jira] [Commented] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643872#comment-16643872
 ] 

ASF GitHub Bot commented on FLINK-10440:


bmeriaux opened a new pull request #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813
 
 
   ## What is the purpose of the change
   
   *Create a new CassandraPojoOutputFormat to write Pojo in Cassandra in Batch*
   
   ## Brief change log
   
   *Create a new CassandraPojoOutputFormat to write Pojo in Cassandra in Batch*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*CassandraConnectorITCase*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs 
   




> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[GitHub] bmeriaux opened a new pull request #6813: [FLINK-10440] Add a new CassandraPojoOutputFormat

2018-10-09 Thread GitBox
bmeriaux opened a new pull request #6813: [FLINK-10440] Add a new 
CassandraPojoOutputFormat
URL: https://github.com/apache/flink/pull/6813
 
 
   ## What is the purpose of the change
   
   *Create a new CassandraPojoOutputFormat to write Pojo in Cassandra in Batch*
   
   ## Brief change log
   
   *Create a new CassandraPojoOutputFormat to write Pojo in Cassandra in Batch*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*CassandraConnectorITCase*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs 
   




[jira] [Commented] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread Benoit MERIAUX (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643840#comment-16643840
 ] 

Benoit MERIAUX commented on FLINK-10440:


Hi,
Could you assign this to me?

I have code ready for a PR.

Thanks

> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[jira] [Updated] (FLINK-10440) Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch

2018-10-09 Thread Benoit MERIAUX (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benoit MERIAUX updated FLINK-10440:
---
Fix Version/s: 1.7.0

> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch
> -
>
> Key: FLINK-10440
> URL: https://issues.apache.org/jira/browse/FLINK-10440
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Reporter: Benoit MERIAUX
>Priority: Major
> Fix For: 1.7.0
>
>
> Add a CassandraPojoOutputFormat to write Pojo in Cassandra from batch API 
> like the CassandraPojoSink for streaming API





[jira] [Commented] (FLINK-9953) Active Kubernetes integration

2018-10-09 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643757#comment-16643757
 ] 

JIN SUN commented on FLINK-9953:


Thanks Till.

> Active Kubernetes integration
> -
>
> Key: FLINK-9953
> URL: https://issues.apache.org/jira/browse/FLINK-9953
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, ResourceManager
>Reporter: Till Rohrmann
>Assignee: JIN SUN
>Priority: Major
> Fix For: 1.7.0
>
>
> This is the umbrella issue tracking Flink's active Kubernetes integration. 
> Active means in this context that the {{ResourceManager}} can talk to 
> Kubernetes to launch new pods similar to Flink's Yarn and Mesos integration.





[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643752#comment-16643752
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on issue #6483: [FLINK-7243][flink-formats] Add parquet 
input format
URL: https://github.com/apache/flink/pull/6483#issuecomment-428274826
 
 
   @fhueske 
   Thanks for reviewing this PR. I can't agree more on offering a similar 
experience for both input formats (Parquet and ORC). I will resolve your 
comments in code tonight.
   
   Best Regards
   Peter Huang




> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 





[GitHub] HuangZhenQiu commented on issue #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on issue #6483: [FLINK-7243][flink-formats] Add parquet 
input format
URL: https://github.com/apache/flink/pull/6483#issuecomment-428274826
 
 
   @fhueske 
   Thanks for reviewing this PR. I can't agree more on offering a similar 
experience for both input formats (Parquet and ORC). I will resolve your 
comments in code tonight.
   
   Best Regards
   Peter Huang



