[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-09 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r355806525
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtils;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.CheckCompatibility;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
+import org.apache.spark.sql.connector.write.SupportsTruncate;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+final class IcebergTable implements Table, SupportsRead, SupportsWrite {
+
+  private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
+  TableCapability.BATCH_READ,
+  TableCapability.BATCH_WRITE,
+  TableCapability.MICRO_BATCH_READ,
+  TableCapability.STREAMING_WRITE,
+  TableCapability.TRUNCATE,
 
 Review comment:
  I see, thanks for the explanation. Let me check it.





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-08 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r355282067
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtils;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.CheckCompatibility;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
+import org.apache.spark.sql.connector.write.SupportsTruncate;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+final class IcebergTable implements Table, SupportsRead, SupportsWrite {
+
+  private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
+  TableCapability.BATCH_READ,
+  TableCapability.BATCH_WRITE,
+  TableCapability.MICRO_BATCH_READ,
+  TableCapability.STREAMING_WRITE,
+  TableCapability.TRUNCATE,
 
 Review comment:
  IIUC, with the current code we cannot support `OVERWRITE_BY_FILTER`: overwrite by filter requires deleting the rows that match the filter, but the current Iceberg API only supports file-based deletion by filter, not row-based.
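
To make the file-vs-row distinction concrete, here is a minimal sketch of a file-level overwrite against the Iceberg API; `OverwriteSketch`, `rowFilter`, and `newFile` are hypothetical names for illustration, not code from the PR:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;

    final class OverwriteSketch {
      private OverwriteSketch() {
      }

      // Iceberg's OverwriteFiles commits at file granularity: every data file
      // matched by the filter is dropped whole. There is no way to rewrite
      // only the matching rows inside a file, which is what Spark's
      // OVERWRITE_BY_FILTER needs in the general case.
      static void overwriteByFilter(Table table, Expression rowFilter, DataFile newFile) {
        OverwriteFiles overwrite = table.newOverwrite();
        overwrite.overwriteByRowFilter(rowFilter); // deletes whole matching files
        overwrite.addFile(newFile);                // adds the replacement data
        overwrite.commit();
      }
    }

In practice the filter therefore has to line up with whole data files (typically partition boundaries) for the overwrite to be safe.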





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-04 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353595466
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
 ##
 @@ -321,7 +302,160 @@ private Schema lazyExpectedSchema() {
 }
   }
 
-  private static class TaskDataReader implements InputPartitionReader<InternalRow> {
+  @Override
+  public InputPartition[] planInputPartitions() {
+String tableSchemaString = SchemaParser.toJson(table.schema());
+String expectedSchemaString = SchemaParser.toJson(lazySchema());
+
+List<CombinedScanTask> scanTasks = tasks();
+InputPartition[] readTasks = new InputPartition[scanTasks.size()];
+for (int i = 0; i < scanTasks.size(); i++) {
+  readTasks[i] = new BatchReadInputPartition(scanTasks.get(i), tableSchemaString, expectedSchemaString, fileIo,
+  encryptionManager, caseSensitive);
+}
+
+return readTasks;
+  }
+
+  @Override
+  public IcebergRowReaderFactory createReaderFactory() {
+return new IcebergRowReaderFactory();
+  }
+
+  private Schema lazySchema() {
+if (schema == null) {
+  if (requestedSchema != null) {
+this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
+  } else {
+this.schema = table.schema();
+  }
+}
+return schema;
+  }
+
+  private StructType lazyType() {
+if (type == null) {
+  this.type = SparkSchemaUtil.convert(lazySchema());
+}
+return type;
+  }
+
+  @Override
+  public StructType readSchema() {
+return lazyType();
+  }
+
+
 
 Review comment:
  nit: extra blank line. Is this the required style?





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-04 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353593990
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
 ##
 @@ -115,61 +116,83 @@
  private StructType type = null; // cached because Spark accesses it multiple times
  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
+  public IcebergBatchScan(Table table, Boolean caseSensitive, CaseInsensitiveStringMap options) {
 this.table = table;
-this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
-this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+this.snapshotId = options.containsKey("snapshot-id") ? options.getLong("snapshot-id", 0) : null;
+this.asOfTimestamp = options.containsKey("as-of-timestamp") ? options.getLong("as-of-timestamp", 0) : null;
 
 Review comment:
  Why do we need this change here?
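
For what it's worth, one likely reason (my reading, not stated in the thread): Spark 3.0 replaced `DataSourceOptions`, whose getters return `Optional`, with `CaseInsensitiveStringMap`, which is a plain `Map<String, String>`. A small helper could keep the old null-when-absent semantics without the `containsKey` check and the unused default; `OptionParsing.propertyAsLong` is a hypothetical name:

    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    final class OptionParsing {
      private OptionParsing() {
      }

      // CaseInsensitiveStringMap implements Map<String, String>, so get(key)
      // returns null when the key is absent. This mirrors the old
      // options.get(key).map(Long::parseLong).orElse(null) behavior.
      static Long propertyAsLong(CaseInsensitiveStringMap options, String key) {
        String value = options.get(key);
        return value != null ? Long.parseLong(value) : null;
      }
    }

With that, `this.snapshotId = OptionParsing.propertyAsLong(options, "snapshot-id");` would read much like the Spark 2.x version.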





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-04 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353594129
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
 ##
 @@ -115,61 +116,83 @@
  private StructType type = null; // cached because Spark accesses it multiple times
  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
+  public IcebergBatchScan(Table table, Boolean caseSensitive, CaseInsensitiveStringMap options) {
 this.table = table;
-this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
-this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+this.snapshotId = options.containsKey("snapshot-id") ? options.getLong("snapshot-id", 0) : null;
+this.asOfTimestamp = options.containsKey("as-of-timestamp") ? options.getLong("as-of-timestamp", 0) : null;
+
 if (snapshotId != null && asOfTimestamp != null) {
   throw new IllegalArgumentException(
   "Cannot scan using both snapshot-id and as-of-timestamp to select 
the table snapshot");
 }
 
 // look for split behavior overrides in options
-this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
-this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
-this.splitOpenFileCost = options.get("file-open-cost").map(Long::parseLong).orElse(null);
+this.splitSize = options.containsKey("split-size") ? options.getLong("split-size",
+TableProperties.SPLIT_SIZE_DEFAULT) : null;
+this.splitLookback = options.containsKey("lookback") ? options.getInt("lookback",
+TableProperties.SPLIT_LOOKBACK_DEFAULT) : null;
+this.splitOpenFileCost = options.containsKey("file-open-cost") ? options.getLong("file-open-cost",
+TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT) : null;
 
 Review comment:
   Also here.





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-04 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353593239
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
 ##
 @@ -115,61 +116,83 @@
  private StructType type = null; // cached because Spark accesses it multiple times
  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
+  public IcebergBatchScan(Table table, Boolean caseSensitive, CaseInsensitiveStringMap options) {
 
 Review comment:
  Also here, why does the constructor change to `public`? Besides, `Boolean` should be changed to the primitive type `boolean`.
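
Spelling out the shape this comment asks for (package-private, primitive `boolean`) as a minimal sketch; the field set is assumed for illustration, not taken from the PR:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    // Package-private matches the old Reader, and the primitive boolean
    // avoids a needless box (and an accidentally null caseSensitive).
    class IcebergBatchScan implements Scan {
      private final Table table;
      private final boolean caseSensitive;

      IcebergBatchScan(Table table, boolean caseSensitive, CaseInsensitiveStringMap options) {
        this.table = table;
        this.caseSensitive = caseSensitive;
      }

      @Override
      public StructType readSchema() {
        return SparkSchemaUtil.convert(table.schema());
      }
    }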





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-03 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353592567
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
 ##
 @@ -70,31 +70,32 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.Statistics;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
-class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+public class IcebergBatchScan implements Scan,
 
 Review comment:
  Why do we need to change this class to `public`?





[GitHub] [incubator-iceberg] jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs

2019-12-03 Thread GitBox
jerryshao commented on a change in pull request #674: [WIP]Supports spark-3.0 data source V2 APIs
URL: https://github.com/apache/incubator-iceberg/pull/674#discussion_r353586701
 
 

 ##
 File path: api/src/main/java/org/apache/iceberg/Table.java
 ##
 @@ -228,4 +228,13 @@ default AppendFiles newFastAppend() {
* @return a {@link LocationProvider} to provide locations for new data files
*/
   LocationProvider locationProvider();
+
+  /**
+   * Return the name of this table.
+   *
+   * @return this table's name
+   */
+  default String name() {
 
 Review comment:
  This newly added method should be well defined: should it return a `TableIdentifier` or just a `String`?
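
For context, Spark 3.0's `org.apache.spark.sql.connector.catalog.Table` contract declares `String name()`, which may be why a plain `String` is proposed here. A minimal sketch of one possible default body; the body is an assumption, not what the PR defines:

    // Sketch only: fail loudly for implementations that predate the method.
    // Returning a TableIdentifier instead would carry namespace information,
    // which is exactly the trade-off this comment raises.
    default String name() {
      throw new UnsupportedOperationException(
          getClass().getName() + " does not implement name()");
    }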

