[CARBONDATA-2554] Added support for logical type Added support for date and timestamp logical types in AvroCarbonWriter.
This closes #2347 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f234869 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f234869 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f234869 Branch: refs/heads/carbonstore Commit: 2f2348690964ac87c2f38939280958f2469d212d Parents: 27d7059 Author: kunal642 <kunalkapoor...@gmail.com> Authored: Mon May 28 11:41:59 2018 +0530 Committer: kumarvishal09 <kumarvishal1...@gmail.com> Committed: Tue Jun 5 11:52:09 2018 +0530 ---------------------------------------------------------------------- .../DirectDictionaryGenerator.java | 2 + .../DateDirectDictionaryGenerator.java | 2 +- .../TimeStampDirectDictionaryGenerator.java | 2 +- .../TestNonTransactionalCarbonTable.scala | 145 ++++++++++++++++++- .../processing/datatypes/PrimitiveDataType.java | 44 +++++- .../loading/dictionary/DirectDictionary.java | 4 + .../InputProcessorStepWithNoConverterImpl.java | 24 ++- .../carbondata/sdk/file/AvroCarbonWriter.java | 71 ++++++++- 8 files changed, 279 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java index 469fe1e..2139f31 100644 --- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java @@ -40,6 +40,8 @@ public interface DirectDictionaryGenerator { */ Object getValueFromSurrogate(int key); + int generateKey(long value); + /** * The method generate and returns the dictionary / surrogate key for direct dictionary column * This Method is called while executing filter queries for getting direct surrogate members. http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java index c49af9c..329e260 100644 --- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java @@ -163,7 +163,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator } } - private int generateKey(long timeValue) { + public int generateKey(long timeValue) { if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Value for date type column is not in valid range. Value considered as null."); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java index d218e99..c7a4194 100644 --- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java @@ -206,7 +206,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener } } - private int generateKey(long timeValue) { + public int generateKey(long timeValue) { long time = (timeValue - cutOffTimeStamp) / granularityFactor; int keyValue = -1; if (time >= (long) Integer.MIN_VALUE && time <= (long) Integer.MAX_VALUE) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 5beb9c4..095d12d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.spark.testsuite.createTable -import java.sql.Timestamp +import java.sql.{Date, Timestamp} import java.io.{File, FileFilter, IOException} import java.util import java.util.concurrent.TimeUnit @@ -42,6 +42,7 @@ import scala.concurrent.duration.Duration import org.apache.avro import org.apache.commons.lang.CharEncoding +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import tech.allegro.schema.json2avro.converter.JsonAvroConverter import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} @@ -2151,4 +2152,146 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { writer.close() } + test("test logical type date") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "int", "logicalType": "date"} + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": {"type" : "int", "logicalType": "date"} + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 101, "course_details": { "course_struct_course_time":10}}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType date, course_details struct<course_struct_course_time: date>) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(java.sql.Date.valueOf("1970-04-12"), Row(java.sql.Date.valueOf("1970-01-11"))))) + } + + test("test logical type timestamp-millis") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "long", "logicalType": "timestamp-millis"} + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": {"type" : "long", "logicalType": "timestamp-millis"} + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 172800000,"course_details": { "course_struct_course_time":172800000}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00"))))) + } + + test("test logical type-micros timestamp") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "long", "logicalType": "timestamp-micros"} + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": {"type" : "long", "logicalType": "timestamp-micros"} + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 172800000000,"course_details": { "course_struct_course_time":172800000000}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00"))))) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 7450b82..3a477ce 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -288,7 +288,11 @@ public class PrimitiveDataType implements GenericDataType<Object> { logHolder.setReason(message); } } else { - surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) { + surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input); + } else { + surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + } if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) { surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY; message = CarbonDataProcessorUtil @@ -316,15 +320,36 @@ public class PrimitiveDataType implements GenericDataType<Object> { if (!this.carbonDimension.getUseActualData()) { byte[] value = null; if (isDirectDictionary) { - int surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + int surrogateKey; + // If the input is a long value then this means that logical type was provided by + // the user using AvroCarbonWriter. In this case directly generate surrogate key + // using dictionaryGenerator. + if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) { + surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input); + } else { + surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + } if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) { value = new byte[0]; } else { value = ByteUtil.toBytes(surrogateKey); } } else { - value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue, - this.carbonDimension.getDataType(), dateFormat); + // If the input is a long value then this means that logical type was provided by + // the user using AvroCarbonWriter. In this case directly generate Bytes from value. + if (this.carbonDimension.getDataType().equals(DataTypes.DATE) + || this.carbonDimension.getDataType().equals(DataTypes.TIMESTAMP) + && input instanceof Long) { + if (dictionaryGenerator != null) { + value = ByteUtil.toBytes(((DirectDictionary) dictionaryGenerator) + .generateKey((long) input)); + } else { + value = ByteUtil.toBytes(Long.parseLong(parsedValue)); + } + } else { + value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue, + this.carbonDimension.getDataType(), dateFormat); + } if (this.carbonDimension.getDataType() == DataTypes.STRING && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " @@ -333,8 +358,15 @@ public class PrimitiveDataType implements GenericDataType<Object> { } updateValueToByteStream(dataOutputStream, value); } else { - Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue, - this.carbonDimension.getDataType(), dateFormat); + Object value; + if (dictionaryGenerator instanceof DirectDictionary + && input instanceof Long) { + value = ByteUtil.toBytes( + ((DirectDictionary) dictionaryGenerator).generateKey((long) input)); + } else { + value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue, + this.carbonDimension.getDataType(), dateFormat); + } if (this.carbonDimension.getDataType() == DataTypes.STRING && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java index 165e5a4..33dc8e3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java @@ -46,6 +46,10 @@ public class DirectDictionary implements BiDictionary<Integer, Object> { return dictionaryGenerator.generateDirectSurrogateKey(value.toString()); } + public Integer generateKey(long value) { + return dictionaryGenerator.generateKey(value); + } + @Override public Object getValue(Integer key) { return dictionaryGenerator.getValueFromSurrogate(key); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index c99a413..5f7a94c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -215,6 +217,10 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce private Map<Integer, GenericDataType> dataFieldsWithComplexDataType; + private DirectDictionaryGenerator dateDictionaryGenerator; + + private DirectDictionaryGenerator timestampDictionaryGenerator; + public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, DataType[] dataTypes, CarbonDataLoadConfiguration configuration, @@ -313,7 +319,23 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce throw new CarbonDataLoadingException("Loading Exception", e); } } else { - newData[i] = data[orderOfData[i]]; + DataType dataType = dataFields[i].getColumn().getDataType(); + if (dataType == DataTypes.DATE && data[orderOfData[i]] instanceof Long) { + if (dateDictionaryGenerator == null) { + dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataType, dataFields[i].getDateFormat()); + } + newData[i] = dateDictionaryGenerator.generateKey((long) data[orderOfData[i]]); + } else if (dataType == DataTypes.TIMESTAMP && data[orderOfData[i]] instanceof Long) { + if (timestampDictionaryGenerator == null) { + timestampDictionaryGenerator = + DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat()); + } + newData[i] = timestampDictionaryGenerator.generateKey((long) data[orderOfData[i]]); + } else { + newData[i] = data[orderOfData[i]]; + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 8bbf364..edecd6b 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -24,15 +24,21 @@ import java.util.Random; import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; import org.apache.carbondata.processing.loading.complexobjects.StructObject; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; @@ -55,6 +61,8 @@ public class AvroCarbonWriter extends CarbonWriter { private TaskAttemptContext context; private ObjectArrayWritable writable; private Schema avroSchema; + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonTable.class.getName()); AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException { Configuration hadoopConf = new Configuration(); @@ -88,10 +96,35 @@ public class AvroCarbonWriter extends CarbonWriter { private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { Object out; Schema.Type type = avroField.schema().getType(); + LogicalType logicalType = avroField.schema().getLogicalType(); switch (type) { - case BOOLEAN: case INT: + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + int dateIntValue = (int) fieldValue; + out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case BOOLEAN: case LONG: + if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { + if (logicalType instanceof LogicalTypes.TimestampMicros) { + long dateIntValue = (long) fieldValue; + out = dateIntValue / 1000L; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; case DOUBLE: case STRING: out = fieldValue; @@ -177,13 +210,27 @@ public class AvroCarbonWriter extends CarbonWriter { String FieldName = avroField.name(); Schema childSchema = avroField.schema(); Schema.Type type = childSchema.getType(); + LogicalType logicalType = childSchema.getLogicalType(); switch (type) { case BOOLEAN: return new Field(FieldName, DataTypes.BOOLEAN); case INT: - return new Field(FieldName, DataTypes.INT); + if (logicalType instanceof LogicalTypes.Date) { + return new Field(FieldName, DataTypes.DATE); + } else { + LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema + .getName()); + return new Field(FieldName, DataTypes.INT); + } case LONG: - return new Field(FieldName, DataTypes.LONG); + if (logicalType instanceof LogicalTypes.TimestampMillis + || logicalType instanceof LogicalTypes.TimestampMicros) { + return new Field(FieldName, DataTypes.TIMESTAMP); + } else { + LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema + .getName()); + return new Field(FieldName, DataTypes.LONG); + } case DOUBLE: return new Field(FieldName, DataTypes.DOUBLE); case STRING: @@ -221,13 +268,27 @@ public class AvroCarbonWriter extends CarbonWriter { private static StructField prepareSubFields(String FieldName, Schema childSchema) { Schema.Type type = childSchema.getType(); + LogicalType logicalType = childSchema.getLogicalType(); switch (type) { case BOOLEAN: return new StructField(FieldName, DataTypes.BOOLEAN); case INT: - return new StructField(FieldName, DataTypes.INT); + if (logicalType instanceof LogicalTypes.Date) { + return new StructField(FieldName, DataTypes.DATE); + } else { + LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema + .getName()); + return new StructField(FieldName, DataTypes.INT); + } case LONG: - return new StructField(FieldName, DataTypes.LONG); + if (logicalType instanceof LogicalTypes.TimestampMillis + || logicalType instanceof LogicalTypes.TimestampMicros) { + return new StructField(FieldName, DataTypes.TIMESTAMP); + } else { + LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema + .getName()); + return new StructField(FieldName, DataTypes.LONG); + } case DOUBLE: return new StructField(FieldName, DataTypes.DOUBLE); case STRING: