[GitHub] [hudi] rmnlchh commented on issue #9282: [ISSUE] Hudi 0.13.0. Spark 3.3.2 Deltastreamed table read failure

2023-07-26 Thread via GitHub


rmnlchh commented on issue #9282:
URL: https://github.com/apache/hudi/issues/9282#issuecomment-1651743365

   @ad1happy2go 
   Kafka topic record schema:
   {
 "type": "record",
 "name": "Creative",
 "namespace": "Cardlytics.Ops.Messages.Portal",
 "fields": [
   {
 "name": "CreativeId",
 "type": "string"
   },
   {
 "name": "PreMessageImpression",
 "type": "string",
 "default": ""
   },
   {
 "name": "PostMessageImpression",
 "type": "string",
 "default": ""
   },
   {
 "name": "Assets",
 "type": {
   "type": "array",
   "items": {
 "type": "record",
 "name": "AdAsset",
 "fields": [
   {
 "name": "Type",
 "type": {
   "type": "enum",
   "name": "AssetType",
   "symbols": [
 "Other",
 "Text",
 "Image",
 "Video",
 "Link"
   ],
   "default": "Other"
 }
   },
   {
 "name": "Slot",
 "type": "string"
   },
   {
 "name": "Label",
 "type": [
   "null",
   "string"
 ],
 "default": null
   },
   {
 "name": "Value",
 "type": "string"
   }
 ]
   }
 }
   }
 ]
   }
   DeltaStreamer (DS) transform query:
   SELECT
   'Creative' Entity
   ,o.CreativeId
   ,o.PreMessageImpression
   ,o.PostMessageImpression
   ,o.Assets.Type AssetType
   ,o.Assets.Slot AssetSlot
   ,o.Assets.Label AssetLabel
   ,o.Assets.Value AssetValue
   FROM
   (SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression, 
explode(a.Assets) Assets
FROM
 a) o
  
   expected result table schema:
   {
 "type": "record",
 "name": "Creative",
 "namespace": "Cardlytics.Ops.Messages.Portal",
 "fields": [
   {
 "name": "Entity",
 "type": "string"
   },
   {
 "name": "CreativeId",
 "type": "string"
   },
   {
 "name": "PreMessageImpression",
 "type": "string",
 "default": ""
   },
   {
 "name": "PostMessageImpression",
 "type": "string",
 "default": ""
   },
   {
 "name": "AssetType",
 "type": "string"
   },
   {
 "name": "AssetSlot",
 "type": "string"
   },
   {
 "name": "AssetLabel",
 "type": [
   "null",
   "string"
 ],
 "default": null
   },
   {
 "name": "AssetValue",
 "type": "string"
   }
 ]
   }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] rmnlchh commented on issue #9282: [ISSUE] Hudi 0.13.0. Spark 3.3.2 Deltastreamed table read failure

2023-07-25 Thread via GitHub


rmnlchh commented on issue #9282:
URL: https://github.com/apache/hudi/issues/9282#issuecomment-1650228180

   > @rmnlchh Just curious, Did you set these configs
   > 
   > ```
   > sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
   > sc.set("spark.sql.parquet.binaryAsString", "false");
   > sc.set("spark.sql.parquet.int96AsTimestamp", "true");
   > sc.set("spark.sql.caseSensitive", "false");
   > ```
   > 
   > with your deltastreamer also? I will try to reproduce this issue .
   
   Yes. Here are all the DeltaStreamer (DS) configs:
   println(s"hoodieDeltaStreamerConfig=$hoodieDeltaStreamerConfig")
   println(s"typedProperties=$typedProperties")
   println("HERE JSC" + jsc.getConf.getAll.mkString)
   val hoodieDeltaStreamer = new HoodieDeltaStreamer(hoodieDeltaStreamerConfig, 
jsc
, FSUtils.getFs(hoodieDeltaStreamerConfig.targetBasePath, conf), 
jsc.hadoopConfiguration
, org.apache.hudi.common.util.Option.of(typedProperties)
   )
   
