Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-08 Thread via GitHub


waytoharish closed issue #10895: Nested object support in Hudi Table using Flink
URL: https://github.com/apache/hudi/issues/10895


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-08 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042890195

   Thanks @ad1happy2go @danny0405, it worked for me after using 
`GenericRowData`. 
   
   I am closing the issue.





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-08 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042234221

   I keep getting:
   
   java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryStringData cannot be cast to class org.apache.flink.table.data.ArrayData (org.apache.flink.table.data.binary.BinaryStringData and org.apache.flink.table.data.ArrayData are in unnamed module of loader 'app')
       at org.apache.flink.table.data.GenericRowData.getArray(GenericRowData.java:195) ~[flink-table-common-1.15.0.jar:1.15.0]
       at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
       at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
       at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
       at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
       at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
       at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
       at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
       at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
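
The trace shows `GenericRowData.getArray` being called on a position that holds a `BinaryStringData`. Since `GenericRowData` stores its fields purely by position, one way such a failure can arise is when the values are set in a different order than the columns are declared. The thread does not confirm that this was the cause here. The following is only a plain-Java sketch of a pre-write sanity check under that assumption: `PositionalRowCheck` and `firstMismatch` are hypothetical names, not part of Flink or Hudi, and ordinary Java classes stand in for the Flink data types.

```java
import java.util.List;

final class PositionalRowCheck {

    /**
     * Returns the index of the first non-null value that is not an instance of
     * the class expected at that position, or -1 if every position matches.
     * A length difference is reported at the first index without a counterpart.
     */
    static int firstMismatch(List<Class<?>> expected, Object[] values) {
        int common = Math.min(expected.size(), values.length);
        for (int i = 0; i < common; i++) {
            if (values[i] != null && !expected.get(i).isInstance(values[i])) {
                return i;
            }
        }
        return expected.size() == values.length ? -1 : common;
    }

    public static void main(String[] args) {
        // Stand-in types for an assumed column order:
        // uuid, name, age, ts, data (array), partition.
        List<Class<?>> declared = List.of(
                String.class, String.class, Integer.class, Long.class, List.class, String.class);

        // Values with the partition string at index 4 and the array at index 5.
        Object[] partitionFirst = {"campaign", "name9", 23, 1L, "p1", List.of("m1")};
        // The same values reordered to follow the declared columns.
        Object[] dataFirst = {"campaign", "name9", 23, 1L, List.of("m1"), "p1"};

        System.out.println(firstMismatch(declared, partitionFirst));
        System.out.println(firstMismatch(declared, dataFirst));
    }
}
```

A check of this kind only compares positions with expected classes; it says nothing about whether nested elements themselves have the right shape.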





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-08 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042041118

   Hi @ad1happy2go,
   I have added the following:
   
   @Override
   public RowData map(Telemetry kafkaRecord) throws Exception {
       GenericRowData row = new GenericRowData(6);
       row.setField(0, StringData.fromString(kafkaRecord.getCampaignName()));
       row.setField(1, StringData.fromString("name9"));
       row.setField(2, 23);
       row.setField(3, TimestampData.fromEpochMillis(1));
       row.setField(4, StringData.fromString("p1"));
       row.setField(5, kafkaRecord.getData());
       return row;
   }
   
   I can see data being pushed to S3, but no table has been created in Glue.
   
   





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-02 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2031270238

   Hi @ad1happy2go, I am still facing the issue. I have not been able to figure 
out how to do this with `GenericRowData`.





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-04-01 Thread via GitHub


ad1happy2go commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2029718076

   @waytoharish Did you get a chance to try out `GenericRowData`? Are you still 
facing the issue?





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-25 Thread via GitHub


danny0405 commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2019378477

   Another way is to use `GenericRowData`, which is much simpler, or just use 
`Row` and convert it back to `RowData` with a utility.
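
The thread does not include the final working code, so the following is only a sketch of how a nested `ARRAY<ROW<...>>` field can be populated with `GenericRowData` and `GenericArrayData` from `flink-table-common`. The class and method names (`NestedRowDataExample`, `measure`, `record`) are hypothetical, and the field order (uuid, name, age, ts, data, partition) is an assumption taken from the `HoodiePipeline` columns posted in this thread.

```java
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

final class NestedRowDataExample {

    /** Builds one array element, assumed to be ROW<measure_name, type>. */
    static RowData measure(String measureName, String type) {
        return GenericRowData.of(
                StringData.fromString(measureName),
                StringData.fromString(type));
    }

    /** Builds the top-level row; positions must follow the declared column order. */
    static RowData record(String uuid, String name, int age, long tsMillis,
                          RowData[] measures, String partition) {
        GenericRowData row = new GenericRowData(6);
        row.setField(0, StringData.fromString(uuid));
        row.setField(1, StringData.fromString(name));
        row.setField(2, age);
        row.setField(3, TimestampData.fromEpochMillis(tsMillis));
        // The array column holds an ArrayData whose elements are RowData.
        row.setField(4, new GenericArrayData(measures));
        row.setField(5, StringData.fromString(partition));
        return row;
    }

    public static void main(String[] args) {
        RowData row = record("id-1", "name9", 23, 1L,
                new RowData[] {measure("temperature", "double")}, "p1");
        // Read the nested value back the same way a converter would.
        ArrayData data = row.getArray(4);
        RowData first = data.getRow(0, 2);
        System.out.println(data.size() + " " + first.getString(0));
    }
}
```

Strings are wrapped in `StringData` and timestamps in `TimestampData` because `GenericRowData` expects Flink's internal data structures rather than plain Java objects.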





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-25 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2019330463

   @danny0405 @ad1happy2go Could you please help me understand whether this is a 
bug or whether I am doing something wrong?





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-22 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014848588

   Thanks for your time, @ad1happy2go.
   @ad1happy2go @danny0405 Here is the error I am getting after the code 
change:

   14:38:06,411 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> (Map -> row_data_to_hoodie_record, Sink: Print to Std. Out) (6/10) (b41f7215661da4d1d3d1c157a58c57e4) switched from RUNNING to FAILED on 8d293143-6737-4451-a2c5-4e4f6f85cbd4 @ localhost (dataPort=-1).
   java.io.IOException: Failed to deserialize consumer record due to
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
       at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
       at java.lang.Thread.run(Thread.java:829) ~[?:?]
   Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
       at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
       at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
       ... 14 more
   Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)

Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-22 Thread via GitHub


ad1happy2go commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014803349

   @waytoharish As discussed on the call, can you provide the latest code and 
the exception we were getting?





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-21 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011376344

   Thanks @danny0405.
   
   I am able to map the array type using the code below, but I am not sure if 
I am doing it correctly.
   
   private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
       return HoodiePipeline.builder(targetTable)
           .column("uuid VARCHAR(256)")
           .column("name VARCHAR(10)")
           .column("age INT")
           .column("ts TIMESTAMP(3)")
           .column("data ARRAY<ROW<measure_name VARCHAR(256), type VARCHAR(10)>>")
           .column("`partition` VARCHAR(20)")
           .pk("uuid")
           .partition("partition")
           .options(options);
   }
   
   // Define the schema for Hudi records
   
   public static final DataType ROW_ARRAY_DATA_TYPE = ROW(
       DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)), // record key
       DataTypes.FIELD("type", DataTypes.VARCHAR(10)))
       .notNull();
   public static final DataType ROW_DATA_TYPE = ROW(
       DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
       DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
       DataTypes.FIELD("age", DataTypes.INT()),
       DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
       DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
       DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
       .notNull();
   
   Now I am trying to figure out how to map it to `RowData`, where I tried 
the following:
   
   static class HudiDataSource implements MapFunction<Telemetry, RowData> {
       @Override
       public RowData map(Telemetry kafkaRecord) throws Exception {
           return insertRow(
               StringData.fromString(kafkaRecord.getCampaignName()),
               StringData.fromString("Danny"),
               23,
               TimestampData.fromEpochMillis(1),
               StringData.fromString("par1"),
               insertRow(
                   StringData.fromString(kafkaRecord.getCampaignName()),
                   StringData.fromString("Danny")));
       }
   }
   
   but it's failing with
   
   static class HudiDataSource implements MapFunction<Telemetry, RowData> {
       @Override
       public RowData map(Telemetry kafkaRecord) throws Exception {
           return insertRow(
               StringData.fromString(kafkaRecord.getCampaignName()),
               StringData.fromString("Danny"),
               23,
               TimestampData.fromEpochMillis(1),
               StringData.fromString("par1"),
               insertRow(
                   StringData.fromString(kafkaRecord.getCampaignName()),
                   StringData.fromString("Danny")));
       }
   }
   
   
   





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-21 Thread via GitHub


danny0405 commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011334497

   Hmm, I guess it is because the `HoodiePipeline` class does not concatenate 
the fields in good shape; that might be a bug.





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-21 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011288296

   Hi @danny0405, I don't see the SQL in Flink.
   
   Here is my code, which I am trying in Java:
   
   `public static final DataType ROW_DATA_TYPE = ROW(
       DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
       DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
       DataTypes.FIELD("age", DataTypes.INT()),
       DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
       DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
       DataTypes.FIELD("data", DataTypes.ARRAY(
           ROW(
               DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
               DataTypes.FIELD("type", DataTypes.VARCHAR(10))))),
       DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
       .notNull();`





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-20 Thread via GitHub


danny0405 commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011275481

   Can you show us the complete SQL text?





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-20 Thread via GitHub


waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011212848

   @ad1happy2go I am getting the following error when I try with:
   
   DataTypes.FIELD("data", DataTypes.ARRAY(
       ROW(
           DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
           DataTypes.FIELD("type", DataTypes.VARCHAR(10)))))
   
   Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ARRAY' at line 6, column 8.
   Was expecting one of:
   "ROW" ...
...
...
...
...
...
...
   "STRING" ...
   "BYTES" ...
   "TIMESTAMP_LTZ" ...
   "ARRAY" ...
   "ARRAY" "<" ...
   "RAW" ...
   "BOOLEAN" ...
   "INTEGER" ...
   "INT" ...
   "TINYINT" ...
   "SMALLINT" ...
   "BIGINT" ...
   "REAL" ...
   "DOUBLE" ...
   "FLOAT" ...
   "BINARY" ...
   "VARBINARY" ...
   "DECIMAL" ...
   "DEC" ...
   "NUMERIC" ...
   "ANY" ...
   "CHARACTER" ...
   "CHAR" ...
   "VARCHAR" ...
   "DATE" ...
   "TIME" ...
   "TIMESTAMP" ...
   
       at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:82)
       at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
       at org.apache.hudi.util.HoodiePipeline$Builder.getTableDescriptor(HoodiePipeline.java:172)
       at org.apache.hudi.util.HoodiePipeline$Builder.sink(HoodiePipeline.java:162)
       at com.hudi.flink.quickstart.Kafka2HudiPipelineNew.main(Kafka2HudiPipelineNew.java:77)
   Caused by: org.apache.calcite.sql.parser.SqlParseException: Incorrect syntax near the keyword 'ARRAY' at line 6, column 8.
   Was expecting one of:
   "ROW" ...
...
...
...
...
...
...
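
The trace shows `HoodiePipeline$Builder.getTableDescriptor` calling `TableEnvironmentImpl.executeSql`, so the column definitions passed to the builder end up being parsed as Flink SQL. This means each column string has to be a valid SQL type, with nested types written as `ARRAY<ROW<...>>`. The following plain-Java sketch only illustrates how such strings can be composed. It is not Hudi's implementation; `NestedColumnDdl` and its methods are hypothetical helpers, and a real `CREATE TABLE` statement would also need a `WITH (...)` clause.

```java
import java.util.List;

final class NestedColumnDdl {

    /** Renders a Flink SQL ROW type from already rendered field definitions. */
    static String row(String... fields) {
        return "ROW<" + String.join(", ", fields) + ">";
    }

    /** Renders a Flink SQL ARRAY type around an element type. */
    static String array(String elementType) {
        return "ARRAY<" + elementType + ">";
    }

    /** Renders a column or nested field with a backtick-quoted name. */
    static String field(String name, String type) {
        return "`" + name + "` " + type;
    }

    /** Joins column definitions into the head of a CREATE TABLE statement. */
    static String createTable(String table, List<String> columns) {
        return "CREATE TABLE `" + table + "` (\n  "
                + String.join(",\n  ", columns) + "\n)";
    }

    public static void main(String[] args) {
        String data = field("data", array(row(
                field("measure_name", "VARCHAR(256)"),
                field("type", "VARCHAR(10)"))));
        System.out.println(createTable("t1", List.of(
                field("uuid", "VARCHAR(256)"),
                field("ts", "TIMESTAMP(3)"),
                data)));
    }
}
```

Quoting every name with backticks avoids having to check whether a field name collides with a SQL keyword.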





Re: [I] Nested object support in Hudi Table using Flink [hudi]

2024-03-20 Thread via GitHub


ad1happy2go commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2009802993

   @waytoharish Can you let us know what error or issue you are facing? Or is it 
only giving a compilation error?

