Hi Hanan,

I created a fix for the problem. Would you please try it from your side?
https://github.com/apache/flink/pull/10371


Best Regards
Peter Huang

On Tue, Nov 26, 2019 at 8:07 AM Peter Huang <huangzhenqiu0...@gmail.com>
wrote:

> Hi Hanan,
>
> After investigating the issue by using the test case you provided, I think
> there is a big in it. Currently, the parquet predicts push down use the
> predicate literal type to construct the FilterPredicate.
> The issue happens when the data type of value in predicate inferred from
> SQL doesn't match the parquet schema. For example, foo is a long type, foo
> < 1 is the predicate. Literal will be recognized as an integration. It
> causes the parquet FilterPredicate is mistakenly created for the column of
> Integer type. I created a ticket for the issue.
> https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
> insight by comment directly on it.
>
>
> Best Regards
> Peter Huang
>
> On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai <hanan.yehu...@radcom.com>
> wrote:
>
>> HI Peter.  Thanks.
>>
>> This is my code .  I used one of the parquet / avro tests as a reference.
>>
>>
>>
>> The code will fail on
>>
>> *Test testScan(ParquetTestCase) failed with:*
>>
>> *java.lang.UnsupportedOperationException*
>>
>> *               at
>> org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)*
>>
>> *               at
>> org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)*
>>
>> *               at
>> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)*
>>
>>
>>
>>
>>
>> CODE :
>>
>>
>>
>> import org.apache.avro.Schema;
>>
>> import org.apache.avro.generic.GenericRecord;
>>
>> import org.apache.avro.generic.GenericRecordBuilder;
>>
>> import org.apache.avro.specific.SpecificRecord;
>>
>> import org.apache.avro.specific.SpecificRecordBuilderBase;
>>
>> import org.apache.flink.api.common.typeinfo.Types;
>>
>> import org.apache.flink.api.java.DataSet;
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>>
>> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>>
>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>>
>> import org.apache.flink.api.java.tuple.Tuple;
>>
>> import org.apache.flink.core.fs.FileSystem;
>>
>> import org.apache.flink.core.fs.Path;
>>
>>
>>
>> import org.apache.flink.formats.parquet.ParquetTableSource;
>>
>> import org.apache.flink.streaming.api.datastream.DataStream;
>>
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>>
>> import org.apache.flink.table.api.Table;
>>
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>
>>
>>
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>
>> import org.apache.flink.table.sinks.CsvTableSink;
>>
>> import org.apache.flink.table.sinks.TableSink;
>>
>> import org.apache.flink.test.util.MultipleProgramsTestBase;
>>
>> import org.apache.flink.types.Row;
>>
>>
>>
>> import org.apache.avro.generic.IndexedRecord;
>>
>> import org.apache.parquet.avro.AvroSchemaConverter;
>>
>> import org.apache.parquet.schema.MessageType;
>>
>> import org.junit.BeforeClass;
>>
>> import org.junit.ClassRule;
>>
>> import org.junit.Test;
>>
>> import org.junit.rules.TemporaryFolder;
>>
>>
>>
>> import java.io.IOException;
>>
>> import java.util.ArrayList;
>>
>> import java.util.List;
>>
>> import java.util.UUID;
>>
>>
>>
>> import static org.junit.Assert.assertEquals;
>>
>>
>>
>> import org.apache.parquet.avro.AvroParquetWriter;
>>
>> import org.apache.parquet.hadoop.ParquetWriter;
>>
>>
>>
>>
>>
>> public class  ParquetTestCase extends MultipleProgramsTestBase {
>>
>>
>>
>>     private static String avroSchema = "{\n" +
>>
>>             "  \"name\": \"SimpleRecord\",\n" +
>>
>>             "  \"type\": \"record\",\n" +
>>
>>             "  \"fields\": [\n" +
>>
>>             "    { \"default\": null, \"name\": \"timestamp_edr\",
>> \"type\": [ \"null\", \"long\" ]},\n" +
>>
>>             "    { \"default\": null, \"name\": \"id\", \"type\": [
>> \"null\", \"long\" ]},\n" +
>>
>>             "    { \"default\": null, \"name\": \"recordType_\",
>> \"type\": [ \"null\", \"string\"]}\n" +
>>
>>             "  ],\n" +
>>
>>             "  \"schema_id\": 1,\n" +
>>
>>             "  \"type\": \"record\"\n" +
>>
>>             "}";
>>
>>
>>
>>     private static final AvroSchemaConverter SCHEMA_CONVERTER = new
>> AvroSchemaConverter();
>>
>>     private static Schema schm = new Schema.Parser().parse(avroSchema);
>>
>>     private static Path testPath;
>>
>>
>>
>>
>>
>>     public ParquetTestCase() {
>>
>>         super(TestExecutionMode.COLLECTION);
>>
>>     }
>>
>>
>>
>>
>>
>>     @BeforeClass
>>
>>     public static void setup() throws Exception {
>>
>>
>>
>>         GenericRecordBuilder genericRecordBuilder = new
>> GenericRecordBuilder(schm);
>>
>>
>>
>>
>>
>>         List<IndexedRecord> recs = new ArrayList<>();
>>
>>         for (int i = 0; i < 6; i++) {
>>
>>             GenericRecord gr = genericRecordBuilder.set("timestamp_edr",
>> System.currentTimeMillis() / 1000).set("id", 3333333L).set("recordType_",
>> "Type1").build();
>>
>>             recs.add(gr);
>>
>>             GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr",
>> System.currentTimeMillis() / 1000).set("id", 222222L).set("recordType_",
>> "Type2").build();
>>
>>             recs.add(gr2);
>>
>>         }
>>
>>
>>
>>         testPath = new Path("/tmp",  UUID.randomUUID().toString());
>>
>>
>>
>>
>>
>>         ParquetWriter<IndexedRecord> writer =
>> AvroParquetWriter.<IndexedRecord>builder(
>>
>>                 new
>> org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
>>
>>
>>
>>         for (IndexedRecord record : recs) {
>>
>>             writer.write(record);
>>
>>         }
>>
>>         writer.close();
>>
>>     }
>>
>>
>>
>>
>>
>>     private ParquetTableSource createParquetTableSource(Path path) throws
>> IOException {
>>
>>         MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
>>
>>         ParquetTableSource parquetTableSource =
>> ParquetTableSource.builder()
>>
>>                 .path(path.getPath())
>>
>>                 .forParquetSchema(nestedSchema)
>>
>>                 .build();
>>
>>         return parquetTableSource;
>>
>>     }
>>
>>
>>
>>     @Test
>>
>>     public void testScan() throws Exception {
>>
>>         ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>>
>>
>>         BatchTableEnvironment batchTableEnvironment  =
>> BatchTableEnvironment.create(env);
>>
>>         ParquetTableSource tableSource =
>> createParquetTableSource(testPath);
>>
>>         batchTableEnvironment.registerTableSource("ParquetTable",
>> tableSource);
>>
>>
>>
>>          Table tab = batchTableEnvironment.sqlQuery("select
>> id,recordType_  from ParquetTable where id > 222222 ");
>>
>>
>>
>>         DataSet<Row> result = batchTableEnvironment.toDataSet(tab,
>> Row.class);
>>
>>
>>
>>         result.print();
>>
>>
>>
>>     }
>>
>>
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>> *From:* Peter Huang <huangzhenqiu0...@gmail.com>
>> *Sent:* Monday, November 18, 2019 7:22 PM
>> *To:* dev <d...@flink.apache.org>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: SQL for Avro GenericRecords on Parquet
>>
>>
>>
>> Hi Hanan,
>>
>>
>>
>> Thanks for reporting the issue. Would you please attach your test code
>> here? I may help to investigate.
>>
>>
>>
>>
>>
>>
>>
>> Best Regards
>>
>> Peter Huang
>>
>>
>>
>> On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <hanan.yehu...@radcom.com>
>> wrote:
>>
>> I have tried to persist Generic Avro records in a parquet file and then
>> read it via ParquetTablesource – using SQL.
>> Seems that the SQL I not executed properly !
>>
>> The persisted records are :
>> Id  ,  type
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>>
>> While SQL  of SELECT id  ,recordType_  FROM ParquetTable  - return the
>> above ( which is correct)
>> Running  : "SELECT id  ,recordType_  FROM ParquetTable  where
>> recordType_='Type1' "
>> Will result in :
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>>
>> As if the equal sign is assignment and not equal …
>>
>> am I doing something wrong ? is it an issue of Generic record vs
>> SpecificRecords ?
>>
>>

Reply via email to