[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-03-02 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r261848167
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
 
 Review comment:
   @a10y 
   This PR is closed and merged into master. I am working on the ParquetTableSource with predicate pushdown. After that, another PR will be added for row-group level splitting.
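   A minimal sketch of how a row-group filter could be handed to the reader once predicate pushdown lands. FilterApi and FilterCompat are existing Parquet APIs, but the wiring below is illustrative; the column name "ts" and the helper class are hypothetical, and the format currently passes FilterCompat.NOOP.

       import org.apache.parquet.filter2.compat.FilterCompat;
       import org.apache.parquet.filter2.predicate.FilterApi;
       import org.apache.parquet.filter2.predicate.FilterPredicate;

       final class FilterSketch {
           // rows whose "ts" column is not positive can be dropped, and row groups
           // whose statistics rule out any match can be skipped entirely
           static FilterCompat.Filter positiveTimestamps() {
               FilterPredicate predicate = FilterApi.gt(FilterApi.longColumn("ts"), 0L);
               return FilterCompat.get(predicate);
           }
       }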




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-12-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r240074303
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ARRAY_TYPE = "array";
+   public static final String LIST_ELEMENT = "element";
+   public static final String LIST_GROUP_NAME = "list";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   /**
+    * Converts Flink internal types to a Parquet schema.
+    *
+    * @param typeInformation flink type information
+    * @param isStandard whether to use the standard LIST and MAP schema or the backward-compatible schema
+    * @return Parquet schema
+    */
+   public static MessageType toParquetType(TypeInformation<?> typeInformation, boolean isStandard) {
+   return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL, isStandard);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+   List<TypeInformation<?>> types = new ArrayList<>();
+   List<String> names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation<?> subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+   TypeInformation<?> typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   OriginalType originalType = fieldType.getOriginalType();
+   PrimitiveType primitiveType = fieldType.asPrimitiveType();
+   switch (primitiveType.getPrimitiveTypeName()) {
+   case BINARY:
+   if (originalType != null) {
+   switch (originalType) {
+   case DECIMAL:
+   typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   break;
+   case UTF8:
+   case ENUM:
+   case 
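   A hedged usage sketch of the converter shown in this excerpt. The schema string and names are illustrative only; MessageTypeParser is an existing Parquet API.

       import org.apache.flink.api.common.typeinfo.TypeInformation;
       import org.apache.parquet.schema.MessageType;
       import org.apache.parquet.schema.MessageTypeParser;

       final class ConverterSketch {
           static TypeInformation<?> rowTypeOf() {
               MessageType parquetSchema = MessageTypeParser.parseMessageType(
                   "message root { required int64 id; optional binary name (UTF8); }");
               // yields a RowTypeInfo with fields (id: Long, name: String)
               return ParquetSchemaConverter.fromParquetType(parquetSchema);
           }
       }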

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-11-19 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r234892484
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
+   private static final long serialVersionUID = 11L;
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRowInputFormat.class);
+   private boolean timeStampRewrite;
+   private RowTypeInfo returnType;
+   private int tsIndex;
+
+   public ParquetRowInputFormat(Path path, MessageType messageType) {
+   super(path, messageType);
+   this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
+   this.timeStampRewrite = false;
+   }
+
+   @Override
+   public TypeInformation<Row> getProducedType() {
+   return new RowTypeInfo(getFieldTypes(), getFieldNames());
 
 Review comment:
   Yes, you are right.
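   For context, a hedged sketch of how this input format is typically wired into a batch job. The path and schema are illustrative; env is assumed to be an ExecutionEnvironment, and createInput picks up the produced type via ResultTypeQueryable.

       MessageType schema = MessageTypeParser.parseMessageType(
           "message root { required int64 id; optional binary name (UTF8); }");
       ParquetRowInputFormat format =
           new ParquetRowInputFormat(new Path("hdfs:///data/input.parquet"), schema);
       DataSet<Row> rows = env.createInput(format);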




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-11-19 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r234892171
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
+   private static final long serialVersionUID = 11L;
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRowInputFormat.class);
+   private boolean timeStampRewrite;
+   private RowTypeInfo returnType;
+   private int tsIndex;
+
+   public ParquetRowInputFormat(Path path, MessageType messageType) {
+   super(path, messageType);
+   this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
+   this.timeStampRewrite = false;
+   }
+
+   @Override
+   public TypeInformation<Row> getProducedType() {
+   return new RowTypeInfo(getFieldTypes(), getFieldNames());
+   }
+
+   @Override
+   protected Row convert(Row row) {
+   if (timeStampRewrite) {
+   row.setField(tsIndex, new Timestamp((long) row.getField(tsIndex)));
 
 Review comment:
   Agree. I worked on HiveTableSource internally about a year ago, on Flink 1.4. As I remember, when using window functions in a group by, for example TUMBLE(time_attr, interval), the attribute time_attr has to be of type Timestamp. That means we need to convert the field to Timestamp somewhere. Do you have any preference for the implementation? Or do you just want to leave it in ParquetTableSource?
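   A minimal sketch of the conversion being discussed, assuming the raw Parquet value arrives as an epoch-millis long; the helper class and method name are hypothetical, modeled on the convert() override quoted above.

       import java.sql.Timestamp;
       import org.apache.flink.types.Row;

       final class TimestampRewriteSketch {
           // replace the epoch-millis long with a SQL Timestamp so that time
           // attributes such as TUMBLE(time_attr, interval) type-check
           static Row rewriteTimestampField(Row row, int tsIndex) {
               row.setField(tsIndex, new Timestamp((long) row.getField(tsIndex)));
               return row;
           }
       }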




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225604086
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+   return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+   List<TypeInformation<?>> types = new ArrayList<>();
+   List<String> names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation<?> subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+   TypeInformation<?> typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   @twalthr Agree, it is needed. I added explicit type conversions for all Parquet types to Flink types.
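   A hedged sketch of the kind of explicit mapping referred to above. This is an illustrative subset only; the actual converter covers every Parquet primitive and original type.

       switch (primitiveType.getPrimitiveTypeName()) {
           case INT32:
               return BasicTypeInfo.INT_TYPE_INFO;
           case INT64:
               return BasicTypeInfo.LONG_TYPE_INFO;
           case BOOLEAN:
               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
           case FLOAT:
               return BasicTypeInfo.FLOAT_TYPE_INFO;
           case DOUBLE:
               return BasicTypeInfo.DOUBLE_TYPE_INFO;
           default:
               throw new UnsupportedOperationException(
                   "Unsupported Parquet type: " + primitiveType.getPrimitiveTypeName());
       }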




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225605353
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport<T> readSupport;
+
+   private RecordMaterializer<T> recordMaterializer;
 
 Review comment:
   Done.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225604644
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+   extends FileInputFormat<E>
+   implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation<?>[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader<Row> parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+    * Reads Parquet files with the given result Parquet schema.
+    *
+    * @param path The path of the file to read.
+    * @param messageType schema of the read result
+    */
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read the whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+    * Reads Parquet files with the given result field names and types.
+    *
+    * @param path The path of the file to read.
+    * @param fieldTypes field types of the read result
+    * @param fieldNames field names to read, which can be a subset of the Parquet schema
+    */
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read the whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit 
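   The state returned by getCurrentState() above pairs the last synced block with the number of records read since that sync. A hedged sketch of how a caller consumes it through CheckpointableInputFormat; the recovery wiring below is illustrative, not code from this PR, and assumes `format` is an open ParquetInputFormat and `split` its current FileInputSplit.

       // on checkpoint
       Tuple2<Long, Long> state = format.getCurrentState();
       // on recovery: seek back to the synced block, then skip the counted records
       format.reopen(split, state);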

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225603693
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport<T> readSupport;
+
+   private RecordMaterializer<T> recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader<T> recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+   configuration, toSetMultiMap(fileMetadata), readSchema));
+
+   this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   
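   A hedged sketch of the read loop that checkRead() above enables; the surrounding nextRecord-style method is not part of this excerpt, so the fragment below is illustrative only, using the fields of the class shown.

       while (current < total) {
           checkRead();                          // advances to the next row group when exhausted
           currentValue = recordReader.read();   // materializes one record
           current++;
       }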

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225603575
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport<T> readSupport;
+
+   private RecordMaterializer<T> recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader<T> recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+   configuration, toSetMultiMap(fileMetadata), readSchema));
+
+   this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225602641
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport<T> readSupport;
+
+   private RecordMaterializer<T> recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader<T> recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+   configuration, toSetMultiMap(fileMetadata), readSchema));
+
+   this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-16 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r225601083
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport<T> readSupport;
+
+   private RecordMaterializer<T> recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader<T> recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
 
 Review comment:
   There was some duplication; I simplified it.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947861
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
 
 Review comment:
   They are reset in the nextRecord() method:
   if (parquetRecordReader.getCurrentBlock() != lastSyncedBlock) {
   ...
   }
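   A hedged expansion of that fragment (illustrative only; the actual reset lives in the nextRecord() implementation, which is not quoted here):

       if (parquetRecordReader.getCurrentBlock() != lastSyncedBlock) {
           lastSyncedBlock = parquetRecordReader.getCurrentBlock();
           recordsReadSinceLastSync = 0;
       }
       recordsReadSinceLastSync++;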




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223947437
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   MessageType readSchema = getReadSchema(schema);
+   this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+   this.parquetRecordReader.initialize(fileReader, configuration);
+

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942389
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+      super(path);
+      this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+      this.fieldTypes = readType.getFieldTypes();
+      this.fieldNames = readType.getFieldNames();
+      this.unsplittable = true;
+      this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+      return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+      org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+      InputFile inputFile =
+         HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+      ParquetReadOptions options = ParquetReadOptions.builder().build();
+      ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+      MessageType schema = fileReader.getFileMetaData().getSchema();
+      MessageType readSchema = getReadSchema(schema);
+      this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+      this.parquetRecordReader.initialize(fileReader, configuration);
+  

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223942328
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+      super(path);
+      this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+      this.fieldTypes = readType.getFieldTypes();
+      this.fieldNames = readType.getFieldNames();
+      this.unsplittable = true;
+      this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+      return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+      org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+      InputFile inputFile =
+         HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+      ParquetReadOptions options = ParquetReadOptions.builder().build();
+      ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+      MessageType schema = fileReader.getFileMetaData().getSchema();
+      MessageType readSchema = getReadSchema(schema);
+      this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+      this.parquetRecordReader.initialize(fileReader, configuration);
+  

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223941595
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+      super(path);
+      this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+      this.fieldTypes = readType.getFieldTypes();
+      this.fieldNames = readType.getFieldNames();
+      this.unsplittable = true;
+      this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+      return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+      org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+      InputFile inputFile =
+         HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+      ParquetReadOptions options = ParquetReadOptions.builder().build();
+      ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+      MessageType schema = fileReader.getFileMetaData().getSchema();
+      MessageType readSchema = getReadSchema(schema);
+      this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+      this.parquetRecordReader.initialize(fileReader, configuration);
+  

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940956
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940919
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
 
 Review comment:
   It should be private. Changed accordingly.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940936
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
 
 Review comment:
   Agree.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223940551
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
 
 Review comment:
   1) Added Javadoc for the two constructors.
   2) Given that Path is required and multiple file paths are not supported, I would prefer to leave it there, as in AvroInputFormat; otherwise users need to call the constructor and then setPath, which is not that convenient.
   3) Agree. I added another constructor that takes a MessageType for the return type.
   4) I think we need it. getProducerType will be needed for the table source at an early stage. If we rely on the schema in the Parquet file for the field types, we can only get them after opening the split, which is too late for the SQL magic, right? See the sketch below.
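
As a hedged illustration of point 4 (an editor's sketch based on this thread, not necessarily the final code), a Row-returning subclass can expose the produced type eagerly, before any split is opened, by deriving it from the MessageType passed to the new constructor:

   public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {

      private final RowTypeInfo returnType;

      public ParquetRowInputFormat(Path filePath, MessageType messageType) {
         // The (Path, MessageType) super constructor is assumed from point 3 above.
         super(filePath, messageType);
         // Derive the Flink row type eagerly so a table source can query it
         // during planning, before open() ever reads a file footer.
         this.returnType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
      }

      @Override
      public TypeInformation<Row> getProducedType() {
         return returnType;
      }

      @Override
      protected Row convert(Row row) {
         return row;
      }
   }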
   
   
   




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223935440
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
 
 Review comment:
   Agree. I removed the duplicated readType here.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223934091
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
 
 Review comment:
   Yes, it was originally added to distinguish the backward-compatible schema. It is not needed here.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223932565
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames, boolean isStandard) {
+      super(path);
+      this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+      this.fieldTypes = readType.getFieldTypes();
+      this.fieldNames = readType.getFieldNames();
+      this.unsplittable = true;
 
 Review comment:
   As a Parquet file is composed of row groups with separate metadata in the footer, it is definitely splittable. It requires reading the footer of each file in advance and processing each file split with the row-group offsets of the original file. That could be an improvement done in a separate PR. What do you think? See the sketch below.
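
To make the row-group point concrete, here is a minimal editor's sketch (not the PR's implementation) of listing the split boundaries a splittable version could derive from the footer, assuming a Hadoop Configuration and file Path are at hand:

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.format.converter.ParquetMetadataConverter;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.metadata.BlockMetaData;
   import org.apache.parquet.hadoop.metadata.ParquetMetadata;

   import java.io.IOException;

   public class RowGroupOffsets {
      public static void printOffsets(Configuration conf, Path file) throws IOException {
         // Read only the footer; row groups can then be processed independently.
         ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);
         for (BlockMetaData rowGroup : footer.getBlocks()) {
            // Each row group's starting byte offset and compressed size
            // define a candidate FileInputSplit.
            System.out.printf("row group @%d, %d bytes%n",
               rowGroup.getStartingPos(), rowGroup.getCompressedSize());
         }
      }
   }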




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r223930910
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
 
 Review comment:
   Done.




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212714129
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
 
 Review comment:
   Yes, it is considered. I will add a new constructor with a Filter for ParquetInputFormat, but I will leave the conversion logic (from the Flink expressions provided by FilterableTableSource to a Parquet FilterPredicate) within the Parquet table source. Thanks for the input. I will update the PR over the weekend.
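
For illustration, a hedged editor's sketch of the kind of filter such a constructor could accept; the column name "id" is purely illustrative:

   import org.apache.parquet.filter2.compat.FilterCompat;
   import org.apache.parquet.filter2.predicate.FilterApi;
   import org.apache.parquet.filter2.predicate.FilterPredicate;

   public class FilterExample {
      public static FilterCompat.Filter idGreaterThan(long threshold) {
         // A table source would derive this predicate from Flink expressions
         // instead of hard-coding the column; "id" is illustrative only.
         FilterPredicate predicate = FilterApi.gt(FilterApi.longColumn("id"), threshold);
         // The resulting Filter would replace FilterCompat.NOOP in open().
         return FilterCompat.get(predicate);
      }
   }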




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212706499
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+      return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+      return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+      List<TypeInformation<?>> types = new ArrayList<>();
+      List<String> names = new ArrayList<>();
+      for (Type field : parquetFields) {
+         TypeInformation<?> subType = convertField(field);
+         if (subType != null) {
+            types.add(subType);
+            names.add(field.getName());
+         }
+      }
+
+      return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+         names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+      TypeInformation<?> typeInfo = null;
+      if (fieldType.isPrimitive()) {
+         PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   This function converts a Parquet primitive type to a corresponding default Flink type. Explicit conversion to SqlTimeTypeInfo or other types can probably be handled by users when there is a need; otherwise, we would need to bring the user's conversion preference into the interface. See the sketch below.
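
A hedged sketch of such a default primitive mapping (an editor's illustration; the PR's actual conversion covers more cases, e.g. INT96 and fixed-length binaries):

   import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.parquet.schema.PrimitiveType;

   public class PrimitiveMapping {
      public static TypeInformation<?> toFlinkType(PrimitiveType primitiveType) {
         // Map each Parquet primitive to its default Flink type.
         switch (primitiveType.getPrimitiveTypeName()) {
            case BOOLEAN:
               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
            case INT32:
               return BasicTypeInfo.INT_TYPE_INFO;
            case INT64:
               return BasicTypeInfo.LONG_TYPE_INFO;
            case FLOAT:
               return BasicTypeInfo.FLOAT_TYPE_INFO;
            case DOUBLE:
               return BasicTypeInfo.DOUBLE_TYPE_INFO;
            case BINARY:
               return BasicTypeInfo.STRING_TYPE_INFO;
            default:
               throw new UnsupportedOperationException("Unsupported Parquet type: " + primitiveType);
         }
      }
   }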




[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212704322
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 Review comment:
   Agreed. Once this PR is merged, I will create a follow-up PR for 
https://issues.apache.org/jira/browse/FLINK-7244.
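
For orientation, a minimal usage sketch of wiring this class into a batch job. The constructor shape follows the base-class constructor quoted above; the path, field types, and field names are illustrative assumptions:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

public class ParquetRowExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Hypothetical schema: (id BIGINT, name VARCHAR).
		ParquetRowInputFormat format = new ParquetRowInputFormat(
			new Path("file:///tmp/input.parquet"),
			new TypeInformation<?>[] {BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[] {"id", "name"});
		// ResultTypeQueryable lets createInput infer the produced Row type.
		DataSet<Row> rows = env.createInput(format);
		rows.print();
	}
}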



[GitHub] HuangZhenQiu commented on a change in pull request #6483: [Flink-7243][flink-formats] Add parquet input format

2018-08-19 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[Flink-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r211131715
 
 

 ##
 File path: flink-formats/flink-parquet/pom.xml
 ##
 @@ -73,6 +73,11 @@ under the License.
provided

 
+   
 
 Review comment:
   It is used in the unit tests, but it is not mandatory.
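
As a point of reference, a sketch in Maven POM terms of what a test-only dependency declaration would look like here. The actual dependency is elided in the quoted hunk above, so the coordinates below are placeholders:

<dependency>
	<!-- Placeholder coordinates; the real artifact is elided in the quote above. -->
	<groupId>org.example</groupId>
	<artifactId>test-only-helper</artifactId>
	<version>1.0</version>
	<!-- test scope keeps it off the compile/runtime classpath of downstream users -->
	<scope>test</scope>
</dependency>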



[GitHub] HuangZhenQiu commented on a change in pull request #6483: [Flink-7243][flink-formats] Add parquet input format

2018-08-19 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[Flink-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r211131207
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+   return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+   List<TypeInformation<?>> types = new ArrayList<>();
+   List<String> names = new ArrayList<>();
+   for (Type field : parquetFields) {
+  /*if (field.isRepetition(Type.Repetition.REPEATED)) {
+  throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + field);
+  }*/
+   TypeInformation<?> subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+   TypeInformation<?> typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = fieldType.asPrimitiveType();
+   switch (primitiveType.getPrimitiveTypeName()) {
+   case BINARY:
+  /*if (primitiveType.getOriginalType().equals(OriginalType.UTF8)) {
+  typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+  } else {
+  typeInfo = BasicTypeInfo.BYTE_TYPE_INFO;
+  }*/
+   typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+   break;
+   case BOOLEAN:
+   typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   break;
+   case INT32:
+   typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+   break;
+   case INT64:
+   typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
+   break;
+   case INT96:
+   typeInfo 
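
The hunk above is truncated in the archive. For orientation, here is a small sketch of the mapping the switch implements, built with parquet-mr's Types builder; the schema itself is a made-up example:

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class SchemaConversionExample {
	public static void main(String[] args) {
		// Made-up schema: per the switch above, INT64 -> Long and
		// BINARY (UTF8) -> String.
		MessageType parquetSchema = Types.buildMessage()
			.optional(PrimitiveTypeName.INT64).named("id")
			.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
			.named("root");
		TypeInformation<?> rowType = ParquetSchemaConverter.fromParquetType(parquetSchema);
		System.out.println(rowType); // Row(id: Long, name: String)
	}
}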

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [Flink-7243][flink-formats] Add parquet input format

2018-08-19 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[Flink-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r211128582
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+   CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected final TypeInformation<?>[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(
+   Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   }
+
+   @Override
+   public Tuple2<Long, Long> getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+   ParquetReadOptions options = ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+   MessageType schema = fileReader.getFileMetaData().getSchema();
+   checkSchema(schema);
+   MessageType readSchema = ParquetSchemaConverter.toParquetType(readType);
 
 Review comment:
   The checkSchema function needs to read the Parquet footer first. It would be 
too heavy to open and read through all of the files in the constructor.
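
To illustrate why the footer read belongs in open(): obtaining the schema requires instantiating a reader per file, which is cheap once per split but heavy if done eagerly for every file in the constructor. A minimal sketch, assuming parquet-hadoop's ParquetFileReader API as already used in the hunk above:

import org.apache.hadoop.conf.Configuration;

import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

public class FooterSchemaProbe {
	// Reads only the file footer to obtain the schema; intended to be called
	// once per split in open(), not for all files up front in the constructor.
	public static MessageType readSchema(org.apache.hadoop.fs.Path file) throws IOException {
		InputFile inputFile = HadoopInputFile.fromPath(file, new Configuration());
		try (ParquetFileReader reader =
				new ParquetFileReader(inputFile, ParquetReadOptions.builder().build())) {
			return reader.getFileMetaData().getSchema();
		}
	}
}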