hoodieDeltaStreamerConfig=Config{targetBasePath='/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/',
 targetTableName='published_creative', tableType='MERGE_ON_READ', 
baseFileFormat='PARQUET', 
propsFilePath='file://XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/src/test/resources/delta-streamer-config/dfs-source.properties',
 configs=[], 
sourceClassName='org.apache.hudi.utilities.sources.AvroKafkaSource', 
sourceOrderingField='AssetValue', 
payloadClassName='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload', 
schemaProviderClassName='com.cardlytics.datapipeline.deltastreamer.schema.ResourceBasedSchemaProvider',
 
transformerClassNames=[org.apache.hudi.utilities.transform.SqlQueryBasedTransformer],
 sourceLimit=9223372036854775807, operation=UPSERT, filterDupes=false, 
enableHiveSync=false, enableMetaSync=false, forceEmptyMetaSync=false, 
syncClientToolClassNames=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool, 
maxPendingCompactions=5, maxPendingClustering=5, continuousMode=false, 
minSyncIntervalSeconds=0, sparkMaster='', commitOnErrors=false, 
deltaSyncSchedulingWeight=1, compactSchedulingWeight=1, 
clusterSchedulingWeight=1, deltaSyncSchedulingMinShare=0, 
compactSchedulingMinShare=0, clusterSchedulingMinShare=0, 
forceDisableCompaction=true, checkpoint='null', 
initialCheckpointProvider='null', help=false}
   typedProperties={spark.sql.avro.compression.codec=snappy, 
hoodie.datasource.hive_sync.table=published_creative, 
hoodie.datasource.hive_sync.partition_fields=Entity, 
hoodie.metadata.index.column.stats.enable=false, hoodie.index.type=BLOOM, 
hoodie.datasource.write.reconcile.schema=true, 
hoodie.deltastreamer.schemaprovider.source.schema.file=domain/campaignbuild/schema/creative.avsc,
 bootstrap.servers=PLAINTEXT://localhost:34873, hoodie.compact.inline=false, 
hoodie.deltastreamer.transformer.sql=
   SELECT
   'Creative' Entity
   ,o.CreativeId
   ,o.PreMessageImpression
   ,o.PostMessageImpression
   ,o.Assets.Type AssetType
   ,o.Assets.Slot AssetSlot
   ,o.Assets.Label AssetLabel
   ,o.Assets.Value AssetValue
   FROM
   (SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression, 
explode(a.Assets) Assets
FROM
 a) o
, hoodie.parquet.max.file.size=6291456, 
hoodie.datasource.write.recordkey.field=CreativeId,AssetSlot, 
hoodie.index.bloom.num_entries=6, 
hoodie.datasource.hive_sync.support_timestamp=true, 
hoodie.metadata.enable=false, schema.registry.url=http://localhost:34874, 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator,
 hoodie.datasource.write.table.type=MERGE_ON_READ, 
hoodie.deltastreamer.source.kafka.topic=CMPN-CmpnPub-AdServer-Creative, 
hoodie.datasource.write.hive_style_partitioning=true, 
hoodie.metadata.insert.parallelism=1, 
hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false, 
hoodie.parquet.compression.codec=snappy, spark.io.compression.codec=snappy, 
hoodie.deltastreamer.schemaprovider.target.schema.file=domain/campaignbuild/schema/published_creative_table.json,
 hoodie.bloom.index.prune.by.ranges=true, 
hoodie.datasource.write.partitionpath.field=Entity, 
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true, 
hoodie.parquet.block.size=6291456, 
hoodie.cleaner.fileversions.retained=2, hoodie.table.name=published_creative, 
hoodie.upsert.shuffle.parallelism=4, 
hoodie.meta.sync.client.tool.class=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool,
 spark.sql.parquet.compression.codec=snappy, 
hoodie.datasource.write.precombine.field=AssetValue, 
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload,
 
hoodie.datasource.meta.sync.base.path=/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_