Thanks Ryan and Manish. How can I see which column names it's having issues with?
I did not change anything explicitly. Just to be sure whether it's really an issue
on the Iceberg end that I cannot rectify myself before opening an issue, I am
pasting my code here for you to verify:
package com.mkyong.hashing;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.json.simple.JSONObject;
import org.json.simple.parser.*;
import org.apache.spark.api.java.JavaSparkContext;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.hadoop.HadoopTables;
import com.netflix.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;
import java.io.FileReader;
import java.util.Arrays;
import static com.netflix.iceberg.types.Types.NestedField.optional;
public class App {
    private static String AWS_KEY = "";
    private static String AWS_SECRET_KEY = "";

    private static final Schema SCHEMA = new Schema(
        optional(1, "ad_id", Types.StringType.get()),
        optional(2, "latitude", Types.DoubleType.get()),
        optional(3, "longitude", Types.DoubleType.get()),
        optional(4, "horizontal_accuracy", Types.DoubleType.get()),
        optional(5, "id_type", Types.StringType.get()),
        optional(6, "utc_timestamp", Types.LongType.get()),
        optional(7, "geo_hash", Types.StringType.get()),
        optional(8, "cluster", Types.StringType.get()),
        optional(9, "new_cluster", Types.StringType.get())
    );

    private static final Configuration CONF = new Configuration();

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("my-spark-iceberg")
            .config("spark.driver.memory", "50g")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
            .config("spark.sql.shuffle.partitions", "400")
            .config("spark.eventLog.enabled", "true")
            .config("spark.eventLog.dir", "/base/logs")
            .getOrCreate();

        String location = "/orbital/base/iceberg_local_tes";

        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
        sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
        sc.hadoopConfiguration().set("spark.driver.memory", "50g");
        sc.hadoopConfiguration().set("spark.eventLog.enabled", "true");
        sc.hadoopConfiguration().set("spark.eventLog.dir", "/base/logs");
        sc.setLogLevel("ERROR");

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("new_cluster").build();
        Table table = tables.create(SCHEMA, spec, location);
        tables.load(location).schema();

        Dataset<Row> df =
            spark.read().parquet("s3a://key_path/part-002-tid-1544256-c000.gz.parquet");

        // PARSING JSON FILE
        Object obj = new JSONParser().parse(new FileReader("/orbital/base/gh_clusters.json"));
        JSONObject jo = (JSONObject) obj;
        spark.udf().register("getJsonVal", (UDF1<String, String>) key ->
            (String) jo.get(key.substring(0, 5)), DataTypes.StringType);
        df = df.withColumn("cluster", functions.callUDF("getJsonVal", df.col("geo_hash")));
        // END PARSING JSON FILE

        System.out.println(df.count());
        df.createOrReplaceTempView("df_with_cluster");

        // Use a constant geohash value as new_cluster when cluster is null or
        // empty, instead of the geohash column.
        String sql2 = "select *, coalesce(nullif(cluster, ''), '2h5rd') as new_cluster from df_with_cluster";
        Dataset<Row> df2 = spark.sql(sql2);
        System.out.println("I am done part 1");

        table.updateProperties().set(TableProperties.WRITE_NEW_DATA_LOCATION,
            "s3a://output-bucket/data").commit();

        df2.sort("new_cluster")
            .write()
            .format("iceberg")
            .mode("append")
            .save(location); // metadata location

        System.out.println(String.format("Done writing: %s", location));

        Dataset<Row> res_df = spark.read().format("iceberg").load(location);
        res_df.show();
    }
}
Regards
Akshita
> On Feb 5, 2019, at 4:29 PM, Ryan Blue <[email protected]> wrote:
>
> [email protected]. That's the correct list now.
>
> Akshita,
>
> The latest master doesn't yet have the fix that removes the conflict with
> Spark's Parquet version. That fix is in PR #63
> <https://github.com/apache/incubator-iceberg/pull/63>. If you build that PR,
> then you should be able to shade everything in org.apache.parquet to avoid
> the conflict with Spark.
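>
> For example, a relocation block along these lines in your maven-shade-plugin
> configuration should relocate the Parquet classes (just a sketch; the shaded
> prefix is an arbitrary name you choose):
>
>   <configuration>
>     <relocations>
>       <relocation>
>         <pattern>org.apache.parquet</pattern>
>         <shadedPattern>shaded.org.apache.parquet</shadedPattern>
>       </relocation>
>     </relocations>
>   </configuration>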
>
> The error you're getting does look like the one Manish hit. I think that is
> because Spark can't match the column that you're trying to project by name.
> Make sure the case you're using matches the case used in the table definition
> because Iceberg is currently case sensitive. Xabriel is working on fixing
> case sensitivity or at least making it optional for expressions in PR #89
> <https://github.com/apache/incubator-iceberg/pull/89>. If it turns out to be
> case sensitivity, could you please open an issue on the project and we'll fix
> it before the release?
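>
> To see which names don't line up, a quick check is to print both schemas and
> compare them by eye, e.g. (just a sketch, where table and df are your Iceberg
> Table and your Spark Dataset):
>
>   System.out.println(table.schema());           // Iceberg table schema
>   System.out.println(df.schema().treeString()); // Spark DataFrame schema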
>
> rb
>
> On Tue, Feb 5, 2019 at 3:02 PM Akshita Gupta <[email protected]> wrote:
> Forwarding to iceberg developers. Not sure if dev@iceberg is valid.
>
>> Begin forwarded message:
>>
>> From: Akshita Gupta <[email protected]>
>> Subject: Quick question on writing parquet files to iceberg
>> Date: February 5, 2019 at 12:09:14 PM PST
>> To: [email protected]
>>
>> Hello,
>>
>> I am having an issue reading from a partitioned Parquet file in Iceberg. I
>> am able to do CSV easily. I get the following error at .save(location):
>>
>> Exception in thread "main" java.lang.NullPointerException
>>     at com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:74)
>>     at com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:25)
>>     at com.netflix.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:308)
>>     at com.google.common.collect.Iterators$8.next(Iterators.java:812)
>>     at com.google.common.collect.Lists.newArrayList(Lists.java:139)
>>     at com.google.common.collect.Lists.newArrayList(Lists.java:119)
>>     at com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:52)
>>     at com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:25)
>>     at com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:341)
>>     at com.netflix.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:293)
>>     at com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:37)
>>     at com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:25)
>>     at com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:313)
>>     at com.netflix.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:122)
>>     at com.netflix.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>     at com.netflix.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:67)
>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:254)
>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
>>     at com.mkyong.hashing.App.main(App.java:115)
>>
>> I remember Ryan mentioning earlier to use the bundled version of Iceberg,
>> but I am not sure how to use that. I am partitioning in Java and added the
>> dependencies using pom.xml.
>>
>> Here is my dependency XML. What else needs to be done to be able to read
>> Parquet?
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>          http://maven.apache.org/xsd/maven-4.0.0.xsd">
>> <modelVersion>4.0.0</modelVersion>
>>
>> <groupId>iceberg-sample</groupId>
>> <artifactId>java-iceberg</artifactId>
>> <version>1.0-SNAPSHOT</version>
>>
>> <properties>
>> <!-- https://maven.apache.org/general.html#encoding-warning -->
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>
>> <maven.compiler.source>1.8</maven.compiler.source>
>> <maven.compiler.target>1.8</maven.compiler.target>
>> </properties>
>>
>> <repositories>
>> <repository>
>> <id>jitpack.io</id>
>> <url>https://jitpack.io</url>
>> </repository>
>> </repositories>
>> <build>
>> <plugins>
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-jar-plugin</artifactId>
>> <configuration>
>> <archive>
>> <manifest>
>> <mainClass>com.mkyong.hashing.App</mainClass>
>> </manifest>
>> </archive>
>> </configuration>
>> </plugin>
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-shade-plugin</artifactId>
>> <version>3.2.0</version>
>> <executions>
>> <!-- Attach the shade goal into the package phase -->
>> <execution>
>> <phase>package</phase>
>> <goals>
>> <goal>shade</goal>
>> </goals>
>> <configuration>
>> <transformers>
>> <transformer
>> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>> </transformers>
>> </configuration>
>> </execution>
>> </executions>
>> </plugin>
>> </plugins>
>> </build>
>>
>>
>> <dependencies>
>> <dependency>
>> <groupId>org.apache.spark</groupId>
>> <artifactId>spark-core_2.11</artifactId>
>> <version>2.3.2</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.spark</groupId>
>> <artifactId>spark-sql_2.11</artifactId>
>> <version>2.3.2</version>
>> </dependency>
>> <dependency>
>> <groupId>com.amazonaws</groupId>
>> <artifactId>aws-java-sdk</artifactId>
>> <version>1.11.417</version>
>> </dependency>
>> <dependency>
>> <groupId>com.github.Netflix.iceberg</groupId>
>> <artifactId>iceberg-spark</artifactId>
>> <version>0.6.2</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>com.github.Netflix.iceberg</groupId>
>> <artifactId>iceberg-common</artifactId>
>> <version>0.6.2</version>
>> </dependency>
>> <dependency>
>> <groupId>com.github.Netflix.iceberg</groupId>
>> <artifactId>iceberg-api</artifactId>
>> <version>0.6.2</version>
>> </dependency>
>> <dependency>
>> <groupId>com.github.Netflix.iceberg</groupId>
>> <artifactId>iceberg-core</artifactId>
>> <version>0.6.2</version>
>> </dependency>
>> <dependency>
>> <groupId>com.github.Netflix.iceberg</groupId>
>> <artifactId>iceberg-parquet</artifactId>
>> <version>0.6.2</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>org.apache.hadoop</groupId>
>> <artifactId>hadoop-aws</artifactId>
>> <version>3.1.1</version>
>> </dependency>
>> <!--FATAL: DO NOT ADD THIS-->
>> <!--<dependency>-->
>> <!--<groupId>org.apache.hadoop</groupId>-->
>> <!--<artifactId>hadoop-hdfs</artifactId>-->
>> <!--<version>3.1.1</version>-->
>> <!--</dependency>-->
>> <dependency>
>> <groupId>org.apache.hadoop</groupId>
>> <artifactId>hadoop-common</artifactId>
>> <version>3.1.1</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.parquet</groupId>
>> <artifactId>parquet-common</artifactId>
>> <version>1.10.0</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.parquet</groupId>
>> <artifactId>parquet-column</artifactId>
>> <version>1.10.0</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.parquet</groupId>
>> <artifactId>parquet-hadoop</artifactId>
>> <version>1.10.0</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.avro</groupId>
>> <artifactId>avro</artifactId>
>> <version>1.8.2</version>
>> </dependency>
>> <dependency>
>> <groupId>com.googlecode.json-simple</groupId>
>> <artifactId>json-simple</artifactId>
>> <version>1.1</version>
>> </dependency>
>> </dependencies>
>> </project>
>>
>>
>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix