This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push: new ac5df85 Hive: Add Hive3 module and testing (#1478) ac5df85 is described below commit ac5df85961ebb9d73e77d39a4442ee439e5e1bb6 Author: Marton Bod <marton....@gmail.com> AuthorDate: Wed Oct 7 18:53:48 2020 +0200 Hive: Add Hive3 module and testing (#1478) Hive 3 classes are included in the iceberg-hive-runtime Jar. --- build.gradle | 66 +++++++++++++ .../org/apache/iceberg/hive/HiveClientPool.java | 19 +++- .../org/apache/iceberg/hive/MetastoreUtil.java | 48 ++++++++++ .../org/apache/iceberg/hive/TestHiveMetastore.java | 22 ++++- .../IcebergDateObjectInspectorHive3.java | 62 +++++++++++++ .../IcebergTimestampObjectInspectorHive3.java | 90 ++++++++++++++++++ .../TestIcebergDateObjectInspectorHive3.java | 66 +++++++++++++ .../TestIcebergTimestampObjectInspectorHive3.java | 103 +++++++++++++++++++++ .../iceberg/mr/hive/HiveIcebergFilterFactory.java | 3 + .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 6 ++ .../objectinspector/IcebergObjectInspector.java | 25 ++++- .../mr/hive/HiveIcebergStorageHandlerBaseTest.java | 43 +++++++-- .../TestIcebergObjectInspector.java | 23 ++++- settings.gradle | 2 + 14 files changed, 561 insertions(+), 17 deletions(-) diff --git a/build.gradle b/build.gradle index 87791fa..3069f7c 100644 --- a/build.gradle +++ b/build.gradle @@ -472,6 +472,69 @@ project(':iceberg-mr') { } } +if (jdkVersion == '8') { + project(':iceberg-hive3') { + + // run the tests in iceberg-mr with Hive3 dependencies + sourceSets { + test { + java.srcDirs = ['../mr/src/test/java', 'src/test/java'] + resources.srcDirs = ['../mr/src/test/resources', 'src/test/resources'] + } + } + + // exclude these Hive2-specific tests from iceberg-mr + test { + exclude '**/TestIcebergDateObjectInspector.class' + exclude '**/TestIcebergTimestampObjectInspector.class' + } + + dependencies { + compile project(':iceberg-api') + compile project(':iceberg-core') + compile project(':iceberg-data') + compile project(':iceberg-hive-metastore') + compile project(':iceberg-orc') + compile project(':iceberg-parquet') + compile project(':iceberg-mr') + + compileOnly("org.apache.hadoop:hadoop-client:3.1.0") { + exclude group: 'org.apache.avro', module: 'avro' + } + + compileOnly("org.apache.hive:hive-exec:3.1.2:core") { + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'com.google.guava' + exclude group: 'com.google.protobuf', module: 'protobuf-java' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.apache.calcite.avatica' + exclude group: 'org.apache.hive', module: 'hive-llap-tez' + exclude group: 'org.apache.logging.log4j' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + compileOnly("org.apache.hive:hive-metastore:3.1.2") + compileOnly("org.apache.hive:hive-serde:3.1.2") + + testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') + + testCompile("org.apache.avro:avro:1.9.2") + testCompile("org.apache.calcite:calcite-core") + testCompile("com.esotericsoftware:kryo-shaded:4.0.2") + testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5") + testCompile("com.klarna:hiverunner:6.0.1") { + exclude group: 'javax.jms', module: 'jms' + exclude group: 'org.apache.hive', module: 'hive-exec' + exclude group: 'org.codehaus.jettison', module: 'jettison' + exclude group: 'org.apache.calcite.avatica' + } + } + } +} + project(':iceberg-hive-runtime') { apply plugin: 'com.github.johnrengelman.shadow' @@ -491,6 +554,9 @@ project(':iceberg-hive-runtime') { dependencies { compile project(':iceberg-mr') + if (jdkVersion == '8') { + compile project(':iceberg-hive3') + } } shadowJar { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java index 0ad1321..1df705b 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java @@ -23,10 +23,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.iceberg.common.DynConstructors; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> { + + // use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies + // we need to do this because there is a breaking API change between Hive2 and Hive3 + private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR = DynConstructors.builder() + .impl(HiveMetaStoreClient.class, HiveConf.class) + .impl(HiveMetaStoreClient.class, Configuration.class) + .build(); + private final HiveConf hiveConf; HiveClientPool(Configuration conf) { @@ -41,7 +50,15 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> @Override protected HiveMetaStoreClient newClient() { try { - return new HiveMetaStoreClient(hiveConf); + try { + return CLIENT_CTOR.newInstance(hiveConf); + } catch (RuntimeException e) { + // any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here + if (e.getCause() instanceof MetaException) { + throw (MetaException) e.getCause(); + } + throw e; + } } catch (MetaException e) { throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore"); } catch (Throwable t) { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java new file mode 100644 index 0000000..ad0ec80 --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive; + +public class MetastoreUtil { + + // this class is unique to Hive3 and cannot be found in Hive2, therefore a good proxy to see if + // we are working against Hive3 dependencies + private static final String HIVE3_UNIQUE_CLASS = "org.apache.hadoop.hive.serde2.io.DateWritableV2"; + + private static final boolean HIVE3_PRESENT_ON_CLASSPATH = detectHive3(); + + private MetastoreUtil() { + } + + /** + * Returns true if Hive3 dependencies are found on classpath, false otherwise. + */ + public static boolean hive3PresentOnClasspath() { + return HIVE3_PRESENT_ON_CLASSPATH; + } + + private static boolean detectHive3() { + try { + Class.forName(HIVE3_UNIQUE_CLASS); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index d45d3df..1a7b006 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.RetryingHMSHandler; import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; @@ -47,10 +49,23 @@ import static java.nio.file.attribute.PosixFilePermissions.fromString; public class TestHiveMetastore { + // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies + // we need to do this because there is a breaking API change between Hive2 and Hive3 + private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> HMS_HANDLER_CTOR = DynConstructors.builder() + .impl(HiveMetaStore.HMSHandler.class, String.class, Configuration.class) + .impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class) + .build(); + + private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER = DynMethods.builder("getProxy") + .impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class) + .impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class) + .buildStatic(); + private File hiveLocalDir; private HiveConf hiveConf; private ExecutorService executorService; private TServer server; + private HiveMetaStore.HMSHandler baseHandler; public void start() { try { @@ -80,6 +95,9 @@ public class TestHiveMetastore { if (hiveLocalDir != null) { hiveLocalDir.delete(); } + if (baseHandler != null) { + baseHandler.shutdown(); + } } public HiveConf hiveConf() { @@ -94,8 +112,8 @@ public class TestHiveMetastore { private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exception { HiveConf serverConf = new HiveConf(conf); serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true"); - HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", serverConf); - IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, baseHandler, false); + baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf); + IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false); TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket) .processor(new TSetIpAddressProcessor<>(handler)) diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java new file mode 100644 index 0000000..e37ec07 --- /dev/null +++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.serde.objectinspector; + +import java.time.LocalDate; +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.serde2.io.DateWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.iceberg.util.DateTimeUtil; + +public final class IcebergDateObjectInspectorHive3 extends AbstractPrimitiveJavaObjectInspector + implements DateObjectInspector { + + private static final IcebergDateObjectInspectorHive3 INSTANCE = new IcebergDateObjectInspectorHive3(); + + public static IcebergDateObjectInspectorHive3 get() { + return INSTANCE; + } + + private IcebergDateObjectInspectorHive3() { + super(TypeInfoFactory.dateTypeInfo); + } + + @Override + public Date getPrimitiveJavaObject(Object o) { + if (o == null) { + return null; + } + LocalDate date = (LocalDate) o; + return Date.ofEpochDay(DateTimeUtil.daysFromDate(date)); + } + + @Override + public DateWritableV2 getPrimitiveWritableObject(Object o) { + return o == null ? null : new DateWritableV2(DateTimeUtil.daysFromDate((LocalDate) o)); + } + + @Override + public Object copyObject(Object o) { + return o == null ? null : new Date((Date) o); + } + +} diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java new file mode 100644 index 0000000..1413df4 --- /dev/null +++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.serde.objectinspector; + +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +public abstract class IcebergTimestampObjectInspectorHive3 extends AbstractPrimitiveJavaObjectInspector + implements TimestampObjectInspector { + + private static final IcebergTimestampObjectInspectorHive3 INSTANCE_WITH_ZONE = + new IcebergTimestampObjectInspectorHive3() { + @Override + LocalDateTime toLocalDateTime(Object o) { + return ((OffsetDateTime) o).toLocalDateTime(); + } + }; + + private static final IcebergTimestampObjectInspectorHive3 INSTANCE_WITHOUT_ZONE = + new IcebergTimestampObjectInspectorHive3() { + @Override + LocalDateTime toLocalDateTime(Object o) { + return (LocalDateTime) o; + } + }; + + public static IcebergTimestampObjectInspectorHive3 get(boolean adjustToUTC) { + return adjustToUTC ? INSTANCE_WITH_ZONE : INSTANCE_WITHOUT_ZONE; + } + + private IcebergTimestampObjectInspectorHive3() { + super(TypeInfoFactory.timestampTypeInfo); + } + + + abstract LocalDateTime toLocalDateTime(Object object); + + @Override + public Timestamp getPrimitiveJavaObject(Object o) { + if (o == null) { + return null; + } + LocalDateTime time = toLocalDateTime(o); + Timestamp timestamp = Timestamp.ofEpochMilli(time.toInstant(ZoneOffset.UTC).toEpochMilli()); + timestamp.setNanos(time.getNano()); + return timestamp; + } + + @Override + public TimestampWritableV2 getPrimitiveWritableObject(Object o) { + Timestamp ts = getPrimitiveJavaObject(o); + return ts == null ? null : new TimestampWritableV2(ts); + } + + @Override + public Object copyObject(Object o) { + if (o == null) { + return null; + } + + Timestamp ts = (Timestamp) o; + Timestamp copy = new Timestamp(ts); + copy.setNanos(ts.getNanos()); + return copy; + } + +} diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergDateObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergDateObjectInspectorHive3.java new file mode 100644 index 0000000..ca48639 --- /dev/null +++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergDateObjectInspectorHive3.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.serde.objectinspector; + +import java.time.LocalDate; +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.serde2.io.DateWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergDateObjectInspectorHive3 { + + @Test + public void testIcebergDateObjectInspector() { + DateObjectInspector oi = IcebergDateObjectInspectorHive3.get(); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.DATE, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.dateTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.dateTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(Date.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(DateWritableV2.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + int epochDays = 5005; + LocalDate local = LocalDate.ofEpochDay(epochDays); + Date date = Date.ofEpochDay(epochDays); + + Assert.assertEquals(date, oi.getPrimitiveJavaObject(local)); + Assert.assertEquals(new DateWritableV2(date), oi.getPrimitiveWritableObject(local)); + + Date copy = (Date) oi.copyObject(date); + + Assert.assertEquals(date, copy); + Assert.assertNotSame(date, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +} diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java new file mode 100644 index 0000000..e885689 --- /dev/null +++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.serde.objectinspector; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergTimestampObjectInspectorHive3 { + + @Test + public void testIcebergTimestampObjectInspector() { + TimestampObjectInspector oi = IcebergTimestampObjectInspectorHive3.get(false); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(Timestamp.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(TimestampWritableV2.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + long epochMilli = 1601471970000L; + LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneId.of("UTC")); + Timestamp ts = Timestamp.ofEpochMilli(epochMilli); + + Assert.assertEquals(ts, oi.getPrimitiveJavaObject(local)); + Assert.assertEquals(new TimestampWritableV2(ts), oi.getPrimitiveWritableObject(local)); + + Timestamp copy = (Timestamp) oi.copyObject(ts); + + Assert.assertEquals(ts, copy); + Assert.assertNotSame(ts, copy); + + Assert.assertFalse(oi.preferWritable()); + } + + @Test + public void testIcebergTimestampObjectInspectorWithUTCAdjustment() { + TimestampObjectInspector oi = IcebergTimestampObjectInspectorHive3.get(true); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(Timestamp.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(TimestampWritableV2.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + long epochMilli = 1601471970000L; + LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneId.of("UTC")); + OffsetDateTime offsetDateTime = OffsetDateTime.of(local, ZoneOffset.ofHours(4)); + Timestamp ts = Timestamp.ofEpochMilli(epochMilli); + + Assert.assertEquals(ts, oi.getPrimitiveJavaObject(offsetDateTime)); + Assert.assertEquals(new TimestampWritableV2(ts), oi.getPrimitiveWritableObject(offsetDateTime)); + + Timestamp copy = (Timestamp) oi.copyObject(ts); + + Assert.assertEquals(ts, copy); + Assert.assertNotSame(ts, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java index 63e823c..c800f72 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java @@ -127,6 +127,9 @@ public class HiveIcebergFilterFactory { case FLOAT: return leaf.getLiteral(); case DATE: + if (leaf.getLiteral() instanceof java.sql.Date) { + return daysFromDate((Date) leaf.getLiteral()); + } return daysFromTimestamp((Timestamp) leaf.getLiteral()); case TIMESTAMP: return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf)); diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index d6c4c2f..9f77aa9 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -91,6 +91,12 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H } + // Override annotation commented out, since this interface method has been introduced only in Hive 3 + // @Override + public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) { + + } + @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java index a5fed0b..f9b2214 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java @@ -27,12 +27,33 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor<ObjectInspector> { + // get the correct inspectors depending on whether we're working with Hive2 or Hive3 dependencies + // we need to do this because there is a breaking API change in Date/TimestampObjectInspector between Hive2 and Hive3 + private static final ObjectInspector DATE_INSPECTOR = DynMethods.builder("get") + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspectorHive3") + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspector") + .buildStatic() + .invoke(); + + private static final ObjectInspector TIMESTAMP_INSPECTOR = DynMethods.builder("get") + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspectorHive3", boolean.class) + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspector", boolean.class) + .buildStatic() + .invoke(false); + + private static final ObjectInspector TIMESTAMP_INSPECTOR_WITH_TZ = DynMethods.builder("get") + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspectorHive3", boolean.class) + .impl("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspector", boolean.class) + .buildStatic() + .invoke(true); + public static ObjectInspector create(@Nullable Schema schema) { if (schema == null) { return IcebergRecordObjectInspector.empty(); @@ -72,7 +93,7 @@ public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor<ObjectI primitiveTypeInfo = TypeInfoFactory.booleanTypeInfo; break; case DATE: - return IcebergDateObjectInspector.get(); + return DATE_INSPECTOR; case DECIMAL: Types.DecimalType type = (Types.DecimalType) primitiveType; return IcebergDecimalObjectInspector.get(type.precision(), type.scale()); @@ -96,7 +117,7 @@ public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor<ObjectI break; case TIMESTAMP: boolean adjustToUTC = ((Types.TimestampType) primitiveType).shouldAdjustToUTC(); - return IcebergTimestampObjectInspector.get(adjustToUTC); + return adjustToUTC ? TIMESTAMP_INSPECTOR_WITH_TZ : TIMESTAMP_INSPECTOR; case TIME: default: diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java index d0c8dcf..857e0db 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -37,8 +38,10 @@ import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.Types; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -49,6 +52,8 @@ import static org.apache.iceberg.types.Types.NestedField.required; @RunWith(StandaloneHiveRunner.class) public abstract class HiveIcebergStorageHandlerBaseTest { + private static final String DEFAULT_DATABASE_NAME = "default"; + @HiveSQL(files = {}, autoStart = false) private HiveShell shell; @@ -79,16 +84,30 @@ public abstract class HiveIcebergStorageHandlerBaseTest { private static final PartitionSpec SPEC = PartitionSpec.unpartitioned(); - // before variables - protected TestHiveMetastore metastore; + protected static TestHiveMetastore metastore; + private TestTables testTables; public abstract TestTables testTables(Configuration conf, TemporaryFolder tmp) throws IOException; - @Before - public void before() throws IOException { + + @BeforeClass + public static void beforeClass() { metastore = new TestHiveMetastore(); metastore.start(); + } + + @AfterClass + public static void afterClass() { + metastore.stop(); + metastore = null; + } + + @Before + public void before() throws IOException { + String metastoreUris = metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS); + // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created + System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUris); testTables = testTables(metastore.hiveConf(), temp); @@ -96,9 +115,7 @@ public abstract class HiveIcebergStorageHandlerBaseTest { shell.setHiveConfValue(property.getKey(), property.getValue()); } - String metastoreUris = metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS); shell.setHiveConfValue(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUris); - String metastoreWarehouse = metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); shell.setHiveConfValue(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, metastoreWarehouse); @@ -106,9 +123,17 @@ public abstract class HiveIcebergStorageHandlerBaseTest { } @After - public void after() { - metastore.stop(); - metastore = null; + public void after() throws Exception { + Hive db = Hive.get(metastore.hiveConf()); + for (String dbName : db.getAllDatabases()) { + for (String tblName : db.getAllTables(dbName)) { + db.dropTable(dbName, tblName); + } + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + // Drop cascade, functions dropped by cascade + db.dropDatabase(dbName, true, true, true); + } + } } @Test diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java index dc2a95d..0c79b0b 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -90,7 +91,13 @@ public class TestIcebergObjectInspector { Assert.assertEquals(3, dateField.getFieldID()); Assert.assertEquals("date_field", dateField.getFieldName()); Assert.assertEquals("date comment", dateField.getFieldComment()); - Assert.assertEquals(IcebergDateObjectInspector.get(), dateField.getFieldObjectInspector()); + if (MetastoreUtil.hive3PresentOnClasspath()) { + Assert.assertEquals("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspectorHive3", + dateField.getFieldObjectInspector().getClass().getName()); + } else { + Assert.assertEquals("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspector", + dateField.getFieldObjectInspector().getClass().getName()); + } // decimal StructField decimalField = soi.getStructFieldRef("decimal_field"); @@ -146,14 +153,24 @@ public class TestIcebergObjectInspector { Assert.assertEquals(11, timestampField.getFieldID()); Assert.assertEquals("timestamp_field", timestampField.getFieldName()); Assert.assertEquals("timestamp comment", timestampField.getFieldComment()); - Assert.assertEquals(IcebergTimestampObjectInspector.get(false), timestampField.getFieldObjectInspector()); + if (MetastoreUtil.hive3PresentOnClasspath()) { + Assert.assertTrue(timestampField.getFieldObjectInspector().getClass().getName() + .startsWith("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspectorHive3")); + } else { + Assert.assertEquals(IcebergTimestampObjectInspector.get(false), timestampField.getFieldObjectInspector()); + } // timestamp with tz StructField timestampTzField = soi.getStructFieldRef("timestamptz_field"); Assert.assertEquals(12, timestampTzField.getFieldID()); Assert.assertEquals("timestamptz_field", timestampTzField.getFieldName()); Assert.assertEquals("timestamptz comment", timestampTzField.getFieldComment()); - Assert.assertEquals(IcebergTimestampObjectInspector.get(true), timestampTzField.getFieldObjectInspector()); + if (MetastoreUtil.hive3PresentOnClasspath()) { + Assert.assertTrue(timestampTzField.getFieldObjectInspector().getClass().getName() + .startsWith("org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspectorHive3")); + } else { + Assert.assertEquals(IcebergTimestampObjectInspector.get(true), timestampTzField.getFieldObjectInspector()); + } // UUID StructField uuidField = soi.getStructFieldRef("uuid_field"); diff --git a/settings.gradle b/settings.gradle index 18b07fc..0377d31 100644 --- a/settings.gradle +++ b/settings.gradle @@ -57,7 +57,9 @@ project(':hive-metastore').name = 'iceberg-hive-metastore' if (JavaVersion.current() == JavaVersion.VERSION_1_8) { include 'spark2' include 'spark-runtime' + include 'hive3' project(':spark2').name = 'iceberg-spark2' project(':spark-runtime').name = 'iceberg-spark-runtime' + project(':hive3').name = 'iceberg-hive3' }