nsivabalan commented on code in PR #8638:
URL: https://github.com/apache/hudi/pull/8638#discussion_r1203336055


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala:
##########
@@ -138,18 +139,26 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: DataType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    val avroSchema = schemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
-    getAvroSchemaWithDefaults(avroSchema, structType)
+    try {
+      val schemaConverters = sparkAdapter.getAvroSchemaConverters
+      val avroSchema = schemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+      getAvroSchemaWithDefaults(avroSchema, structType)
+    } catch {
+      case e: Exception => throw new HoodieSchemaException("Failed to convert 
struct type to avro schema", e)

Review Comment:
   do you think we should log the structType here? 
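
   For illustration, a minimal sketch of how the offending type could be surfaced in the message (the file itself is Scala; this Java sketch assumes the HoodieSchemaException package path and leans on DataType#toString):

       import org.apache.spark.sql.types.DataType;
       import org.apache.hudi.exception.HoodieSchemaException; // package path assumed

       final class SchemaErrorMessages {
         private SchemaErrorMessages() {}

         // Wraps the failure with the offending struct type in the message, so the
         // schema that broke the conversion can be recovered from logs alone.
         static HoodieSchemaException structConversionFailure(DataType structType, Exception cause) {
           return new HoodieSchemaException("Failed to convert struct type to avro schema: " + structType, cause);
         }
       }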



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala:
##########
@@ -138,18 +139,26 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: DataType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    val avroSchema = schemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
-    getAvroSchemaWithDefaults(avroSchema, structType)
+    try {
+      val schemaConverters = sparkAdapter.getAvroSchemaConverters
+      val avroSchema = schemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+      getAvroSchemaWithDefaults(avroSchema, structType)
+    } catch {
+      case e: Exception => throw new HoodieSchemaException("Failed to convert 
struct type to avro schema", e)
+    }
   }
 
   /**
    * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
    */
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    schemaConverters.toSqlType(avroSchema) match {
-      case (dataType, _) => dataType.asInstanceOf[StructType]
+    try {
+      val schemaConverters = sparkAdapter.getAvroSchemaConverters
+      schemaConverters.toSqlType(avroSchema) match {
+        case (dataType, _) => dataType.asInstanceOf[StructType]
+      }
+    } catch {
+      case e: Exception => throw new HoodieSchemaException("Failed to convert 
avro schema to struct type", e)

Review Comment:
   same here. should we log the schema here? 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -59,13 +60,18 @@ public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
   @Override
   public Schema processSchema(Schema schema) {
     // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch.
-    List<Schema.Field> fieldList = schema.getFields();
-    List<Schema.Field> newFieldList = fieldList.stream()
-        .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
-    Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
-    return newSchema;
+    try {
+      List<Schema.Field> fieldList = schema.getFields();
+      List<Schema.Field> newFieldList = fieldList.stream()
+          .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
+      return newSchema;
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Kafka offset post processor failed", e);

Review Comment:
   can we log the schema?
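
   A sketch of the amended catch (note Schema#toString() prints the full Avro JSON, which can be verbose for wide schemas; schema.getFullName() is a terser alternative):

       } catch (Exception e) {
         // include the incoming schema so the failure can be reproduced from logs
         throw new HoodieSchemaException("Kafka offset post processor failed for schema: " + schema, e);
       }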



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java:
##########
@@ -89,12 +90,16 @@ public interface SchemaConverter {
     String convert(String schema) throws IOException;
   }
 
-  public Schema parseSchemaFromRegistry(String registryUrl) throws IOException {
+  public Schema parseSchemaFromRegistry(String registryUrl) {
     String schema = fetchSchemaFromRegistry(registryUrl);
-    SchemaConverter converter = config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
-        ? ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
-        : s -> s;
-    return new Schema.Parser().parse(converter.convert(schema));
+    try {
+      SchemaConverter converter = config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
+          ? ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
+          : s -> s;
+      return new Schema.Parser().parse(converter.convert(schema));
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to parse schema from registry", 
e);

Review Comment:
   can we log the schema?
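
   For example (a sketch; whether to include the raw schema string is a judgment call, since registry payloads can be large):

       } catch (Exception e) {
         // registryUrl plus the fetched schema string pinpoint what failed to parse
         throw new HoodieSchemaException("Failed to parse schema from registry: " + registryUrl + ", schema: " + schema, e);
       }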



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java:
##########
@@ -108,9 +109,13 @@ private void initJsonConvertor() {
   }
 
   public GenericRecord fromJson(String json) {
-    initSchema();
-    initJsonConvertor();
-    return jsonConverter.convert(json, schema);
+    try {
+      initSchema();
+      initJsonConvertor();
+      return jsonConverter.convert(json, schema);
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to convert schema from json to 
avro", e);

Review Comment:
   if schema is not null, can we log the schema?
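
   One wrinkle: initSchema() itself can throw, so schema may still be null inside the catch. A null-safe sketch of the amended clause:

       } catch (Exception e) {
         // guard against schema being null when initSchema() was the step that failed
         String schemaInfo = schema == null ? "(schema not initialized)" : schema.toString();
         throw new HoodieSchemaException("Failed to convert schema from json to avro, schema: " + schemaInfo, e);
       }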



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -84,8 +84,13 @@ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jss
   @Override
   public Schema getSourceSchema() {
     if (schema == null) {
-      Schema.Parser parser = new Schema.Parser();
-      schema = parser.parse(schemaString);
+      try {
+        Schema.Parser parser = new Schema.Parser();
+        schema = parser.parse(schemaString);
+      } catch (Exception e) {
+        throw new HoodieSchemaException("Failed to parse schema", e);

Review Comment:
   can we log schemaString?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java:
##########
@@ -124,18 +129,31 @@ public Either<GenericRecord,String> fromJsonWithError(String json) {
   }
 
   public Schema getSchema() {
-    return new Schema.Parser().parse(schemaStr);
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to parse json schema", e);

Review Comment:
   let's log schemaStr



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;

Review Comment:
   can you add javadocs on what's the difference between this and HoodieTransformException?
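
   A sketch of what that javadoc could say; the wording, and the exact contrast with HoodieTransformException, are assumptions for the author to confirm:

       /**
        * Exception thrown when a transformer fails while being applied to records,
        * i.e. the failure happens during transform execution.
        *
        * <p>By contrast, {@code HoodieTransformException} (assumption) covers broader
        * transform errors such as misconfiguration or failure to instantiate the
        * transformer, not just failures at execution time.
        */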



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java:
##########
@@ -124,18 +129,31 @@ public Either<GenericRecord,String> fromJsonWithError(String json) {
   }
 
   public Schema getSchema() {
-    return new Schema.Parser().parse(schemaStr);
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to parse json schema", e);
+    }
   }
 
   public GenericRecord fromAvroBinary(byte[] avroBinary) {
-    initSchema();
-    initInjection();
-    return recordInjection.invert(avroBinary).get();
+    try {
+      initSchema();
+      initInjection();
+      return recordInjection.invert(avroBinary).get();
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to get avro schema from avro 
binary", e);

Review Comment:
   same here



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java:
##########
@@ -193,8 +201,8 @@ public Schema getTargetSchema() {
    String targetRegistryUrl = config.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), registryUrl);
     try {
       return parseSchemaFromRegistry(targetRegistryUrl);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error reading target schema from registry 
:" + registryUrl, ioe);
+    } catch (Exception e) {

Review Comment:
   shouldn't we log targetRegistryUrl instead of registryUrl?
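
   For example (a sketch; the exception type is assumed to match the rest of this PR):

       } catch (Exception e) {
         // targetRegistryUrl is the URL actually queried; registryUrl is only its fallback default
         throw new HoodieSchemaException("Error reading target schema from registry: " + targetRegistryUrl, e);
       }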


