[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-27 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-650694408


   > @cgivre I've added more tests. The tests are not passing, something about 
`Error while applying rule DrillScanRule`. However, I was able to successfully 
execute the test queries through Drill web interface. I don't know how to fix 
these tests?
   > 
   > Edit: attach log file.
   > 
[org.apache.drill.exec.store.ipfs.TestIPFSQueries.txt](https://github.com/apache/drill/files/4840854/org.apache.drill.exec.store.ipfs.TestIPFSQueries.txt)
   
   It appears that the query is not getting through the planning phase.  My 
suggestion is to take a look at this tutorial about writing storage plugins:
   https://github.com/paul-rogers/drill/wiki/Storage-Plugin, and 
   and specifically, follow the debugging procedures that Paul outlines.  My 
hunch here is that something is going wrong with the  schema resolution. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre opened a new pull request #2089: Drill-7751: Add Storage Plugin for Splunk

2020-06-28 Thread GitBox


cgivre opened a new pull request #2089:
URL: https://github.com/apache/drill/pull/2089


   # [DRILL-7751](https://issues.apache.org/jira/browse/DRILL-7751): Add 
Storage Plugin for Splunk
   
   ## Description
   
   # Drill Connector for Splunk
   This plugin enables Drill to query Splunk. 
   
   ## Configuration
   To connect Drill to Splunk, create a new storage plugin with the following 
configuration:
   
   To successfully connect, Splunk uses port `8089` for interfaces.  This port 
must be open for Drill to query Splunk. 
   
   ```json
   {
  "type":"splunk",
  "username": "admin",
  "password": "changeme",
  "hostname": "localhost",
  "port": 8089,
  "earliestTime": "-14d",
  "latestTime": "now",
  "enabled": false
   }
   ```
   
   ## Understanding Splunk's Data Model
   Splunk's primary use case is analyzing event logs with a timestamp. As such, 
data is indexed by the timestamp, with the most recent data being indexed 
first.  By default, Splunk
will sort the data in reverse chronological order.  Large Splunk 
installations will put older data into buckets of hot, warm and cold storage 
with the "cold" storage on the
 slowest and cheapest disks.
 
   With this understood, it is **very** important to put time boundaries on 
your Splunk queries. The Drill plugin allows you to set default values in the 
configuration such that every
query you run will be bounded by these boundaries.  Alternatively, you can 
set the time boundaries at query time.  In either case, you will achieve the 
best performance when
 you are asking Splunk for the smallest amount of data possible.
 
   ## Understanding Drill's Data Model with Splunk
   Drill treats Splunk indexes as tables. Splunk's access model does not 
restrict to the catalog, but does restrict access to the actual data. It is 
therefore possible that you can
see the names of indexes to which you do not have access.  You can view the 
list of available indexes with a `SHOW TABLES IN splunk` query.
 
   ```
   apache drill> SHOW TABLES IN splunk;
   +--++
   | TABLE_SCHEMA |   TABLE_NAME   |
   +--++
   | splunk   | summary|
   | splunk   | splunklogger   |
   | splunk   | _thefishbucket |
   | splunk   | _audit |
   | splunk   | _internal  |
   | splunk   | _introspection |
   | splunk   | main   |
   | splunk   | history|
   | splunk   | _telemetry |
   +--++
   9 rows selected (0.304 seconds)
   ```
   To query Splunk from Drill, use the following format: 
   ```sql
   SELECT 
   FROM splunk.
   ```
 
## Bounding Your Queries
 When you learn to query Splunk via their interface, the first thing you 
learn is to bound your queries so that they are looking at the shortest time 
span possible. When using
  Drill to query Splunk, it is advisable to do the same thing, and Drill 
offers two ways to accomplish this: via the configuration and at query time.
  
 ### Bounding your Queries at Query Time
 The easiest way to bound your query is to do so at querytime via special 
filters in the `WHERE` clause. There are two special fields, `earliestTime` and 
`latestTime` which can
  be set to bound the query. If they are not set, the query will be bounded 
to the defaults set in the configuration.
  
  You can use any of the time formats specified in the Splunk documentation 
here:   
 
https://docs.splunk.com/Documentation/Splunk/8.0.3/SearchReference/SearchTimeModifiers
 
 So if you wanted to see your data for the last 15 minutes, you could 
execute the following query:
   
   ```sql
   SELECT 
   FROM splunk.
   WHERE earliestTime='-15m' AND latestTime='now'
   ```
   The variables set in a query override the defaults from the configuration. 
 
## Data Types
 Splunk does not have sophisticated data types and unfortunately does not 
provide metadata from its query results.  With the exception of the fields 
below, Drill will interpret
  all fields as `VARCHAR` and hence you will have to convert them to the 
appropriate data type at query time.
 
  Timestamp Fields
 * `_indextime`
 * `_time` 
 
  Numeric Fields
 * `date_hour` 
 * `date_mday`
 * `date_minute`
 * `date_second` 
 * `date_year`
 * `linecount`
 
### Nested Data
Splunk has two different types of nested data which roughly map to Drill's 
`LIST` and `MAP` data types. Unfortunately, there is no easy way to identify 
whether a field is a
 nested field at querytime as Splunk does not provide any metadata and 
therefore all fields are treated as `VARCHAR`.
 
 However, Drill does have built in functions to easily convert Splunk 
multifields into Drill `LIST` and `MAP` data types. For a LIST, simply use the 
 `SPLIT(, ' ')` function to spli

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446665500



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
##
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class IPFSPeer {
+  private IPFSHelper helper;
+
+  private Multihash id;
+  private List addrs;
+  private boolean isDrillReady;
+  private boolean isDrillReadyChecked = false;
+  private Optional drillbitAddress = Optional.empty();
+  private boolean drillbitAddressChecked = false;
+
+
+  public IPFSPeer(IPFSHelper helper, Multihash id) {
+this.helper = helper;
+this.id = id;
+  }
+
+  IPFSPeer(IPFSHelper helper, Multihash id, List addrs) {
+this.helper = helper;
+this.id = id;
+this.addrs = addrs;
+this.isDrillReady = helper.isDrillReady(id);
+this.isDrillReadyChecked = true;
+this.drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+this.drillbitAddressChecked = true;
+  }
+
+  public boolean isDrillReady() {
+if (!isDrillReadyChecked) {
+  isDrillReady = helper.isDrillReady(id);
+  isDrillReadyChecked = true;
+}
+return isDrillReady;
+  }
+
+  public boolean hasDrillbitAddress() {

Review comment:
   Changed in 160a909.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446667334



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")
+private final int maxNodesPerLeaf;
+
+//TODO add more specific timeout configs fot different operations in IPFS,
+// eg. provider resolution, data read, etc.
+@JsonProperty("ipfs-timeouts")
+private final Map ipfsTimeouts;
+
+@JsonIgnore
+private static final Map ipfsTimeoutDefaults = 
ImmutableMap.of(
+IPFSTimeOut.FIND_PROV, 4,
+IPFSTimeOut.FIND_PEER_INFO, 4,
+IPFSTimeOut.FETCH_DATA, 6
+);
+
+public enum IPFSTimeOut {
+@JsonProperty("find-provider")
+FIND_PROV("find-provider"),
+@JsonProperty("find-peer-info")
+FIND_PEER_INFO("find-peer-info"),
+@JsonProperty("fetch-data")
+FETCH_DATA("fetch-data");
+
+@JsonProperty("type")
+private String which;
+IPFSTimeOut(String which) {
+this.which = which;
+}
+
+@JsonCreator
+public static IPFSTimeOut of(String which) {
+switch (which) {
+case "find-provider":
+return FIND_PROV;
+case "find-peer-info":
+return FIND_PEER_INFO;
+case "fetch-data":
+return FETCH_DATA;
+default:
+throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+}
+}
+
+@Override
+public String toString() {
+return this.which;
+}
+}
+
+@JsonProperty("groupscan-worker-threads")
+private final int numWorkerThreads;
+
+@JsonProperty
+private final Map formats;
+
+@JsonCreator
+public IPFSStoragePluginConfig(
+@JsonProperty("host") String host,
+@JsonProperty("port") int port,
+@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+@JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+@JsonProperty("formats") Map formats) {
+this.host = host;
+this.port = port;
+this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+//TODO Jackson failed to deserialize the ipfsTimeouts map causing NPE
+if (ipfsTimeouts != null) {
+ipfsTimeoutDefaults.forEach(ipfsTimeouts::putIfAbsent);
+} else {
+ipfsTimeouts = ipfsTimeoutDefaults;
+}
+this.ipfsTimeouts = ipfsTimeouts;
+this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
+this.formats = formats;
+}
+
+public String getHost() {
+return host;
+}
+
+public int getPort() {
+return port;
+}
+
+public int getMaxNodesPerLeaf() {
+return maxNodesPerLeaf;
+}
+
+public int getIpfsTimeout(IPFSTimeOut which) {
+return ipfsTimeouts.get(which);
+}
+
+public Map getIpfsTimeouts() {
+  

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446669408



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")
+private final int maxNodesPerLeaf;
+
+//TODO add more specific timeout configs fot different operations in IPFS,
+// eg. provider resolution, data read, etc.
+@JsonProperty("ipfs-timeouts")
+private final Map ipfsTimeouts;
+
+@JsonIgnore
+private static final Map ipfsTimeoutDefaults = 
ImmutableMap.of(
+IPFSTimeOut.FIND_PROV, 4,
+IPFSTimeOut.FIND_PEER_INFO, 4,
+IPFSTimeOut.FETCH_DATA, 6
+);
+
+public enum IPFSTimeOut {
+@JsonProperty("find-provider")
+FIND_PROV("find-provider"),
+@JsonProperty("find-peer-info")
+FIND_PEER_INFO("find-peer-info"),
+@JsonProperty("fetch-data")
+FETCH_DATA("fetch-data");
+
+@JsonProperty("type")
+private String which;
+IPFSTimeOut(String which) {
+this.which = which;
+}
+
+@JsonCreator
+public static IPFSTimeOut of(String which) {
+switch (which) {
+case "find-provider":
+return FIND_PROV;
+case "find-peer-info":
+return FIND_PEER_INFO;
+case "fetch-data":
+return FETCH_DATA;
+default:
+throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+}
+}
+
+@Override
+public String toString() {
+return this.which;
+}
+}
+
+@JsonProperty("groupscan-worker-threads")
+private final int numWorkerThreads;
+
+@JsonProperty
+private final Map formats;
+
+@JsonCreator
+public IPFSStoragePluginConfig(
+@JsonProperty("host") String host,
+@JsonProperty("port") int port,
+@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+@JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+@JsonProperty("formats") Map formats) {
+this.host = host;
+this.port = port;
+this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+//TODO Jackson failed to deserialize the ipfsTimeouts map causing NPE
+if (ipfsTimeouts != null) {
+ipfsTimeoutDefaults.forEach(ipfsTimeouts::putIfAbsent);
+} else {
+ipfsTimeouts = ipfsTimeoutDefaults;
+}
+this.ipfsTimeouts = ipfsTimeouts;
+this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
+this.formats = formats;
+}
+
+public String getHost() {
+return host;
+}
+
+public int getPort() {
+return port;
+}
+
+public int getMaxNodesPerLeaf() {
+return maxNodesPerLeaf;
+}
+
+public int getIpfsTimeout(IPFSTimeOut which) {
+return ipfsTimeouts.get(which);
+}
+
+public Map getIpfsTimeouts() {
+  

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446669488



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")

Review comment:
   I don't why, but removing this annotation seems to make the tests hang 
forever.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446669668



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")
+private final int maxNodesPerLeaf;
+
+//TODO add more specific timeout configs fot different operations in IPFS,
+// eg. provider resolution, data read, etc.
+@JsonProperty("ipfs-timeouts")
+private final Map ipfsTimeouts;
+
+@JsonIgnore
+private static final Map ipfsTimeoutDefaults = 
ImmutableMap.of(
+IPFSTimeOut.FIND_PROV, 4,
+IPFSTimeOut.FIND_PEER_INFO, 4,
+IPFSTimeOut.FETCH_DATA, 6
+);
+
+public enum IPFSTimeOut {
+@JsonProperty("find-provider")
+FIND_PROV("find-provider"),
+@JsonProperty("find-peer-info")
+FIND_PEER_INFO("find-peer-info"),
+@JsonProperty("fetch-data")
+FETCH_DATA("fetch-data");
+
+@JsonProperty("type")
+private String which;
+IPFSTimeOut(String which) {
+this.which = which;
+}
+
+@JsonCreator
+public static IPFSTimeOut of(String which) {
+switch (which) {
+case "find-provider":
+return FIND_PROV;
+case "find-peer-info":
+return FIND_PEER_INFO;
+case "fetch-data":
+return FETCH_DATA;
+default:
+throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+}
+}
+
+@Override
+public String toString() {
+return this.which;
+}
+}
+
+@JsonProperty("groupscan-worker-threads")
+private final int numWorkerThreads;
+
+@JsonProperty
+private final Map formats;
+
+@JsonCreator
+public IPFSStoragePluginConfig(
+@JsonProperty("host") String host,
+@JsonProperty("port") int port,
+@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+@JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+@JsonProperty("formats") Map formats) {
+this.host = host;
+this.port = port;
+this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+//TODO Jackson failed to deserialize the ipfsTimeouts map causing NPE
+if (ipfsTimeouts != null) {

Review comment:
   Hmm, it seems that this comment was made very early in development, and 
the issue it describes no longer exists. I deleted the comment in 282a89d.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446669892



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/*
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  public class DHT {
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  BlockingQueue> results = new 
LinkedBlockingQueue<>();
+  executor.submit(() -> retrieveAndParseStream("dht/findpeer?arg=" + id, 
results));
+
+  try {
+long stop = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(timeout);
+while(System.currentTimeMillis() < stop) {
+  Map peer = (Map) results.poll(timeout, TimeUnit.SECONDS);
+  if ( peer != null ) {
+if ( (int) peer.get("Type") == 2 ) {
+  return (List)
+  ((Map)
+  ((List) peer.get("Responses")
+  ).get(0)
+  ).get("Addrs");
+}
+//else: response contains no Addrs, so ignore it.

Review comment:
   I think they are removed in ebc0dc6.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-650790085


   @cgivre it turned out what was blocking the tests was that the default 
number of providers in test config was too large, as a result IPFS could not 
find any other providers in time, thus the `TimeoutException`s. I wish the test 
logs had included full stack traces, which could have saved me hours looking 
into the Drill planner internals... 😓



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-06-28 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r446672603



##
File path: 
contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+
+@Category({SlowTest.class, IPFSStorageTest.class})
+public class TestIPFSQueries extends IPFSTestBase {
+
+  @Test
+  public void testNullQuery() throws Exception {
+testBuilder()
+.sqlQuery(getSelectStar(IPFSHelper.IPFS_NULL_OBJECT))
+.unOrdered()
+.expectsNumRecords(1)

Review comment:
   Since we are running query against the null object, it's expected that 
the result set is empty. However, the test log file says it has one row, while 
the web interface clearly shows "no results". I changed this line to make the 
test pass, but I don't know what's going on here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB opened a new pull request #2090: DRILL-7759: Code compilation exception for queries containing (untyped) NULL

2020-06-30 Thread GitBox


KazydubB opened a new pull request #2090:
URL: https://github.com/apache/drill/pull/2090


   # [DRILL-7759](https://issues.apache.org/jira/browse/DRILL-7759): Code 
compilation exception for queries containing (untyped) NULL
   
   ## Description
   
   (Please describe the change. If more than one ticket is fixed, include a 
reference to those tickets.)
   
   ## Documentation
   None.
   
   ## Testing
   Added unit test and ran test suits.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi opened a new pull request #2091: DRILL-7761: Drill fails with OOM for the case of large filter conditions

2020-06-30 Thread GitBox


vvysotskyi opened a new pull request #2091:
URL: https://github.com/apache/drill/pull/2091


   # [DRILL-7761](https://issues.apache.org/jira/browse/DRILL-7761): Drill 
fails with OOM for the case of large filter conditions
   
   ## Description
   Added threshold for the number of nodes that can be created out of the 
conversion to avoid OOM.
   
   ## Documentation
   NA
   
   ## Testing
   Added unit test.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi merged pull request #2091: DRILL-7761: Drill fails with OOM for the case of large filter conditions

2020-07-01 Thread GitBox


vvysotskyi merged pull request #2091:
URL: https://github.com/apache/drill/pull/2091


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB merged pull request #2090: DRILL-7759: Code compilation exception for queries containing (untyped) NULL

2020-07-01 Thread GitBox


KazydubB merged pull request #2090:
URL: https://github.com/apache/drill/pull/2090


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre opened a new pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-02 Thread GitBox


cgivre opened a new pull request #2092:
URL: https://github.com/apache/drill/pull/2092


   # [DRILL-7763](https://issues.apache.org/jira/browse/DRILL-7763): Add Limit 
Pushdown to File Based Storage Plugins
   
   ## Description
   
   As currently implemented, when querying a file, Drill will read the entire 
file even if a limit is specified in the query.  This PR does a few things:
   Refactors the `EasyGroupScan`, `EasySubScan`, and `EasyFormatConfig` to 
allow the option of pushing down limits.
   Applies this to all the EVF based format plugins which are: LogRegex, PCAP, 
SPSS, Esri, Excel and Text (CSV). 
   Due to JSON's fluid schema, it would be unwise to adopt the limit pushdown 
as it could result in very inconsistent schemata.
   
   ## Documentation
   No user visible changes.  Queries with limits on large files are 
considerably faster.
   
   ## Testing
   All existing unit tests are run. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi commented on pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


vvysotskyi commented on pull request #2092:
URL: https://github.com/apache/drill/pull/2092#issuecomment-653427039


   @cgivre, how it would work for the case when there was created multiple 
fragments with their own scan? From the code, it looks like every fragment 
would read the same number of rows specified in the limit. Also, will the limit 
operator be preserved in the plan if the scan supports limit pushdown?
   
   Metastore also provides capabilities for pushing the limit, but it works 
slightly differently - it prunes files and leaves only minimum files number 
with specific row count. Would these two features coexist and work correctly?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


KazydubB commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r449480526



##
File path: 
contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java
##
@@ -108,7 +108,7 @@ public void testStarQuery() throws Exception {
 
   @Test
   public void testExplicitQuery() throws Exception {
-String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4 LIMIT 5";

Review comment:
   Why not write another test method with `LIMIT` instead of editing the 
existing one without a `LIMIT`?
   In case with `LIMIT`, should query plan be validated also?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


KazydubB commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r449480526



##
File path: 
contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java
##
@@ -108,7 +108,7 @@ public void testStarQuery() throws Exception {
 
   @Test
   public void testExplicitQuery() throws Exception {
-String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4 LIMIT 5";

Review comment:
   Why not write another test method with `LIMIT` instead of editing the 
existing one without a `LIMIT`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB opened a new pull request #2093: DRILL-7764: Cleanup warning messages in GuavaPatcher class

2020-07-03 Thread GitBox


KazydubB opened a new pull request #2093:
URL: https://github.com/apache/drill/pull/2093


   # [DRILL-7764](https://issues.apache.org/jira/browse/DRILL-7764): Cleanup 
warning messages in GuavaPatcher class
   
   ## Description
   
   Removed unnecessary logging of stack trace (on `WARN` level) when patching 
Guava fails.
   
   ## Documentation
   N/A
   
   ## Testing
   Ran all existing tests.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] sanel commented on a change in pull request #2093: DRILL-7764: Cleanup warning messages in GuavaPatcher class

2020-07-03 Thread GitBox


sanel commented on a change in pull request #2093:
URL: https://github.com/apache/drill/pull/2093#discussion_r449513573



##
File path: common/src/main/java/org/apache/drill/common/util/GuavaPatcher.java
##
@@ -73,7 +73,8 @@ private static void patchStopwatch() {
 
   logger.info("Google's Stopwatch patched for old HBase Guava version.");
 } catch (Exception e) {
-  logger.warn("Unable to patch Guava classes.", e);
+  logger.warn("Unable to patch Guava classes: {}", e.getMessage());

Review comment:
   These calls can be wrapped in a small static function





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] sanel commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


sanel commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r449517251



##
File path: 
contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
##
@@ -180,6 +182,10 @@ private String byteArrayToString(byte[] in) {
   }
 
   private void processShapefileSet(RowSetLoader rowWriter, final int gid, 
final Geometry geom, final Object[] dbfRow) {
+if (maxRecords > 0 && rowWriter.rowCount() >= maxRecords) {

Review comment:
   Same code happens to be used in multiple places? Maybe static function 
will make it clearer





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB commented on a change in pull request #2093: DRILL-7764: Cleanup warning messages in GuavaPatcher class

2020-07-03 Thread GitBox


KazydubB commented on a change in pull request #2093:
URL: https://github.com/apache/drill/pull/2093#discussion_r449575594



##
File path: common/src/main/java/org/apache/drill/common/util/GuavaPatcher.java
##
@@ -73,7 +73,8 @@ private static void patchStopwatch() {
 
   logger.info("Google's Stopwatch patched for old HBase Guava version.");
 } catch (Exception e) {
-  logger.warn("Unable to patch Guava classes.", e);
+  logger.warn("Unable to patch Guava classes: {}", e.getMessage());

Review comment:
   Thanks, updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


cgivre commented on pull request #2092:
URL: https://github.com/apache/drill/pull/2092#issuecomment-653570450


   @vvysotskyi 
   Thanks for taking a look.  
   
   > @cgivre, how it would work for the case when there was created multiple 
fragments with their own scan? From the code, it looks like every fragment 
would read the same number of rows specified in the limit. Also, will the limit 
operator be preserved in the plan if the scan supports limit pushdown?
   
   Firstly, the format plugin has to explicitly enable the pushdown.  I don't 
have the best test infrastructure, so maybe you could assist with that, but I 
do believe that each fragment would read the same number of rows in their own 
scan.  Ideally, I'd like to fix that, but let's say you have 5 scans that are 
reading files with 1000 rows and you put a limit of 100 on the query.  Without 
this PR, my observation was that Drill will still read 5000 rows, whereas with 
this PR, it will only reduce that to 500.  
   
   > 
   > Metastore also provides capabilities for pushing the limit, but it works 
slightly differently - it prunes files and leaves only minimum files number 
with specific row count. Would these two features coexist and work correctly?
   
   I didn't know about this feature in the metastore.  I would like for these 
features to coexist if possible.  Could you point me to some resources, or docs 
for this so that I can take a look?  Ideally, I'd like to make it such that we 
get the minimum files number from the metastore AND we get the row limit as 
well, so that we are looking at the absolute minimum amount of data.
   
   For some background I was working on a project where I had several GB of 
PCAP files in multiple directories.  I found that Drill could query these files 
fairly rapidly, but it seemed to still have a lot of overhead in terms of how 
many files it was actually reading.  Separately, when I was working on the 
Splunk plugin (https://github.com/apache/drill/pull/2089), I discovered that 
virtually no storage plugins actually seemed to have a limit pushdown.  This 
was puzzling since the rules and logic for this were actually already in Drill 
and in the GroupScan.  On top of that, it's actually a fairly easy addition.  
   
   Getting back to this PR, I wanted to see if it made a performance difference 
on querying some large files on my machine and the difference was shocking.  
Simple queries and queries with a `WHERE` clause, which used to take seconds, 
would now be virtually instantaneous.  The difference is user experience is 
really shocking.  
   
   Anyway, I'd appreciate any help you can give with respect to the metastore 
and incorporating that into the PR. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-03 Thread GitBox


cgivre commented on pull request #2092:
URL: https://github.com/apache/drill/pull/2092#issuecomment-653570853


   @vvysotskyi 
   One more thought, it might make sense, if we can get the metastore sorted 
out, to do the same thing for parquet format.  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-07 Thread GitBox


cgivre commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r451255529



##
File path: 
contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
##
@@ -180,6 +182,10 @@ private String byteArrayToString(byte[] in) {
   }
 
   private void processShapefileSet(RowSetLoader rowWriter, final int gid, 
final Geometry geom, final Object[] dbfRow) {
+if (maxRecords > 0 && rowWriter.rowCount() >= maxRecords) {

Review comment:
   Fixed in 
https://github.com/apache/drill/pull/2092/commits/90c82f22cf201c046964a8a8ca45d8d0f66b2168





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] KazydubB merged pull request #2093: DRILL-7764: Cleanup warning messages in GuavaPatcher class

2020-07-08 Thread GitBox


KazydubB merged pull request #2093:
URL: https://github.com/apache/drill/pull/2093


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-08 Thread GitBox


cgivre commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r451646671



##
File path: 
contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java
##
@@ -108,7 +108,7 @@ public void testStarQuery() throws Exception {
 
   @Test
   public void testExplicitQuery() throws Exception {
-String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4 LIMIT 5";

Review comment:
   @KazydubB 
   I'll add additional unit tests this weekend.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-08 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r449034470



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/*import org.apache.drill.common.expression.SchemaPath;*/
+
+@JsonTypeName("ipfs-sub-scan")
+public class IPFSSubScan extends AbstractBase implements SubScan {
+  private static int IPFS_SUB_SCAN_VALUE = 19155;
+  private final IPFSContext ipfsContext;
+  private final List ipfsSubScanSpecList;

Review comment:
   Can this just be a regular `ArrayList`?  If there's a reason why you 
chose to use this, that's fine, but I've not seen this done that way before.

##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ip

[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-656170430


   > Cleaning up the PR. I was thinking about the unit tests and it might be 
good to include unit tests using Mockito to mock up some of the various 
components. That way we can test at least some of this without the IPFS daemon. 
I can post an example if you'd like.
   
   Would appreciate that.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452279329



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class IPFSSchemaFactory implements SchemaFactory{
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSSchemaFactory.class);
+
+  final String schemaName;
+  final IPFSContext context;
+
+  public IPFSSchemaFactory(IPFSContext context, String name) throws 
IOException {
+this.context = context;
+this.schemaName = name;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+logger.debug("registerSchemas {}", schemaName);
+IPFSTables schema = new IPFSTables(schemaName);
+SchemaPlus hPlus = parent.add(schemaName, schema);
+schema.setHolder(hPlus);
+  }
+
+  class IPFSTables extends AbstractSchema {
+private Set tableNames = Sets.newHashSet();
+private final ConcurrentMap tables = new 
ConcurrentSkipListMap<>(String::compareToIgnoreCase);
+public IPFSTables (String name) {
+  super(ImmutableList.of(), name);
+  tableNames.add(name);
+}
+
+public void setHolder(SchemaPlus pulsOfThis) {
+}
+
+@Override
+public String getTypeName() {
+  return IPFSStoragePluginConfig.NAME;
+}
+
+@Override
+public Set getTableNames() {
+  return Collections.emptySet();
+}
+
+@Override
+public Table getTable(String tableName) {
+  //TODO: better handling of table names

Review comment:
   This is actually related to writer support. The initial design was to 
use a placeholder name for a yet-to-create table on IPFS, e.g. ``ipfs.`create` 
``. Since the table names are hashes of the content, they cannot be known 
before they are created. I could delete this part of code, they don't do 
anything anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452281356



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class IPFSSchemaFactory implements SchemaFactory{
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSSchemaFactory.class);
+
+  final String schemaName;
+  final IPFSContext context;
+
+  public IPFSSchemaFactory(IPFSContext context, String name) throws 
IOException {
+this.context = context;
+this.schemaName = name;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+logger.debug("registerSchemas {}", schemaName);
+IPFSTables schema = new IPFSTables(schemaName);
+SchemaPlus hPlus = parent.add(schemaName, schema);
+schema.setHolder(hPlus);
+  }
+
+  class IPFSTables extends AbstractSchema {
+private Set tableNames = Sets.newHashSet();
+private final ConcurrentMap tables = new 
ConcurrentSkipListMap<>(String::compareToIgnoreCase);
+public IPFSTables (String name) {
+  super(ImmutableList.of(), name);
+  tableNames.add(name);
+}
+
+public void setHolder(SchemaPlus pulsOfThis) {
+}
+
+@Override
+public String getTypeName() {
+  return IPFSStoragePluginConfig.NAME;
+}
+
+@Override
+public Set getTableNames() {
+  return Collections.emptySet();
+}
+
+@Override
+public Table getTable(String tableName) {
+  //TODO: better handling of table names

Review comment:
   In that case, perhaps create a JIRA and reference it in the code 
comments. It's fine with me to leave the code, but just please put an 
explanation of why it's there and what the plans are. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452297400



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/*import org.apache.drill.common.expression.SchemaPath;*/
+
+@JsonTypeName("ipfs-sub-scan")
+public class IPFSSubScan extends AbstractBase implements SubScan {
+  private static int IPFS_SUB_SCAN_VALUE = 19155;
+  private final IPFSContext ipfsContext;
+  private final List ipfsSubScanSpecList;

Review comment:
   Yes, which variant of `List` it is doesn't really matter, but the rest 
of the code does not rely on a specific implementation of `List`, either. I 
made a `LinkedList` instance here and that was a mistake:
   
https://github.com/apache/drill/blob/df4a7b2993e6752481d6b35d636f5fef4a20aebf/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java#L182
   
   Should I change it to `ArrayList`? I mean using the interface as 
the type seems like the default way.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452319373



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/*import org.apache.drill.common.expression.SchemaPath;*/

Review comment:
   Fixed in 0f9c2db.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452319560



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/*import org.apache.drill.common.expression.SchemaPath;*/
+
+@JsonTypeName("ipfs-sub-scan")
+public class IPFSSubScan extends AbstractBase implements SubScan {
+  private static int IPFS_SUB_SCAN_VALUE = 19155;
+  private final IPFSContext ipfsContext;
+  private final List ipfsSubScanSpecList;
+  private final IPFSScanSpec.Format format;
+  private final List columns;
+
+
+  @JsonCreator
+  public IPFSSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+ @JsonProperty("IPFSSubScanSpec") 
@JsonDeserialize(using=MultihashDeserializer.class) List 
ipfsSubScanSpecList,
+ @JsonProperty("format") IPFSScanSpec.Format format,
+ @JsonProperty("columns") List columns
+ ) throws ExecutionSetupException {
+super((String) null);
+IPFSStoragePlugin plugin = (IPFSStoragePlugin) 
registry.getPlugin(ipfsStoragePluginConfig);
+ipfsContext = plugin.getIPFSContext();
+this.ipfsSubScanSpecList = ipfsSubScanSpecList;
+this.format = format;
+this.columns = columns;
+  }
+
+  public IPFSSubScan(IPFSContext ipfsContext, List 
ipfsSubScanSpecList, IPFSScanSpec.Format format, List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsSubScanSpecList = ipfsSubScanSpecList;
+this.format = format;
+this.columns = columns;
+  }
+
+  @JsonIgnore
+  public IPFSContext getIPFSContext() {
+return ipfsContext;
+  }
+
+  @JsonProperty("IPFSStoragePluginConfig")
+  public IPFSStoragePluginConfig getIPFSStoragePluginConfig() {
+return ipfsContext.getStoragePluginConfig();
+  }
+
+  @JsonProperty("columns")
+  public List getColumns() {
+return columns;
+  }
+
+  @JsonProperty("format")
+  public IPFSScanSpec.Format getFormat() {
+return format;
+  }
+
+  @Override
+  public String toString() {
+return new PlanStringBuilder(this)
+.field("scan spec", ipfsSubScanSpecList)
+.field("format", format)
+.field("columns", columns)
+.toString();
+  }
+
+  @JsonSerialize(using = MultihashSerializer.class)
+  @JsonProperty("IPFSSubScanSpec")
+  public List getIPFSSubScanSpecList() {
+return ipfsSubScanSpecList;
+  }
+
+  

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452320145



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class IPFSSchemaFactory implements SchemaFactory{
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSSchemaFactory.class);
+
+  final String schemaName;
+  final IPFSContext context;
+
+  public IPFSSchemaFactory(IPFSContext context, String name) throws 
IOException {
+this.context = context;
+this.schemaName = name;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+logger.debug("registerSchemas {}", schemaName);
+IPFSTables schema = new IPFSTables(schemaName);
+SchemaPlus hPlus = parent.add(schemaName, schema);
+schema.setHolder(hPlus);
+  }
+
+  class IPFSTables extends AbstractSchema {
+private Set tableNames = Sets.newHashSet();
+private final ConcurrentMap tables = new 
ConcurrentSkipListMap<>(String::compareToIgnoreCase);
+public IPFSTables (String name) {
+  super(ImmutableList.of(), name);
+  tableNames.add(name);
+}
+
+public void setHolder(SchemaPlus pulsOfThis) {
+}
+
+@Override
+public String getTypeName() {
+  return IPFSStoragePluginConfig.NAME;
+}
+
+@Override
+public Set getTableNames() {
+  return Collections.emptySet();
+}
+
+@Override
+public Table getTable(String tableName) {
+  //TODO: better handling of table names

Review comment:
   This is now DRILL-7766.

##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class IPFSSchemaFactory implements SchemaFactory{

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452320526



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+private static final Logger logger = 
LoggerFactory.getLogger(IPFSStoragePluginConfig.class);

Review comment:
   Fixed in 48d2058.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452321021



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+private static final Logger logger = 
LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")
+private final int maxNodesPerLeaf;
+
+@JsonProperty("ipfs-timeouts")
+private final Map ipfsTimeouts;
+
+@JsonIgnore
+private static final Map ipfsTimeoutDefaults = 
ImmutableMap.of(
+IPFSTimeOut.FIND_PROV, 4,
+IPFSTimeOut.FIND_PEER_INFO, 4,
+IPFSTimeOut.FETCH_DATA, 6
+);
+
+public enum IPFSTimeOut {
+@JsonProperty("find-provider")
+FIND_PROV("find-provider"),
+@JsonProperty("find-peer-info")
+FIND_PEER_INFO("find-peer-info"),
+@JsonProperty("fetch-data")
+FETCH_DATA("fetch-data");
+
+@JsonProperty("type")
+private final String which;
+IPFSTimeOut(String which) {
+this.which = which;
+}
+
+@JsonCreator
+public static IPFSTimeOut of(String which) {
+switch (which) {
+case "find-provider":
+return FIND_PROV;
+case "find-peer-info":
+return FIND_PEER_INFO;
+case "fetch-data":
+return FETCH_DATA;
+default:
+throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+}
+}
+
+@Override
+public String toString() {
+return this.which;
+}
+}
+
+@JsonProperty("groupscan-worker-threads")
+private final int numWorkerThreads;
+
+@JsonProperty
+private final Map formats;
+
+@JsonCreator
+public IPFSStoragePluginConfig(
+@JsonProperty("host") String host,
+@JsonProperty("port") int port,
+@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+@JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+@JsonProperty("formats") Map formats) {
+this.host = host;
+this.port = port;
+this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+if (ipfsTimeouts != null) {
+ipfsTimeoutDefaults.forEach(ipfsTimeouts::putIfAbsent);
+} else {
+ipfsTimeouts = ipfsTimeoutDefaults;
+}
+this.ipfsTimeouts = ipfsTimeouts;
+this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
+this.formats = formats;
+}
+
+public String getHost() {
+return host;
+}
+
+public int getPort() {
+return port;
+}
+
+public int getMaxNodesPerLeaf() {
+return maxNodesPerLeaf;
+}
+
+public int getIpfsTimeout(IPFSTimeOut which) {
+return ipfsTimeouts.get(which);
+}
+
+public Map getIpfsTimeouts() {
+return ipfsTimeouts;
+}
+
+public int getNumWorkerThreads() {
+return numWorkerThreads;
+}
+
+public Map getFormats() {
+return formats;
+}
+
+@Override
+public int hashCod

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r452321224



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+private static final Logger logger = 
LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+public static final String NAME = "ipfs";
+
+private final String host;

Review comment:
   Fixed in 48d2058.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-07-09 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-656479289


   > > Cleaning up the PR. I was thinking about the unit tests and it might be 
good to include unit tests using Mockito to mock up some of the various 
components. That way we can test at least some of this without the IPFS daemon. 
I can post an example if you'd like.
   > 
   > Would appreciate that.
   
   Take a look here for an example:
   
   
https://github.com/apache/drill/blob/5900cdfaae20e216d4b87795bd2efc8199e648e6/contrib/storage-elastic/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchGroupScanTest.java#L42-L96
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi commented on pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-20 Thread GitBox


vvysotskyi commented on pull request #2092:
URL: https://github.com/apache/drill/pull/2092#issuecomment-660886165


   @cgivre, there already was a similar functionality for Parquet: 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java#L53.
   Please take a look at 
[`AbstractGroupScanWithMetadata.applyLimit()`](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java#L453)
 method - it contains required logic for pruning files. With your changes, you 
are overriding it, so it breaks this functionality. To coexist with this 
feature, please take a look at the implementation of this method for the 
parquet group scan, move common logic to AbstractGroupScanWithMetadata and use 
it in the easy group scan.
   
   The behavior with the metastore usage is the following: if for example, we 
have 10 files with 100 records, and query with limit 5 is submitted, only the 
single file would be left in the group scan, since it contains all required 
records.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-20 Thread GitBox


vvysotskyi commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r457178768



##
File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/metastore/TestMetastoreWithEasyFormatPlugin.java
##
@@ -1079,14 +1079,14 @@ public void testFilesPruningWithLimit() throws 
Exception {
   queryBuilder()
   .sql("select * from dfs.tmp.`%s` limit 1", tableName)
   .planMatcher()
-  .include("Limit", "numFiles=1,")
+  .include("Limit", "numFiles=12,")

Review comment:
   This is the regression I mentioned in the previous comment. It is 
non-optimal to read all 12 files instead of the single one.

##
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
##
@@ -172,6 +175,26 @@ private EasyGroupScan(final EasyGroupScan that) {
 mappings = that.mappings;
 partitionDepth = that.partitionDepth;
 metadataProvider = that.metadataProvider;
+maxRecords = that.maxRecords;
+supportsLimitPushdown = 
that.formatPlugin.easyConfig().supportsLimitPushdown;
+  }
+
+  // Constructor to get the limit pushed down
+  private EasyGroupScan(final EasyGroupScan that, int maxRecords) {

Review comment:
   Please do not introduce the new constructor. `maxRecords` is not final, 
so it is possible to assign its value after using the existing copy constructor.

##
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
##
@@ -17,31 +17,30 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.fasterxml.jackson.annotation.JacksonInject;

Review comment:
   Please keep the original imports order.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2092: DRILL-7763: Add Limit Pushdown to File Based Storage Plugins

2020-07-21 Thread GitBox


cgivre commented on a change in pull request #2092:
URL: https://github.com/apache/drill/pull/2092#discussion_r458052450



##
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
##
@@ -172,6 +175,26 @@ private EasyGroupScan(final EasyGroupScan that) {
 mappings = that.mappings;
 partitionDepth = that.partitionDepth;
 metadataProvider = that.metadataProvider;
+maxRecords = that.maxRecords;
+supportsLimitPushdown = 
that.formatPlugin.easyConfig().supportsLimitPushdown;
+  }
+
+  // Constructor to get the limit pushed down
+  private EasyGroupScan(final EasyGroupScan that, int maxRecords) {

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] oleg-zinovev opened a new pull request #2094: DRILL-7773: Incorrect conversion result to TIME_EPOCH_BE

2020-07-29 Thread GitBox


oleg-zinovev opened a new pull request #2094:
URL: https://github.com/apache/drill/pull/2094


   # [DRILL-7773](https://issues.apache.org/jira/browse/DRILL-7773): Incorrect 
conversion result to TIME_EPOCH_BE
   
   ## Description
   
   An error was made when converting to BIG ENGIAN. 
   First, the bytes of an integer were reversed, and only then the result was 
extended to long. 
   Because of this, null zero at the beginning of long did not participate in 
the reverse operation
   
   ## Testing
   (Please describe how this PR has been tested.)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre merged pull request #2094: DRILL-7773: Incorrect conversion result to TIME_EPOCH_BE

2020-08-02 Thread GitBox


cgivre merged pull request #2094:
URL: https://github.com/apache/drill/pull/2094


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi opened a new pull request #2095: DRILL-7774: IS NOT NULL predicate fails with NPE for the case of non-existing columns with non-deterministic expression

2020-08-06 Thread GitBox


vvysotskyi opened a new pull request #2095:
URL: https://github.com/apache/drill/pull/2095


   # [DRILL-7774](https://issues.apache.org/jira/browse/DRILL-7774): IS NOT 
NULL predicate fails with NPE for the case of non-existing columns with 
non-deterministic expression
   
   ## Description
   Added null check.
   
   ## Documentation
   NA
   
   ## Testing
   Added unit test for the issue.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi commented on pull request #2095: DRILL-7774: IS NOT NULL predicate fails with NPE for the case of non-existing columns with non-deterministic expression

2020-08-06 Thread GitBox


vvysotskyi commented on pull request #2095:
URL: https://github.com/apache/drill/pull/2095#issuecomment-670187689


   @KazydubB, could you please take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-06 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-670308917


   @dbw9580 
   I'll take a look over the weekend.  Thanks for the contribution!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] vvysotskyi merged pull request #2095: DRILL-7774: IS NOT NULL predicate fails with NPE for the case of non-existing columns with non-deterministic expression

2020-08-07 Thread GitBox


vvysotskyi merged pull request #2095:
URL: https://github.com/apache/drill/pull/2095


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-09 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r467667133



##
File path: 
contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java
##
@@ -0,0 +1,162 @@
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.*;

Review comment:
   The star import is a check-style violation.  

##
File path: 
contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestIPFSQueries.class, TestIPFSGroupScan.class})

Review comment:
   This is missing the scan spec test.

##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API

[GitHub] [drill] dzamo opened a new pull request #2096: Documentation updates for 1.18

2020-08-09 Thread GitBox


dzamo opened a new pull request #2096:
URL: https://github.com/apache/drill/pull/2096


   TODO: Either open a Jira ticket for this work and replace this description 
with the template below or avoid the Jira process for docs and delete the 
template below.
   
   # [DRILL-](https://issues.apache.org/jira/browse/DRILL-): PR Title
   
   (Please replace `PR Title` with actual PR Title)
   
   ## Description
   
   (Please describe the change. If more than one ticket is fixed, include a 
reference to those tickets.)
   
   ## Documentation
   (Please describe user-visible changes similar to what should appear in the 
Drill documentation.)
   
   ## Testing
   (Please describe how this PR has been tested.)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-10 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r467923605



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   It upgraded target Java version to 11: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/commit/6c0016c00b9a3cd213343fa25adb5099be52a401
   Drill's still using Java 8, I'm not sure we can do this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-10 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r467934049



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   Ah ok..  Seems like we need to update Drill to use a more recent version 
of Java..





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-10 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r467938032



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   Let me make sure I understand this:
   Drill can't query IPFS version > 0.4.2 due to library restrictions.  We 
can't simply upgrade the library because it requires Java 11 and Drill is built 
on Java 8.  Is that correct?
   
   (Sorry.. not an expert on IPFS, and I just want to make sure I'm 
understanding all this.)  
   How criticial would you say this is for functionality?  Is there some 
workaround possible so that Drill will work with the latest IPFS version?
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2096: Documentation updates for 1.18

2020-08-10 Thread GitBox


cgivre commented on a change in pull request #2096:
URL: https://github.com/apache/drill/pull/2096#discussion_r467897882



##
File path: 
_docs/configure-drill/configuration-options/030-planning-and-exececution-options.md
##
@@ -27,11 +27,11 @@ You can run the following query to see a list of options:
 The query returns a table that lists options with descriptions and other 
details. As of Drill 1.15, there are 179 options:  
 
SELECT COUNT() AS num_of_sysopts FROM sys.options;
-   +-+
+   |-|
| num_of_sysopts  |
-   +-+
+   |-|
| 179 |
-   +-+  
+   |-|  

Review comment:
   Nit: `###Drill-override.conf` Row Limit Settings is not formatted 
correctly.

##
File path: 
_docs/connect-a-data-source/plugins/114-image-metadata-format-plugin.md
##
@@ -1,6 +1,6 @@
 ---
 title: "Image Metadata Format Plugin"

Review comment:
   IMHO, We could use a little re-organization here.  This particular page 
is about querying image metadata via Drill. We should collect any pages about 
formats under the section about querying the file system instead of treating 
this like it was a storage plugin.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2096: Documentation updates for 1.18

2020-08-10 Thread GitBox


cgivre commented on pull request #2096:
URL: https://github.com/apache/drill/pull/2096#issuecomment-671404618


   Could you also please add the following docs to the section about querying 
files?
   
   - https://github.com/apache/drill/tree/master/contrib/format-excel
   - https://github.com/apache/drill/tree/master/contrib/format-hdf5
   - https://github.com/apache/drill/tree/master/contrib/format-spss
   - https://github.com/apache/drill/tree/master/contrib/format-syslog
   - https://github.com/apache/drill/tree/master/contrib/format-esri
   
   Also, we have a few new storage plugins which we should add to the docs 
which are:
   - https://github.com/apache/drill/tree/master/contrib/storage-http
   - https://github.com/apache/drill/tree/master/contrib/storage-druid
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2096: Documentation updates for 1.18

2020-08-10 Thread GitBox


cgivre commented on pull request #2096:
URL: https://github.com/apache/drill/pull/2096#issuecomment-671406571


   One more thing... take a look here 
https://github.com/apache/drill/tree/master/contrib/udfs for UDFs included with 
Drill.  Ideally these should be in the docs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-10 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r467971039



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   > Drill can't query IPFS version > 0.4.2 due to library restrictions. We 
can't simply upgrade the library because it requires Java 11 and Drill is built 
on Java 8. Is that correct?
   
   Yes, `java-ipfs-http-client` v1.2.3, the last version which requires Java 8, 
supports IPFS up to version 0.4.23, the last release before version 0.5 which 
introduced the incompatibility in 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157. The latest 
library version v.1.3.2 supports IPFS v0.5+ but requires Java 11.
   
   >How criticial would you say this is for functionality?
   
   I'm not sure how many users of IPFS have upgraded to v0.5+, but users can 
[downgrade to a previous version of 
IPFS](https://github.com/ipfs/ipfs-update#revert) if they want to run Drill 
with IPFS support for the time being. Newer IPFS versions bring performance 
improvements, which could help Drill do queries faster, but the basic 
functionalities should be the same.
   
   > Is there some workaround possible so that Drill will work with the latest 
IPFS version?
   
   At a first glance the `java-ipfs-http-client` lib seems to be using some 
features from Java 11, but only in tests. We could fork the library and revert 
the Java target version to 8 and ignore those tests. I need to investigate more 
about this to see if it's really a solution.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-11 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r468469590



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   Good news, it's trivial to revert to Java 8: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/pull/172.
   Let's hope it gets merged soon.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-12 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469626333



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   Cool, let's just keep an eye on that for now. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-12 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673221990


   @dbw9580 
   Please verify that the project builds and passes all checkstyles.  
`TestIPFQueries` fails the checkstyle due to unused imports.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469944365



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with columns {}", columns);
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
+init();
+  }
+
+  pr

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469960924



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer";>dht/findpeer
 in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;

Review comment:
   I think it's unnecessary to specify all the type parameters. These 
`Map`s are JSON responses from the IPFS daemon, and can be deeply nested. It 
would be best handled by a library to properly define the types and structures 
of these responses, e.g. via DAOs, but the `java-ipfs-http-client` library does 
not make such efforts. If we must specify the type parameters, most of the type 
parameters are just `Map`, which is mostly meaningless.
   The IPFS HTTP API docs have proper specification for the types and field 
n

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470025016



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer";>dht/findpeer
 in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;

Review comment:
   Made some changes in 39bab37.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470035068



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with columns {}", columns);
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
+init();
+  }
+
+  pr

[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673543986


   > `TestIPFQueries` fails the checkstyle due to unused imports.
   @cgivre Hmm I don't see any unused imports in this file and my builds are 
passing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673552669


   @dbw9580 
   I redownloaded and it built for me.  Please disregard previous comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470080197



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ *
+ * Supports IPFS up to version v0.4.23, due to new restrictions enforcing all 
API calls to be made with POST method.
+ * Upstream issue tracker: 
https://github.com/ipfs-shipyard/java-ipfs-http-client/issues/157
+ */

Review comment:
   I saw this PR 
(https://github.com/ipfs-shipyard/java-ipfs-http-client/pull/172) was merged!  
Can we:
   1.  Once there is a release with this PR merged, update the `pom.xml` so 
that we are using the "official" library.
   
   Should this now work will all versions of IPFS?  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470081506



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,
+// Adding a peer to the query.
+AddingPeer,
+// Dialing a peer.
+DialingPeer;
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer";>dht/findpeer
 in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;
+if (peer == null) {
+  return false;
+}
+if ( (int) peer.get("Type") != 
DHTQueryEventType.FinalPeer.ordinal() ) {
+ 

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470082144



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,

Review comment:
   Are `Value`, `AddingPeer` and `DialingPeer` ever used?  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470083089



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470084386



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470085822



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470086686



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470087561



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETCH

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470088211



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETCH

[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470087869



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETCH

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470089240



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,

Review comment:
   No. I included them for sake of completeness. Should they be removed?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470091514



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid

Review comment:
   Again, please either remove, or include a reference to a JIRA to 
document what needs to be done. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470091789



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid
+//also CIDs can be encoded with different encodings, not necessarily Base58
+Pattern tableNamePattern = 
Pattern.compile("^/(ipfs|ipns)/([A-Za-z0-9]{46}(/[^#]+)*)(?:#(\\w+))?$");
+Matcher matcher = tableNamePattern.matcher(path);
+if (!matcher.matches()) {
+  throw UserException.validationError().message("Invalid IPFS path in 
query string. Use paths of pattern `/scheme/hashpath#format`, where scheme:= 
\"ipfs\"|\"ipns\", hashpath:= HASH [\"/\" path], HASH is IPFS Base58 encoded 
hash, path:= TEXT [\"/\" path], format:= \"json\"|\"csv\"").build(logger);

Review comment:
   Please break this up into multiple lines.  Here and elsewhere.


[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470098319



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETC

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470098319



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETC

[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673615795


   > I'm still having issues actually getting the unit tests that require the 
IPFS daemon to actually execute.
   
   @cgivre Actually I am having trouble making that test run, too. I keep 
getting errors like "connection rejected: /127.0.0.1:31011" or "Protocol family 
unavailable: /0:0:0:0:0:0:0:1:31011". I can test successfully manually through 
the web ui with drill-embedded, though.
   
   Can you try testing through the web ui, too? The simple dataset should be 
easy to add to IPFS and test:
   
   ```bash
   ipfs object patch set-data $(ipfs object new) 
   ```
   
   This will return the hash of the simple dataset, which is 
`QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR`.
   
   Then run a query through the web ui: ``select * from 
ipfs.`/ipfs/QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR#json` `` .
   If the query takes too long to complete, try reducing the timeout values as 
well as the `max-peers-per-leaf` value in the plugin config.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673621711


   If I leave an instance of Drill running and then run the unit test 
(`TestIPFSQueries`), then it passes. I think the unit test does not actually 
build and  run a full Drill server, which is why the connections are rejected.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r469944365



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private IPFSContext ipfsContext;
+  private IPFSScanSpec ipfsScanSpec;
+  private IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static long DEFAULT_NODE_SIZE = 1000l;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
+this(
+((IPFSStoragePlugin) 
pluginRegistry.getPlugin(ipfsStoragePluginConfig)).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called with columns {}", columns);
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
+init();
+  }
+
+  pr

[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-13 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673636877


   @dbw9580 
   The `ClusterTest` class is supposed to start a Drill cluster so that you can 
execute queries.  You should not need to have a Drill cluster running for the 
unit tests to complete.  
   
   I think the reason this isn't doing what you're expecting is that in the 
`initIPFS` function in `IPFSTestSuit` you are creating a plugin with a null 
configuration and hence isn't initializing correctly.   
   
   I stepped through `testSimple()` with the debugger and the `dataset` object 
is `null`, hence the test fails.  My suspicion is that there is one small step 
being missed here.  Could you please take a look and step through this to make 
sure that Drill is being initialized correctly?
   Thanks
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-673976246


   @cgivre Does Drill support connections from IPv6 sockets? Is it enabled by 
default or do I have to toggle some configuration items? The "protocol family 
unavailable" error could be due to lack of support for IPv6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-674043258


   @dbw9580 I believe Drill does support connections from IPv6 sockets.  There 
was a recent PR for this in fact: (https://github.com/apache/drill/pull/1857) 
but I'm not sure if that is directly relevant. 
   Were you able to get it working?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-674045895


   The `connection rejected: /127.0.0.1:31011` failure was because sometimes 
Drill does not bind to the default ports (`31010, 31011, 31012`). It can bind 
to later ports like `31013, 31014, 31015`, hence the connection was rejected.
   
   I believe the reason Drill didn't bind to the default ports is that those 
ports was used by the process from the last test run and had not been recycled 
by the system. If I wait for a minute or two before starting another round of 
testing, it's likely the test will pass. 
   
   This is part of DRILL-7754, but I haven't come up with a plan to reliably 
store the ports info in IPFS.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-674047192


   > @dbw9580 I believe Drill does support connections from IPv6 sockets. There 
was a recent PR for this in fact: (#1857) but I'm not sure if that is directly 
relevant.
   > Were you able to get it working?
   
   I don't see Drill binding to any IPv6 address in `ss -6ltnp`. I blocked IPv6 
addresses in 9494a30 and the tests are now passing (most of the time, due to 
https://github.com/apache/drill/pull/2084#issuecomment-674045895).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] cgivre commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


cgivre commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-674062414


   @dbw9580 
   The unit tests are passing now on my machine.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on pull request #2084:
URL: https://github.com/apache/drill/pull/2084#issuecomment-674079153


   @cgivre 
   I tried to set the ports to their default values in c090a43, but it did not 
seem to do the trick. Why is that?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470673138



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+  public final String host;
+  public final int port;
+  private final String version;
+  public final String protocol;
+  public final int readTimeout;
+  public static final int DEFAULT_READ_TIMEOUT = 0;
+
+  public final DHT dht = new DHT();
+  public final Name name = new Name();
+
+  public IPFSCompat(IPFS ipfs) {
+this(ipfs.host, ipfs.port);
+  }
+
+  public IPFSCompat(String host, int port) {
+this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+  }
+
+  public IPFSCompat(String host, int port, String version, boolean ssl, int 
readTimeout) {
+this.host = host;
+this.port = port;
+
+if(ssl) {
+  this.protocol = "https";
+} else {
+  this.protocol = "http";
+}
+
+this.version = version;
+this.readTimeout = readTimeout;
+  }
+
+  /**
+   * Resolve names to IPFS CIDs.
+   * See https://docs.ipfs.io/reference/http/api/#api-v0-resolve";>resolve in IPFS 
doc.
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether recursively resolve names until it is a IPFS CID
+   * @return a Map of JSON object, with the result as the value of key "Path"
+   */
+  public Map resolve(String scheme, String path, boolean recursive) {
+AtomicReference ret = new AtomicReference<>();
+getObjectStream(
+"resolve?arg=/" + scheme+"/"+path +"&r="+recursive,
+res -> {
+  ret.set((Map) res);
+  return true;
+},
+err -> {
+  throw new RuntimeException(err);
+}
+);
+return ret.get();
+  }
+
+  /**
+   * As defined in 
https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+   */
+  public enum DHTQueryEventType {
+// Sending a query to a peer.
+SendingQuery,
+// Got a response from a peer.
+PeerResponse,
+// Found a "closest" peer (not currently used).
+FinalPeer,
+// Got an error when querying.
+QueryError,
+// Found a provider.
+Provider,
+// Found a value.
+Value,
+// Adding a peer to the query.
+AddingPeer,
+// Dialing a peer.
+DialingPeer;
+  }
+
+  public class DHT {
+/**
+ * Find internet addresses of a given peer.
+ * See https://docs.ipfs.io/reference/http/api/#api-v0-dht-findpeer";>dht/findpeer
 in IPFS doc.
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+public List findpeerListTimeout(Multihash id, int timeout, 
ExecutorService executor) {
+  AtomicReference> ret = new AtomicReference<>();
+  timeLimitedExec(
+  "name/resolve?arg=" + id,
+  timeout,
+  res -> {
+Map peer = (Map) res;
+if (peer == null) {
+  return false;
+}
+if ( (int) peer.get("Type") != 
DHTQueryEventType.FinalPeer.ordinal() ) {
+

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470673442



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called wit

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470673620



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called wit

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470675112



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
##
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List columns;
+
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  private static final int DEFAULT_USER_PORT = 31010;
+  private static final int DEFAULT_CONTROL_PORT = 31011;
+  private static final int DEFAULT_DATA_PORT = 31012;
+  private static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap assignments;
+  private List ipfsWorkList = Lists.newArrayList();
+  private Map> endpointWorksMap;
+  private List affinities;
+
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("columns") List columns,
+   @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(
+pluginRegistry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext(),
+ipfsScanSpec,
+columns
+);
+  }
+
+  public IPFSGroupScan(IPFSContext ipfsContext,
+   IPFSScanSpec ipfsScanSpec,
+   List columns) {
+super((String) null);
+this.ipfsContext = ipfsContext;
+this.ipfsScanSpec = ipfsScanSpec;
+this.config = ipfsContext.getStoragePluginConfig();
+logger.debug("GroupScan constructor called wit

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470675247



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETC

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470676009



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS 
storage
+ */
+public class IPFSHelper {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = 
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = 
Multihash.fromBase58(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+this.client = ipfs;
+this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+this(ipfs);
+this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map timeouts) {
+this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the 
more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+return clientCompat;
+  }
+
+  public List findprovsTimeout(Multihash id) {
+List providers;
+providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, 
timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+return 
providers.stream().map(Multihash::fromBase58).collect(Collectors.toList());
+  }
+
+  public List findpeerTimeout(Multihash peerId) {
+// trying to resolve addresses of a node itself will always hang
+// so we treat it specially
+if(peerId.equals(myself.getId())) {
+  return myself.getMultiAddresses();
+}
+
+List addrs;
+addrs = clientCompat.dht.findpeerListTimeout(peerId, 
timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+return addrs.stream()
+.filter(addr -> !addr.equals(""))
+.map(MultiAddress::new).collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+return timedFailure(client.object::data, object, 
timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException 
{
+return timedFailure(client.object::links, object, 
timeouts.get(IPFSTimeOut.FETC

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470676371



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid

Review comment:
   Removed in d2ea637.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470676806



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  private static final Logger logger = 
LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+@JsonProperty("ipfs")
+IPFS("ipfs"),
+@JsonProperty("ipns")
+IPNS("ipns");
+
+@JsonProperty("prefix")
+private final String name;
+Prefix(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Prefix of(String what) {
+  switch (what) {
+case "ipfs" :
+  return IPFS;
+case "ipns":
+  return IPNS;
+default:
+  throw new InvalidParameterException("Unsupported prefix: " + what);
+  }
+}
+  }
+
+  public enum Format {
+@JsonProperty("json")
+JSON("json"),
+@JsonProperty("csv")
+CSV("csv");
+
+@JsonProperty("format")
+private final String name;
+Format(String prefix) {
+  this.name = prefix;
+}
+
+@Override
+public String toString() {
+  return this.name;
+}
+
+@JsonCreator
+public static Format of(String what) {
+  switch (what) {
+case "json" :
+  return JSON;
+case "csv":
+  return CSV;
+default:
+  throw new InvalidParameterException("Unsupported format: " + what);
+  }
+}
+  }
+
+  public static Set formats = ImmutableSet.of("json", "csv");
+  private Prefix prefix;
+  private String path;
+  private Format formatExtension;
+  private final IPFSContext ipfsContext;
+
+  @JsonCreator
+  public IPFSScanSpec (@JacksonInject StoragePluginRegistry registry,
+   @JsonProperty("IPFSStoragePluginConfig") 
IPFSStoragePluginConfig ipfsStoragePluginConfig,
+   @JsonProperty("prefix") Prefix prefix,
+   @JsonProperty("format") Format format,
+   @JsonProperty("path") String path) {
+this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, 
IPFSStoragePlugin.class).getIPFSContext();
+this.prefix = prefix;
+this.formatExtension = format;
+this.path = path;
+  }
+
+  public IPFSScanSpec (IPFSContext ipfsContext, String path) {
+this.ipfsContext = ipfsContext;
+parsePath(path);
+  }
+
+  private void parsePath(String path) {
+//FIXME: IPFS hashes are actually Base58 encoded, so "0" "O" "I" "l" are 
not valid
+//also CIDs can be encoded with different encodings, not necessarily Base58
+Pattern tableNamePattern = 
Pattern.compile("^/(ipfs|ipns)/([A-Za-z0-9]{46}(/[^#]+)*)(?:#(\\w+))?$");
+Matcher matcher = tableNamePattern.matcher(path);
+if (!matcher.matches()) {
+  throw UserException.validationError().message("Invalid IPFS path in 
query string. Use paths of pattern `/scheme/hashpath#format`, where scheme:= 
\"ipfs\"|\"ipns\", hashpath:= HASH [\"/\" path], HASH is IPFS Base58 encoded 
hash, path:= TEXT [\"/\" path], format:= \"json\"|\"csv\"").build(logger);

Review comment:
   Fixed in d2ea637.




-

[GitHub] [drill] dbw9580 commented on a change in pull request #2084: DRILL-7745: Add storage plugin for IPFS

2020-08-14 Thread GitBox


dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r470681998



##
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.shaded.guava.com.google.common.base.Objects;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+public static final String NAME = "ipfs";
+
+@JsonProperty
+private final String host;
+
+@JsonProperty
+private final int port;
+
+@JsonProperty("max-nodes-per-leaf")
+private final int maxNodesPerLeaf;
+
+@JsonProperty("ipfs-timeouts")
+private final Map ipfsTimeouts;
+
+@JsonIgnore
+private static final Map ipfsTimeoutDefaults = 
ImmutableMap.of(
+IPFSTimeOut.FIND_PROV, 4,
+IPFSTimeOut.FIND_PEER_INFO, 4,
+IPFSTimeOut.FETCH_DATA, 6
+);
+
+public enum IPFSTimeOut {
+@JsonProperty("find-provider")
+FIND_PROV("find-provider"),
+@JsonProperty("find-peer-info")
+FIND_PEER_INFO("find-peer-info"),
+@JsonProperty("fetch-data")
+FETCH_DATA("fetch-data");
+
+@JsonProperty("type")
+private final String which;
+IPFSTimeOut(String which) {
+this.which = which;
+}
+
+@JsonCreator
+public static IPFSTimeOut of(String which) {
+switch (which) {
+case "find-provider":
+return FIND_PROV;
+case "find-peer-info":
+return FIND_PEER_INFO;
+case "fetch-data":
+return FETCH_DATA;
+default:
+throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+}
+}
+
+@Override
+public String toString() {
+return this.which;
+}
+}
+
+@JsonProperty("groupscan-worker-threads")
+private final int numWorkerThreads;
+
+@JsonProperty
+private final Map formats;
+
+@JsonCreator
+public IPFSStoragePluginConfig(
+@JsonProperty("host") String host,
+@JsonProperty("port") int port,
+@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+@JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+@JsonProperty("formats") Map formats) {
+this.host = host;
+this.port = port;
+this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+if (ipfsTimeouts != null) {
+this.ipfsTimeouts = Maps.newHashMap();
+ipfsTimeouts.forEach(this.ipfsTimeouts::put);
+ipfsTimeoutDefaults.forEach(this.ipfsTimeouts::putIfAbsent);
+} else {
+this.ipfsTimeouts = ipfsTimeoutDefaults;
+}
+this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
+this.formats = formats;
+}
+
+public String getHost() {
+return host;
+}
+
+public int getPort() {
+return port;
+}
+
+public int getMaxNodesPerLeaf() {
+return maxNodesPerLeaf;
+}
+
+public int getIpfsTimeout(IPFSTimeOut which) {
+return ipfsTimeouts.get(which);
+}
+
+public Map getIpfsTimeouts() {
+return ipfsTimeouts;
+}
+
+public int getNumWorkerThreads() {
+return numWorkerThreads;
+}
+
+

<    3   4   5   6   7   8   9   10   11   12   >