SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Hanan Yehudai
I have tried to persist Avro GenericRecords in a Parquet file and then read them
back via ParquetTableSource using SQL.
It seems that the SQL is not executed properly!

The persisted records are:
id,type
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2
333,Type1
22,Type2

The query SELECT id, recordType_ FROM ParquetTable returns the above (which is
correct), but running "SELECT id, recordType_ FROM ParquetTable WHERE
recordType_='Type1'" results in:
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1
333,Type1
22,Type1

It is as if the equals sign were treated as an assignment rather than a
comparison.

Am I doing something wrong? Is this an issue of GenericRecord vs.
SpecificRecord?
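To make the reported symptom concrete, here is a minimal plain-Java sketch (no Flink involved) of the two behaviours: what WHERE recordType_='Type1' should return for the persisted records above, and what an assignment-like evaluation would produce instead. The class FilterSemanticsSketch and its methods are hypothetical names for illustration; they model rows as simple (id, recordType_) pairs and do not reproduce Flink's or Parquet's actual filter code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the reported table; this is not Flink or Parquet code.
class FilterSemanticsSketch {

    // One row as (id, recordType_); timestamp_edr is left out because the queries do not select it.
    static final class Rec {
        final long id;
        final String recordType;

        Rec(long id, String recordType) {
            this.id = id;
            this.recordType = recordType;
        }

        @Override
        public String toString() {
            return id + "," + recordType;
        }
    }

    // The persisted records from the report: (333, Type1) and (22, Type2), six times each, interleaved.
    static List<Rec> persisted() {
        List<Rec> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            recs.add(new Rec(333L, "Type1"));
            recs.add(new Rec(22L, "Type2"));
        }
        return recs;
    }

    // Comparison semantics: keep only rows whose recordType_ equals the literal.
    // literal.equals(null) is false, so a NULL recordType_ is dropped, as in SQL.
    static List<Rec> whereEquals(List<Rec> rows, String literal) {
        return rows.stream()
                .filter(r -> literal.equals(r.recordType))
                .collect(Collectors.toList());
    }

    // Assignment-like behaviour matching the reported output: keep every row, overwrite recordType_.
    static List<Rec> assignInstead(List<Rec> rows, String literal) {
        return rows.stream()
                .map(r -> new Rec(r.id, literal))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Rec> rows = persisted();
        System.out.println("WHERE recordType_='Type1' should return: " + whereEquals(rows, "Type1"));
        System.out.println("Reported output looks like: " + assignInstead(rows, "Type1"));
    }
}
```

A correct filter keeps six rows, all 333,Type1. The reported output instead matches assignInstead: all twelve rows survive and every 22 row now carries Type1.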




Re: SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Peter Huang
Hi Hanan,

Thanks for reporting the issue. Could you please attach your test code
here? I can help investigate.



Best Regards
Peter Huang


RE: SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Hanan Yehudai
chm);
        ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                .path(path.getPath())
                .forParquetSchema(nestedSchema)
                .build();
        return parquetTableSource;
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        ParquetTableSource tableSource = createParquetTableSource(testPath);
        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);

        Table tab = batchTableEnvironment.sqlQuery(
                "select id, recordType_ from ParquetTable where id > 22");

        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);

        result.print();
    }


}
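The attached test only prints the result, so a wrong answer does not make it fail. One hedged option is to collect the rows (for example with result.collect()) and compare them with an expectation computed independently of Flink. The plain-Java sketch below computes that expectation for the test's query (where id > 22), assuming the same twelve records reported at the top of the thread (six 333/Type1 and six 22/Type2). ExpectedScanResult and its methods are hypothetical helpers, not Flink API, and the "id,recordType_" string format is an assumption chosen for easy comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical oracle for testScan(); independent of Flink and Parquet.
class ExpectedScanResult {

    // Assumed to match the persisted data: (333, Type1) and (22, Type2), six times each, interleaved.
    // timestamp_edr is not projected by the query, so it is omitted.
    static List<Object[]> writtenRows() {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            rows.add(new Object[] {333L, "Type1"});
            rows.add(new Object[] {22L, "Type2"});
        }
        return rows;
    }

    // Expected output of: select id, recordType_ from ParquetTable where id > bound
    static List<String> idGreaterThan(List<Object[]> rows, long bound) {
        return rows.stream()
                .filter(r -> (Long) r[0] > bound)
                .map(r -> r[0] + "," + r[1])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        idGreaterThan(writtenRows(), 22L).forEach(System.out::println);
    }
}
```

In the test, the collected rows could then be rendered the same way (for example from Row#getField(0) and Row#getField(1)) and asserted equal to this list; with the data above, the expectation is six 333,Type1 rows.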


From: Peter Huang 
Sent: Monday, November 18, 2019 7:22 PM
To: dev 
Cc: user@flink.apache.org
Subject: Re: SQL for Avro GenericRecords on Parquet

Hi Hanan,

Thanks for reporting the issue. Could you please attach your test code here? I
can help investigate.



Best Regards
Peter Huang


Re: SQL for Avro GenericRecords on Parquet

2019-11-26 Thread Peter Huang
> }";
>
>     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
>
>     private static Schema schm = new Schema.Parser().parse(avroSchema);
>
>     private static Path testPath;
>
>     public ParquetTestCase() {
>         super(TestExecutionMode.COLLECTION);
>     }
>
>     @BeforeClass
>     public static void setup() throws Exception {
>         GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);
>
>         List<IndexedRecord> recs = new ArrayList<>();
>         for (int i = 0; i < 6; i++) {
>             GenericRecord gr = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000)
>                     .set("id", 333L).set("recordType_", "Type1").build();
>             recs.add(gr);
>             GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000)
>                     .set("id", 22L).set("recordType_", "Type2").build();
>             recs.add(gr2);
>         }
>
>         testPath = new Path("/tmp", UUID.randomUUID().toString());
>
>         ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
>                 new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
>
>         for (IndexedRecord record : recs) {
>             writer.write(record);
>         }
>         writer.close();
>     }
>
>     private ParquetTableSource createParquetTableSource(Path path) throws IOException {
>         MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
>         ParquetTableSource parquetTableSource = ParquetTableSource.builder()
>                 .path(path.getPath())
>                 .forParquetSchema(nestedSchema)
>                 .build();
>         return parquetTableSource;
>     }
>
>     @Test
>     public void testScan() throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
>         ParquetTableSource tableSource = createParquetTableSource(testPath);
>         batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
>
>         Table tab = batchTableEnvironment.sqlQuery(
>                 "select id, recordType_ from ParquetTable where id > 22");
>
>         DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);
>
>         result.print();
>     }
> }


Re: SQL for Avro GenericRecords on Parquet

2019-12-09 Thread Peter Huang
>>             "  \"fields\": [\n" +
>>             "{ \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "{ \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "{ \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
>>             "  ],\n" +
>>             "  \"schema_id\": 1,\n" +
>>             "  \"type\": \"record\"\n" +
>>             "}";