Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3584

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270310#comment-16270310
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on issue #4080: [BEAM-2500] Add S3 FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#issuecomment-347774873
 
 
   There are a couple of TODOs remaining. Here are some notes from tonight:
   - rebased to master and squashed
   - WordCount works, reading from and writing to S3
   - need to do some bigger testing
   - haven't looked at coverage yet
   - Kinesis tests fail for me both before and after this PR, so I made a fix 
and we'll see what Jenkins does with it. If that doesn't work, AWS SDK version 
1.11.24 is the last version that should pass without any change in Beam.
   - there are two open questions to @jkff , mentioned inline
   - tried to do more tonight, but fixing the content encoding bug turned out 
to be more complex than I anticipated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Assignee: Jacob Marble
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> an S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270301#comment-16270301
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153708170
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/util/S3Util.java
 ##
 @@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.aws.s3.S3Path;
+import org.apache.beam.sdk.io.aws.s3.S3ResourceId;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S3Util wraps the Amazon Web Services client library.
+ */
+public class S3Util {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3Util.class);
+
+  /**
+   * This is a {@link DefaultValueFactory} able to create an {@link S3Util} using any transport
+   * flags specified on the {@link PipelineOptions}.
+   */
+  public static class S3UtilFactory implements DefaultValueFactory<S3Util> {
+
+    @Override
+    public S3Util create(PipelineOptions options) {
+      S3Options s3Options = options.as(S3Options.class);
+
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsAccessKeyId()), "AWS access key ID is required");
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsSecretAccessKey()),
+          "AWS secret access key is required");
+      checkArgument(!Strings.isNullOrEmpty(s3Options.getAwsRegion()), "AWS region is required");
+
+      return new S3Util(
+  

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270302#comment-16270302
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153708192
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/util/MatchResultMatcher.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest {@link Matcher} to match {@link MatchResult}. Necessary because {@link
+ * MatchResult#metadata()} throws an exception under normal circumstances.
+ */
+public class MatchResultMatcher extends BaseMatcher {
 
 Review comment:
   @jkff another question here




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270303#comment-16270303
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r151237642
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/util/MatchResultMatcher.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest {@link Matcher} to match {@link MatchResult}. Necessary because {@link
+ * MatchResult#metadata()} throws an exception under normal circumstances.
+ */
+public class MatchResultMatcher extends BaseMatcher {
 
 Review comment:
   TODO




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270300#comment-16270300
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r151237682
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/util/S3Util.java
 ##
 @@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.aws.s3.S3Path;
+import org.apache.beam.sdk.io.aws.s3.S3ResourceId;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S3Util wraps the Amazon Web Services client library.
+ */
+public class S3Util {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3Util.class);
+
+  /**
+   * This is a {@link DefaultValueFactory} able to create an {@link S3Util} using any transport
+   * flags specified on the {@link PipelineOptions}.
+   */
+  public static class S3UtilFactory implements DefaultValueFactory<S3Util> {
+
+    @Override
+    public S3Util create(PipelineOptions options) {
+      S3Options s3Options = options.as(S3Options.class);
+
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsAccessKeyId()), "AWS access key ID is required");
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsSecretAccessKey()),
+          "AWS secret access key is required");
+      checkArgument(!Strings.isNullOrEmpty(s3Options.getAwsRegion()), "AWS region is required");
+
+      return new S3Util(
+  

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270296#comment-16270296
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153707765
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/util/S3Util.java
 ##
 @@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.aws.s3.S3Path;
+import org.apache.beam.sdk.io.aws.s3.S3ResourceId;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S3Util wraps the Amazon Web Services client library.
+ */
+public class S3Util {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3Util.class);
+
+  /**
+   * This is a {@link DefaultValueFactory} able to create an {@link S3Util} using any transport
+   * flags specified on the {@link PipelineOptions}.
+   */
+  public static class S3UtilFactory implements DefaultValueFactory<S3Util> {
+
+    @Override
+    public S3Util create(PipelineOptions options) {
+      S3Options s3Options = options.as(S3Options.class);
+
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsAccessKeyId()), "AWS access key ID is required");
+      checkArgument(
+          !Strings.isNullOrEmpty(s3Options.getAwsSecretAccessKey()),
+          "AWS secret access key is required");
+      checkArgument(!Strings.isNullOrEmpty(s3Options.getAwsRegion()), "AWS region is required");
+
+      return new S3Util(
+  

[beam] branch master updated: Use pip from currently executing Python.

2017-11-28 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new db51176  Use pip from currently executing Python.
 new 9be42fe  This closes #3902
db51176 is described below

commit db5117670fffce7cd6c74bfcc065931427cfb2c6
Author: Robert Bradshaw 
AuthorDate: Tue Sep 26 14:01:53 2017 -0700

Use pip from currently executing Python.
---
 sdks/python/gen_protos.py |  6 +-
 sdks/python/pom.xml   | 20 
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sdks/python/gen_protos.py b/sdks/python/gen_protos.py
index c7bf55f..6caf3ac 100644
--- a/sdks/python/gen_protos.py
+++ b/sdks/python/gen_protos.py
@@ -21,8 +21,10 @@ import glob
 import logging
 import multiprocessing
 import os
+import pip
 import pkg_resources
 import platform
+import pprint
 import shutil
 import subprocess
 import sys
@@ -126,8 +128,10 @@ def _install_grpcio_tools_and_generate_proto_files():
   logging.warning('Installing grpcio-tools into %s' % install_path)
   try:
     start = time.time()
+    pprint.pprint(pip.pep425tags.get_supported())
     subprocess.check_call(
-        ['pip', 'install', '--target', install_path, '--build', build_path,
+        [sys.executable, '-m', 'pip', 'install',
+         '--target', install_path, '--build', build_path,
          '--upgrade', GRPC_TOOLS])
     logging.warning(
         'Installing grpcio-tools took %0.2f seconds.' % (time.time() - start))
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 3e67e3b..2c25014 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -107,6 +107,26 @@
 
   
   
+setup-compile-grpcio
+compile
+
+  exec
+
+
+  ${python.pip.bin}
+  
+install
+--user
+--upgrade
+--ignore-installed
+grpcio-tools
+  
+  
+${python.user.base}
+  
+
+  
+  
 setuptools-build
 compile
 

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.
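
The gen_protos.py change in this commit replaces a bare 'pip' with
sys.executable -m pip, so the install runs under the interpreter that is
executing the script rather than whichever pip happens to be first on PATH.
A minimal sketch of that pattern outside of Beam follows; the helper name
pip_install_command and its parameters are hypothetical and are not part of
the commit.

```python
import sys


def pip_install_command(package, target_dir=None):
  """Builds an argv list that runs pip under the current interpreter.

  Hypothetical helper for illustration only: it returns the command
  instead of running it, so the caller decides how to execute it.
  """
  # sys.executable is the path of the running Python binary, so
  # '-m pip' resolves to the pip installed for this interpreter.
  cmd = [sys.executable, '-m', 'pip', 'install', '--upgrade']
  if target_dir is not None:
    # Install into an explicit directory instead of site-packages.
    cmd += ['--target', target_dir]
  cmd.append(package)
  return cmd
```

The returned list can be passed to subprocess.check_call, as the diff does
for GRPC_TOOLS.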


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3583

2017-11-28 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Spark #1060

2017-11-28 Thread Apache Jenkins Server
See 


Changes:

[niemo] Fix BigTableIO to use non-deprecated Futures.addCallback function

[dimon.ivan] Adds withInstanceId and withProjectId to the BigtableIO Read and Write

[dimon.ivan] Removes call to .withBigtableOptions() from JavaDoc.

[xumingmingv] [BEAM-3238][SQL] Move Coders to map in BeamRecordSqlType

[xumingmingv] [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 2eb7de0fe6e96da9805fc827294da1e1329ff716 (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 2eb7de0fe6e96da9805fc827294da1e1329ff716
Commit message: "[BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder"
 > git rev-list f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23 # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins2180130005249250077.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins325786581868369970.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins3579506461592166630.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: colorama; extra == "windows" in 
/usr/lib/python2.7/dist-packages (from colorlog[windows]==2.6.0->-r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: xmltodict in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests-ntlm>=0.3.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement 

Jenkins build is back to normal : beam_PostCommit_Java_MavenInstall #5332

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Updated] (BEAM-991) DatastoreIO Write should flush early for large batches

2017-11-28 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-991:

Fix Version/s: 2.1.0

> DatastoreIO Write should flush early for large batches
> --
>
> Key: BEAM-991
> URL: https://issues.apache.org/jira/browse/BEAM-991
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Vikas Kedigehalli
>Assignee: Vikas Kedigehalli
> Fix For: 2.1.0
>
>
> If entities are large (avg size > 20KB) then a single batched write (500 
> entities) would exceed the Datastore size limit of a single request (10MB) 
> from https://cloud.google.com/datastore/docs/concepts/limits.
> First reported in: 
> http://stackoverflow.com/questions/40156400/why-does-dataflow-erratically-fail-in-datastore-access
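
The early-flush idea described in this issue can be sketched as follows. This is a minimal standalone illustration, not the DatastoreIO implementation: the class name, the 9 MiB safety margin, and the byte-array stand-in for serialized entities are all assumptions made for the example. Only the 500-entity batch size and the 10MB request limit come from the issue text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: flush a batch when either the entity count or a byte budget is reached. */
final class SizeAwareBatcher {
  // 500 entities per batch is taken from the issue text.
  static final int MAX_ENTITIES_PER_BATCH = 500;
  // Assumed safety margin below the 10MB request limit mentioned in the issue.
  static final long MAX_BYTES_PER_BATCH = 9L * 1024 * 1024;

  private final List<byte[]> pending = new ArrayList<>();
  private final List<Integer> flushedBatchSizes = new ArrayList<>();
  private long pendingBytes = 0;

  /** Adds one serialized entity, flushing first if it would overflow the current batch. */
  void add(byte[] serializedEntity) {
    boolean countFull = pending.size() >= MAX_ENTITIES_PER_BATCH;
    boolean bytesFull = !pending.isEmpty()
        && pendingBytes + serializedEntity.length > MAX_BYTES_PER_BATCH;
    if (countFull || bytesFull) {
      flush();
    }
    pending.add(serializedEntity);
    pendingBytes += serializedEntity.length;
  }

  /** Sends whatever is pending; a real sink would issue the write RPC here. */
  void flush() {
    if (pending.isEmpty()) {
      return;
    }
    // The sketch only records how many entities each batch contained.
    flushedBatchSizes.add(pending.size());
    pending.clear();
    pendingBytes = 0;
  }

  List<Integer> flushedBatchSizes() {
    return flushedBatchSizes;
  }
}
```

With this shape, small entities still go out in batches of 500, while large entities trigger a flush as soon as the byte budget would be exceeded.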



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is back to stable : beam_PostCommit_Java_ValidatesRunner_Apex #2892

2017-11-28 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3582

2017-11-28 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3581

2017-11-28 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_Java_ValidatesRunner_Apex #2891

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-3238) [SQL] Add builder to BeamRecordSqlType

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270113#comment-16270113
 ] 

ASF GitHub Bot commented on BEAM-3238:
--

xumingming closed pull request #4168: [BEAM-3238][SQL] Add 
BeamRecordSqlTypeBuilder
URL: https://github.com/apache/beam/pull/4168
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 982494ad2e5..a6b23b6310b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
@@ -50,26 +51,39 @@
  *
  */
 public class BeamRecordSqlType extends BeamRecordType {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new 
HashMap<>();
-  static {
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
+  private static final Map<Integer, Class> JAVA_CLASSES = ImmutableMap
+  .<Integer, Class>builder()
+  .put(Types.TINYINT, Byte.class)
+  .put(Types.SMALLINT, Short.class)
+  .put(Types.INTEGER, Integer.class)
+  .put(Types.BIGINT, Long.class)
+  .put(Types.FLOAT, Float.class)
+  .put(Types.DOUBLE, Double.class)
+  .put(Types.DECIMAL, BigDecimal.class)
+  .put(Types.BOOLEAN, Boolean.class)
+  .put(Types.CHAR, String.class)
+  .put(Types.VARCHAR, String.class)
+  .put(Types.TIME, GregorianCalendar.class)
+  .put(Types.DATE, Date.class)
+  .put(Types.TIMESTAMP, Date.class)
+  .build();
+
+  private static final Map<Integer, Coder> CODERS = ImmutableMap
+  .<Integer, Coder>builder()
+  .put(Types.TINYINT, ByteCoder.of())
+  .put(Types.SMALLINT, ShortCoder.of())
+  .put(Types.INTEGER, BigEndianIntegerCoder.of())
+  .put(Types.BIGINT, BigEndianLongCoder.of())
+  .put(Types.FLOAT, FloatCoder.of())
+  .put(Types.DOUBLE, DoubleCoder.of())
+  .put(Types.DECIMAL, BigDecimalCoder.of())
+  .put(Types.BOOLEAN, BooleanCoder.of())
+  .put(Types.CHAR, StringUtf8Coder.of())
+  .put(Types.VARCHAR, StringUtf8Coder.of())
+  .put(Types.TIME, TimeCoder.of())
+  .put(Types.DATE, DateCoder.of())
+  .put(Types.TIMESTAMP, DateCoder.of())
+  .build();
 
   public List<Integer> fieldTypes;
 
@@ -84,54 +98,24 @@ private BeamRecordSqlType(List<String> fieldsName, 
List<Integer> fieldTypes
   }
 
   public static BeamRecordSqlType create(List<String> fieldNames,
-  List<Integer> fieldTypes) {
+ List<Integer> fieldTypes) {
 if (fieldNames.size() != fieldTypes.size()) {
   throw new IllegalStateException("the sizes of 'dataType' and 
'fieldTypes' must match.");
 }
+
 List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
+
 for (int idx = 0; idx < fieldTypes.size(); ++idx) {
-  switch (fieldTypes.get(idx)) {
-  case Types.INTEGER:
-fieldCoders.add(BigEndianIntegerCoder.of());
-break;
-  case Types.SMALLINT:
-fieldCoders.add(ShortCoder.of());
-break;
-  case Types.TINYINT:
-fieldCoders.add(ByteCoder.of());
-break;
-  case Types.DOUBLE:
-fieldCoders.add(DoubleCoder.of());
-break;
-  case Types.FLOAT:
-fieldCoders.add(FloatCoder.of());
-break;
-  case Types.DECIMAL:
-

[beam] 01/02: [BEAM-3238][SQL] Move Coders to map in BeamRecordSqlType

2017-11-28 Thread xumingming
This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e2c3de092b0a7a5bbfcb5b7797984d5f4bd10138
Author: Anton Kedin 
AuthorDate: Wed Nov 22 16:04:05 2017 -0800

[BEAM-3238][SQL] Move Coders to map in BeamRecordSqlType

Improve readability. All coders are supposed to be thread safe and
currently are backed by the static instances.
---
 .../beam/sdk/extensions/sql/BeamRecordSqlType.java | 111 +
 1 file changed, 47 insertions(+), 64 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 982494a..1784ec1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
@@ -50,26 +50,39 @@ import org.apache.beam.sdk.values.BeamRecordType;
  *
  */
 public class BeamRecordSqlType extends BeamRecordType {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new 
HashMap<>();
-  static {
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
+  private static final Map<Integer, Class> JAVA_CLASSES = ImmutableMap
+  .<Integer, Class>builder()
+  .put(Types.TINYINT, Byte.class)
+  .put(Types.SMALLINT, Short.class)
+  .put(Types.INTEGER, Integer.class)
+  .put(Types.BIGINT, Long.class)
+  .put(Types.FLOAT, Float.class)
+  .put(Types.DOUBLE, Double.class)
+  .put(Types.DECIMAL, BigDecimal.class)
+  .put(Types.BOOLEAN, Boolean.class)
+  .put(Types.CHAR, String.class)
+  .put(Types.VARCHAR, String.class)
+  .put(Types.TIME, GregorianCalendar.class)
+  .put(Types.DATE, Date.class)
+  .put(Types.TIMESTAMP, Date.class)
+  .build();
+
+  private static final Map<Integer, Coder> CODERS = ImmutableMap
+  .<Integer, Coder>builder()
+  .put(Types.TINYINT, ByteCoder.of())
+  .put(Types.SMALLINT, ShortCoder.of())
+  .put(Types.INTEGER, BigEndianIntegerCoder.of())
+  .put(Types.BIGINT, BigEndianLongCoder.of())
+  .put(Types.FLOAT, FloatCoder.of())
+  .put(Types.DOUBLE, DoubleCoder.of())
+  .put(Types.DECIMAL, BigDecimalCoder.of())
+  .put(Types.BOOLEAN, BooleanCoder.of())
+  .put(Types.CHAR, StringUtf8Coder.of())
+  .put(Types.VARCHAR, StringUtf8Coder.of())
+  .put(Types.TIME, TimeCoder.of())
+  .put(Types.DATE, DateCoder.of())
+  .put(Types.TIMESTAMP, DateCoder.of())
+  .build();
 
   public List<Integer> fieldTypes;
 
@@ -84,54 +97,24 @@ public class BeamRecordSqlType extends BeamRecordType {
   }
 
   public static BeamRecordSqlType create(List<String> fieldNames,
-  List<Integer> fieldTypes) {
+ List<Integer> fieldTypes) {
 if (fieldNames.size() != fieldTypes.size()) {
   throw new IllegalStateException("the sizes of 'dataType' and 
'fieldTypes' must match.");
 }
+
 List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
+
 for (int idx = 0; idx < fieldTypes.size(); ++idx) {
-  switch (fieldTypes.get(idx)) {
-  case Types.INTEGER:
-fieldCoders.add(BigEndianIntegerCoder.of());
-break;
-  case Types.SMALLINT:
-fieldCoders.add(ShortCoder.of());
-break;
-  case Types.TINYINT:
-fieldCoders.add(ByteCoder.of());
-break;
-  case Types.DOUBLE:
-fieldCoders.add(DoubleCoder.of());
-break;
-  case Types.FLOAT:
-fieldCoders.add(FloatCoder.of());
-break;
-  case Types.DECIMAL:
-
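
The refactoring in this commit replaces a long switch statement with a lookup in a static immutable map. The following standalone sketch shows the same pattern under stated assumptions: string labels stand in for the Beam coder instances so the example compiles without Beam on the classpath, java.util.Map.of stands in for Guava's ImmutableMap builder, and the class and method names are hypothetical.

```java
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Standalone sketch of the switch-to-map refactoring; labels stand in for Beam coders. */
final class CoderLookupSketch {
  // Hypothetical stand-in: the real map holds Coder instances such as ByteCoder.of().
  private static final Map<Integer, String> CODER_NAMES = Map.of(
      Types.TINYINT, "ByteCoder",
      Types.SMALLINT, "ShortCoder",
      Types.INTEGER, "BigEndianIntegerCoder",
      Types.BIGINT, "BigEndianLongCoder",
      Types.VARCHAR, "StringUtf8Coder");

  /** Resolves one entry per field type, rejecting types that have no mapping. */
  static List<String> codersFor(List<Integer> fieldTypes) {
    List<String> coders = new ArrayList<>(fieldTypes.size());
    for (Integer fieldType : fieldTypes) {
      String coder = CODER_NAMES.get(fieldType);
      if (coder == null) {
        throw new UnsupportedOperationException(
            "Data type: " + fieldType + " not supported yet!");
      }
      coders.add(coder);
    }
    return coders;
  }
}
```

Adding support for a new SQL type then means adding one map entry rather than a new case branch.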

[beam] branch master updated (27f505e -> 2eb7de0)

2017-11-28 Thread xumingming
This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 27f505e  Merge pull request #4095 from rniemo-g/master
 new e2c3de0  [BEAM-3238][SQL] Move Coders to map in BeamRecordSqlType
 new 2eb7de0  [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/extensions/sql/BeamRecordSqlType.java | 190 ++---
 .../sdk/extensions/sql/BeamRecordSqlTypeTest.java  | 115 +
 ...BeamSqlBuiltinFunctionsIntegrationTestBase.java |  56 +++---
 3 files changed, 273 insertions(+), 88 deletions(-)
 create mode 100644 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.


[beam] 02/02: [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder

2017-11-28 Thread xumingming
This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2eb7de0fe6e96da9805fc827294da1e1329ff716
Author: Anton Kedin 
AuthorDate: Wed Nov 22 19:17:21 2017 -0800

[BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder

To improve readability of creating BeamRecordSqlTypes.
---
 .../beam/sdk/extensions/sql/BeamRecordSqlType.java |  85 ++-
 .../sdk/extensions/sql/BeamRecordSqlTypeTest.java  | 115 +
 ...BeamSqlBuiltinFunctionsIntegrationTestBase.java |  56 +-
 3 files changed, 229 insertions(+), 27 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 1784ec1..a6b23b6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
 import java.sql.Types;
@@ -108,8 +109,8 @@ public class BeamRecordSqlType extends BeamRecordType {
   Integer fieldType = fieldTypes.get(idx);
 
   if (!CODERS.containsKey(fieldType)) {
-  throw new UnsupportedOperationException(
-  "Data type: " + fieldType + " not supported yet!");
+throw new UnsupportedOperationException(
+"Data type: " + fieldType + " not supported yet!");
   }
 
   fieldCoders.add(CODERS.get(fieldType));
@@ -166,4 +167,84 @@ public class BeamRecordSqlType extends BeamRecordType {
 return "BeamRecordSqlType [fieldNames=" + getFieldNames()
 + ", fieldTypes=" + fieldTypes + "]";
   }
+
+  public static Builder builder() {
+return new Builder();
+  }
+
+  /**
+   * Builder class to construct {@link BeamRecordSqlType}.
+   */
+  public static class Builder {
+
+private ImmutableList.Builder<String> fieldNames;
+private ImmutableList.Builder<Integer> fieldTypes;
+
+public Builder withField(String fieldName, Integer fieldType) {
+  fieldNames.add(fieldName);
+  fieldTypes.add(fieldType);
+  return this;
+}
+
+public Builder withTinyIntField(String fieldName) {
+  return withField(fieldName, Types.TINYINT);
+}
+
+public Builder withSmallIntField(String fieldName) {
+  return withField(fieldName, Types.SMALLINT);
+}
+
+public Builder withIntegerField(String fieldName) {
+  return withField(fieldName, Types.INTEGER);
+}
+
+public Builder withBigIntField(String fieldName) {
+  return withField(fieldName, Types.BIGINT);
+}
+
+public Builder withFloatField(String fieldName) {
+  return withField(fieldName, Types.FLOAT);
+}
+
+public Builder withDoubleField(String fieldName) {
+  return withField(fieldName, Types.DOUBLE);
+}
+
+public Builder withDecimalField(String fieldName) {
+  return withField(fieldName, Types.DECIMAL);
+}
+
+public Builder withBooleanField(String fieldName) {
+  return withField(fieldName, Types.BOOLEAN);
+}
+
+public Builder withCharField(String fieldName) {
+  return withField(fieldName, Types.CHAR);
+}
+
+public Builder withVarcharField(String fieldName) {
+  return withField(fieldName, Types.VARCHAR);
+}
+
+public Builder withTimeField(String fieldName) {
+  return withField(fieldName, Types.TIME);
+}
+
+public Builder withDateField(String fieldName) {
+  return withField(fieldName, Types.DATE);
+}
+
+public Builder withTimestampField(String fieldName) {
+  return withField(fieldName, Types.TIMESTAMP);
+}
+
+private Builder() {
+  this.fieldNames = ImmutableList.builder();
+  this.fieldTypes = ImmutableList.builder();
+}
+
+public BeamRecordSqlType build() {
+  return create(fieldNames.build(), fieldTypes.build());
+}
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
new file mode 100644
index 000..78ff221
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * 
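
The fluent builder added in this commit pairs each field name with a java.sql.Types constant and offers one convenience method per supported type. The sketch below mirrors that shape in a self-contained form. It is an illustration only: the class name and the names() and types() accessors are hypothetical, plain ArrayLists replace Guava's ImmutableList.Builder, and it does not construct a real BeamRecordSqlType.

```java
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mini-builder mirroring the withField pattern from the diff. */
final class RecordTypeBuilderSketch {
  private final List<String> fieldNames = new ArrayList<>();
  private final List<Integer> fieldTypes = new ArrayList<>();

  /** Generic entry point: records a name together with a java.sql.Types constant. */
  RecordTypeBuilderSketch withField(String fieldName, int fieldType) {
    fieldNames.add(fieldName);
    fieldTypes.add(fieldType);
    return this;
  }

  // Convenience methods delegate to withField, one per SQL type.
  RecordTypeBuilderSketch withIntegerField(String fieldName) {
    return withField(fieldName, Types.INTEGER);
  }

  RecordTypeBuilderSketch withVarcharField(String fieldName) {
    return withField(fieldName, Types.VARCHAR);
  }

  /** Immutable snapshot of the field names, in insertion order. */
  List<String> names() {
    return List.copyOf(fieldNames);
  }

  /** Immutable snapshot of the field types, in insertion order. */
  List<Integer> types() {
    return List.copyOf(fieldTypes);
  }
}
```

A caller would chain the methods, for example new RecordTypeBuilderSketch().withIntegerField("id").withVarcharField("name"), instead of keeping two parallel lists in sync by hand.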

Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3580

2017-11-28 Thread Apache Jenkins Server
See 




[beam] branch master updated (5a37cbe -> 27f505e)

2017-11-28 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 5a37cbe  Merge pull request #4171 from dmytroivanov4206/master
 add 961bd22  Fix BigTableIO to use non-deprecated Futures.addCallback 
function
 add 9df6a9e  Merge branch 'master' of github.com:rniemo-g/beam
 add a1e6c07  Merge branch 'master' of https://github.com/apache/beam
 new 27f505e  Merge pull request #4095 from rniemo-g/master

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)



[jira] [Commented] (BEAM-3156) Use non-deprecated Futures.addCallback function.

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270095#comment-16270095
 ] 

ASF GitHub Bot commented on BEAM-3156:
--

chamikaramj closed pull request #4095: [BEAM-3156] Fix BigTableIO to use 
non-deprecated Futures.addCallback function
URL: https://github.com/apache/beam/pull/4095
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 29dc269f7fd..cb40acbe447 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -36,6 +36,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
@@ -626,7 +627,9 @@ public void startBundle(StartBundleContext c) throws 
IOException {
   public void processElement(ProcessContext c) throws Exception {
 checkForFailures();
 Futures.addCallback(
-bigtableWriter.writeRecord(c.element()), new 
WriteExceptionCallback(c.element()));
+bigtableWriter.writeRecord(c.element()),
+new WriteExceptionCallback(c.element()),
+MoreExecutors.directExecutor());
 ++recordsWritten;
   }
 


 




> Use non-deprecated Futures.addCallback function.
> 
>
> Key: BEAM-3156
> URL: https://issues.apache.org/jira/browse/BEAM-3156
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Ryan Niemocienski
>Assignee: Chamikara Jayalath
>Priority: Minor
>
> addCallback(ListenableFuture, FutureCallback) is in the process of being 
> deprecated in favor of addCallback(ListenableFuture, FutureCallback, 
> Executor). Currently BigtableIO uses the former function call; switch it to 
> the second one.
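
The point of the three-argument form is that the caller states explicitly where the callback runs. The sketch below illustrates that idea with the JDK's CompletableFuture rather than Guava, so it is an analogue and not the Guava API: whenCompleteAsync(action, executor) plays the role of addCallback(future, callback, executor), and the Runnable::run executor plays the role of a direct executor. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/** JDK-only analogue of registering a failure callback with an explicit executor. */
final class ExplicitExecutorCallbackSketch {
  // Runs each task on the thread that submits it, like a direct executor.
  static final Executor DIRECT = Runnable::run;

  /** Records the first failure of the future; the callback runs on the given executor. */
  static <T> void recordFailure(
      CompletableFuture<T> future, AtomicReference<Throwable> failure, Executor executor) {
    future.whenCompleteAsync(
        (result, error) -> {
          if (error != null) {
            failure.compareAndSet(null, error);
          }
        },
        executor);
  }
}
```

Passing the executor at the call site makes the threading choice visible in the code, which is the motivation for preferring the overload that takes an Executor.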





[beam] 01/01: Merge pull request #4095 from rniemo-g/master

2017-11-28 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 27f505e341cc5298740c0c5724fdfb2ab322a4de
Merge: 5a37cbe a1e6c07
Author: Chamikara Jayalath 
AuthorDate: Tue Nov 28 19:58:52 2017 -0800

Merge pull request #4095 from rniemo-g/master

[BEAM-3156] Fix BigTableIO to use non-deprecated Futures.addCallback 
function

 .../main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)




[beam] 01/01: Merge pull request #4171 from dmytroivanov4206/master

2017-11-28 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5a37cbe995f37a88c40d25330dab72a923423598
Merge: f8d8ff1 990f491
Author: Chamikara Jayalath 
AuthorDate: Tue Nov 28 19:54:32 2017 -0800

Merge pull request #4171 from dmytroivanov4206/master

[BEAM-3008] Extends API for BigtableIO Read and Write by adding 
withInstanceId  and withProjectId

 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   | 218 +
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 108 +++---
 2 files changed, 263 insertions(+), 63 deletions(-)



[jira] [Commented] (BEAM-3008) BigtableIO should use ValueProviders

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270091#comment-16270091
 ] 

ASF GitHub Bot commented on BEAM-3008:
--

chamikaramj closed pull request #4171: [BEAM-3008] Extends API for BigtableIO 
Read and Write by adding withInstanceId  and withProjectId 
URL: https://github.com/apache/beam/pull/4171
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 29dc269f7fd..8f04c9dfdfd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -78,38 +78,37 @@
  * The Bigtable source returns a set of rows from a single table, returning 
a
 * {@code PCollection<Row>}.
  *
- * To configure a Cloud Bigtable source, you must supply a table id and a 
{@link BigtableOptions}
- * or builder configured with the project and other information necessary to 
identify the
- * Bigtable instance. By default, {@link BigtableIO.Read} will read all rows 
in the table. The row
- * range to be read can optionally be restricted using {@link 
BigtableIO.Read#withKeyRange}, and
- * a {@link RowFilter} can be specified using {@link 
BigtableIO.Read#withRowFilter}. For example:
+ * To configure a Cloud Bigtable source, you must supply a table id, a 
project id, an instance
+ * id and optionally a {@link BigtableOptions} to provide more specific 
connection configuration.
+ * By default, {@link BigtableIO.Read} will read all rows in the table. The 
row range to be read
+ * can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, 
and a {@link RowFilter}
+ * can be specified using {@link BigtableIO.Read#withRowFilter}. For example:
  *
  * {@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance");
  *
  * Pipeline p = ...;
  *
  * // Scan the entire table.
  * p.apply("read",
  * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
  * .withTableId("table"));
  *
  * // Scan a prefix of the table.
  * ByteKeyRange keyRange = ...;
  * p.apply("read",
  * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
  * .withTableId("table")
  * .withKeyRange(keyRange));
  *
  * // Scan a subset of rows that match the specified row filter.
  * p.apply("filtered read",
  * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
  * .withTableId("table")
  * .withRowFilter(filter));
  * }
@@ -121,21 +120,17 @@
  * {@link ByteString} is the key of the row being mutated, and each {@link 
Mutation} represents an
  * idempotent transformation to that row.
  *
- * To configure a Cloud Bigtable sink, you must supply a table id and a 
{@link BigtableOptions}
- * or builder configured with the project and other information necessary to 
identify the
- * Bigtable instance, for example:
+ * To configure a Cloud Bigtable sink, you must supply a table id, a 
project id, an instance id
+ * and optionally a {@link BigtableOptions} to provide more 
specific connection
+ * configuration, for example:
  *
  * {@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance");
- *
 * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
  *
  * data.apply("write",
  * BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
+ * .setProjectId("project")
+ * .setInstanceId("instance")
  * .withTableId("table"));
  * }
  *
@@ -146,8 +141,6 @@
  * {@code
  * BigtableOptions.Builder optionsBuilder =
  * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setInstanceId("instance")
  * .setUsePlaintextNegotiation(true)
  * .setCredentialOptions(CredentialOptions.nullCredential())
  * .setDataHost("127.0.0.1") // network interface where Bigtable 
emulator is bound
@@ -160,6 +153,8 @@
  * data.apply("write",
  * BigtableIO.write()
  * .withBigtableOptions(optionsBuilder)
+ * .setProjectId("project")

[beam] branch master updated (f8d8ff1 -> 5a37cbe)

2017-11-28 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from f8d8ff1  Updating dataflow API version to newer release.
 add 2863776  Adds withInstanceId and withProjectId to the BigtableIO Read 
and Write classes, first out of four steps to fix [BEAM-3008] bug.
 add 990f491  Removes call to .withBitableOptions() form JavaDoc.
 new 5a37cbe  Merge pull request #4171 from dmytroivanov4206/master

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   | 218 +
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 108 +++---
 2 files changed, 263 insertions(+), 63 deletions(-)



[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270088#comment-16270088
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688315
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A writable S3 object, as a {@link WritableByteChannel}.
+ */
+class S3WritableByteChannel implements WritableByteChannel {
+
+  private final AmazonS3 amazonS3;
+  private final S3ResourceId path;
+  private final String uploadId;
+  private final ByteArrayOutputStream uploadStream;
+  private final WritableByteChannel uploadChannel;
+  private final List<PartETag> eTags;
+
+  // Amazon parts are 1-indexed, not zero-indexed.
+  private int partNumber = 1;
+  private boolean open = true;
+
+  S3WritableByteChannel(
+  AmazonS3 amazonS3, S3ResourceId path, String storageClass, int 
uploadBufferSizeBytes)
+  throws IOException {
+this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+this.path = checkNotNull(path, "path");
+checkArgument(uploadBufferSizeBytes > 0, "uploadBufferSizeBytes");
+this.uploadStream = new ByteArrayOutputStream(uploadBufferSizeBytes);
 
 Review comment:
   Done




> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Assignee: Jacob Marble
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> an S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3





[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270089#comment-16270089
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688326
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A writable S3 object, as a {@link WritableByteChannel}.
+ */
+class S3WritableByteChannel implements WritableByteChannel {
+
+  private final AmazonS3 amazonS3;
+  private final S3ResourceId path;
+  private final String uploadId;
+  private final ByteArrayOutputStream uploadStream;
+  private final WritableByteChannel uploadChannel;
+  private final List<PartETag> eTags;
+
+  // Amazon parts are 1-indexed, not zero-indexed.
+  private int partNumber = 1;
+  private boolean open = true;
+
+  S3WritableByteChannel(
+      AmazonS3 amazonS3, S3ResourceId path, String storageClass, int uploadBufferSizeBytes)
+      throws IOException {
+    this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+    this.path = checkNotNull(path, "path");
+    checkArgument(uploadBufferSizeBytes > 0, "uploadBufferSizeBytes");
+    this.uploadStream = new ByteArrayOutputStream(uploadBufferSizeBytes);
+    this.uploadChannel = Channels.newChannel(uploadStream);
+    eTags = new ArrayList<>();
+
+    InitiateMultipartUploadRequest request =
+        new InitiateMultipartUploadRequest(path.getBucket(), path.getKey())
+            .withStorageClass(storageClass);
+    InitiateMultipartUploadResult result;
+    try {
+      result = amazonS3.initiateMultipartUpload(request);
+    } catch (AmazonServiceException e) {
+      throw new IOException(e);
+    }
+    uploadId = result.getUploadId();
+  }
+
+  @Override
+  public int write(ByteBuffer sourceBuffer) throws IOException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+    if (!sourceBuffer.hasArray()) {
 
 Review comment:
   Done
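
   The reviewer's original remark on the `hasArray()` check is not visible in this message, so the following is only a sketch of one common way to accept buffers that have no accessible backing array (direct or read-only buffers), under that assumption. `BufferCopySketch` and `drain` are hypothetical names, not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical helper; shows copying from a ByteBuffer with or without a backing array. */
class BufferCopySketch {

  /** Copies all remaining bytes of source into sink and returns the number copied. */
  static int drain(ByteBuffer source, ByteArrayOutputStream sink) {
    int count = source.remaining();
    if (source.hasArray()) {
      // Fast path: read straight from the backing array, honoring arrayOffset().
      sink.write(source.array(), source.arrayOffset() + source.position(), count);
      source.position(source.position() + count);
    } else {
      // No accessible array (e.g. read-only or direct buffer): copy through a temporary.
      byte[] tmp = new byte[count];
      source.get(tmp);
      sink.write(tmp, 0, count);
    }
    return count;
  }

  public static void main(String[] args) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    ByteBuffer readOnly = ByteBuffer.wrap(new byte[] {10, 20, 30}).asReadOnlyBuffer();
    System.out.println(drain(readOnly, sink));
    System.out.println(drain(ByteBuffer.wrap(new byte[] {40, 50}), sink));
    System.out.println(sink.size());
  }
}
```

   Either path leaves the source buffer fully consumed, which is what `WritableByteChannel.write` callers expect when the returned count equals the bytes that were remaining.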



[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270085#comment-16270085
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688273
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+/**
+ * A readable S3 object, as a {@link SeekableByteChannel}.
+ */
+class S3ReadableSeekableByteChannel implements SeekableByteChannel {
+
+  private final AmazonS3 amazonS3;
+  private final S3ResourceId path;
+  private final long contentLength;
+  private long position = 0;
+  private boolean open = true;
+  private S3Object s3Object;
+  private ReadableByteChannel s3ObjectContentChannel;
+
+  S3ReadableSeekableByteChannel(AmazonS3 amazonS3, S3ResourceId path) throws IOException {
+    this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+    this.path = checkNotNull(path, "path");
+    try {
+      contentLength = amazonS3.getObjectMetadata(path.getBucket(), path.getKey())
 
 Review comment:
   Done




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270087#comment-16270087
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688303
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+/**
+ * A readable S3 object, as a {@link SeekableByteChannel}.
+ */
+class S3ReadableSeekableByteChannel implements SeekableByteChannel {
+
+  private final AmazonS3 amazonS3;
+  private final S3ResourceId path;
+  private final long contentLength;
+  private long position = 0;
+  private boolean open = true;
+  private S3Object s3Object;
+  private ReadableByteChannel s3ObjectContentChannel;
+
+  S3ReadableSeekableByteChannel(AmazonS3 amazonS3, S3ResourceId path) throws IOException {
+    this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+    this.path = checkNotNull(path, "path");
+    try {
+      contentLength = amazonS3.getObjectMetadata(path.getBucket(), path.getKey())
+          .getContentLength();
+    } catch (AmazonServiceException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public int read(ByteBuffer destinationBuffer) throws IOException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+    if (!destinationBuffer.hasArray()) {
+      throw new UnsupportedOperationException("ByteBuffer.hasArray() must be true");
+    }
+    if (!destinationBuffer.hasRemaining()) {
+      return 0;
+    }
+    if (position == contentLength) {
+      return -1;
+    }
+
+    if (s3Object == null) {
+      GetObjectRequest request = new GetObjectRequest(path.getBucket(), path.getKey());
+      if (position > 0) {
+        request.setRange(position, contentLength);
+      }
+      try {
+        s3Object = amazonS3.getObject(request);
+      } catch (AmazonServiceException e) {
+        throw new IOException(e);
+      }
+      s3ObjectContentChannel = Channels.newChannel(
+          new BufferedInputStream(s3Object.getObjectContent(), 1024 * 1024));
+    }
+
+    int totalBytesRead = 0;
+    int bytesRead = 0;
+
+    do {
+      totalBytesRead += bytesRead;
+      bytesRead = s3ObjectContentChannel.read(destinationBuffer);
+    } while (bytesRead > 0);
+
+    position += totalBytesRead;
+    return totalBytesRead;
+  }
+
+  @Override
+  public long position() throws ClosedChannelException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+    return position;
+  }
+
+  @Override
+  public SeekableByteChannel position(long newPosition) throws IOException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+    checkArgument(newPosition >= 0, "newPosition too low");
+    checkArgument(newPosition < contentLength, "new position too high");
+    if (s3Object != null) {
 
 Review comment:
   Agreed. Done
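
   To illustrate the seek behaviour the hunk above is heading toward, here is a rough sketch, assuming the intent is "drop the open stream on seek and reopen lazily from the new offset on the next read". A byte array stands in for the S3 object and `opens` counts simulated ranged GETs. `LazySeekSketch` and its members are hypothetical names, and the bounds check (allowing a seek to the end) is an assumption that differs slightly from the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical stand-in for a seekable remote object; not the PR's class. */
class LazySeekSketch {
  private final byte[] object;          // stands in for the remote object
  private int position = 0;
  private ReadableByteChannel content;  // null until the first read after a seek
  int opens = 0;                        // number of simulated ranged requests

  LazySeekSketch(byte[] object) {
    this.object = object;
  }

  /** Moves the read position; any open stream is discarded, not repositioned. */
  void seek(int newPosition) {
    if (newPosition < 0 || newPosition > object.length) {
      throw new IllegalArgumentException("newPosition out of range");
    }
    try {
      if (content != null) {
        content.close();
        content = null;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    position = newPosition;
  }

  /** Reads into dst, opening a stream at the current position if none is open. */
  int read(ByteBuffer dst) {
    if (!dst.hasRemaining()) {
      return 0;
    }
    if (position == object.length) {
      return -1;
    }
    try {
      if (content == null) {
        content = Channels.newChannel(
            new ByteArrayInputStream(object, position, object.length - position));
        opens++;
      }
      int n = content.read(dst);
      if (n > 0) {
        position += n;
      }
      return n;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    LazySeekSketch channel = new LazySeekSketch(new byte[] {0, 1, 2, 3, 4, 5, 6, 7});
    ByteBuffer dst = ByteBuffer.allocate(3);
    System.out.println(channel.read(dst));
    channel.seek(6);
    dst.clear();
    System.out.println(channel.read(dst));
    System.out.println(channel.opens);
  }
}
```

   Deferring the reopen until the next read means several consecutive seeks cost nothing, at the price of one new request per seek that is actually followed by a read.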



[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270083#comment-16270083
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688243
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  
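
   The quoted hunk is cut off here. As a side note on the `MAX_DELETE_OBJECTS_PER_REQUEST` constant above, a minimal sketch of splitting a key list into request-sized batches could look like the following. How the PR itself batches deletes is not shown in this message, so `DeleteBatchSketch` and `batches` are hypothetical names and the approach is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; splits keys into batches no larger than the per-request limit. */
class DeleteBatchSketch {
  // Mirrors the limit quoted in the diff: "You may specify up to 1000 keys."
  static final int MAX_KEYS_PER_REQUEST = 1000;

  static List<List<String>> batches(List<String> keys, int maxPerRequest) {
    if (maxPerRequest <= 0) {
      throw new IllegalArgumentException("maxPerRequest");
    }
    List<List<String>> out = new ArrayList<>();
    for (int start = 0; start < keys.size(); start += maxPerRequest) {
      int end = Math.min(start + maxPerRequest, keys.size());
      // Copy the view so each batch is independent of the source list.
      out.add(new ArrayList<>(keys.subList(start, end)));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      keys.add("key-" + i);
    }
    for (List<String> batch : batches(keys, MAX_KEYS_PER_REQUEST)) {
      System.out.println(batch.size());
    }
  }
}
```

   Each batch would then map to one delete-objects request against a single bucket.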

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270086#comment-16270086
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688289
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+/**
+ * A readable S3 object, as a {@link SeekableByteChannel}.
+ */
+class S3ReadableSeekableByteChannel implements SeekableByteChannel {
+
+  private final AmazonS3 amazonS3;
+  private final S3ResourceId path;
+  private final long contentLength;
+  private long position = 0;
+  private boolean open = true;
+  private S3Object s3Object;
+  private ReadableByteChannel s3ObjectContentChannel;
+
+  S3ReadableSeekableByteChannel(AmazonS3 amazonS3, S3ResourceId path) throws IOException {
+    this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+    this.path = checkNotNull(path, "path");
+    try {
+      contentLength = amazonS3.getObjectMetadata(path.getBucket(), path.getKey())
+          .getContentLength();
+    } catch (AmazonServiceException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public int read(ByteBuffer destinationBuffer) throws IOException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+    if (!destinationBuffer.hasArray()) {
+      throw new UnsupportedOperationException("ByteBuffer.hasArray() must be true");
 
 Review comment:
   Nope. Done.




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270082#comment-16270082
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688223
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270081#comment-16270081
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688200
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270080#comment-16270080
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688054
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270076#comment-16270076
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153687438
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  
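
The 1000-key limit quoted in the diff above means that a bulk delete has to be split into batches. The following is a minimal sketch of that batching step only, assuming plain string keys. The class name `DeleteBatching` and the helper `partitionKeys` are hypothetical, are not taken from the pull request, and make no AWS SDK calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the code from PR #4080.
final class DeleteBatching {

  // Mirrors the constant in the diff: S3 delete-objects accepts up to 1000 keys.
  static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;

  // Splits keys into consecutive batches of at most batchSize elements.
  static List<List<String>> partitionKeys(List<String> keys, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < keys.size(); start += batchSize) {
      int end = Math.min(start + batchSize, keys.size());
      // Copy the view so each batch is independent of the input list.
      batches.add(new ArrayList<>(keys.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      keys.add("prefix/object-" + i);
    }
    for (List<String> batch : partitionKeys(keys, MAX_DELETE_OBJECTS_PER_REQUEST)) {
      // A real file system would issue one delete-objects request per batch here.
      System.out.println("batch of " + batch.size() + " keys");
    }
  }
}
```

Keeping the batching separate from the request code makes the limit easy to unit test without a live S3 client.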

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270074#comment-16270074
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153687320
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsOptions.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.options;
+
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options used to configure Amazon Web Services specific options such as credentials and region.
+ */
+public interface AwsOptions extends PipelineOptions {
+
+  @Description("AWS access key ID")
+  String getAwsAccessKeyId();
+  void setAwsAccessKeyId(String value);
+
+  @Description("AWS secret access key")
+  String getAwsSecretAccessKey();
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Assignee: Jacob Marble
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270075#comment-16270075
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153687348
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
 
 Review comment:
   Done
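
   For readers following the thread, the buffer-size rule in the quoted diff
   can be sketched in isolation. This is only an illustration of the
   thresholds shown above (a 5 MB minimum part size, and 64 MB when the JVM
   reports at least 512 MB of maximum memory). The class name
   `UploadBufferSizing` and the method `defaultUploadBufferSizeBytes` are
   hypothetical and do not appear in the pull request.

```java
// Hypothetical illustration; not the code from PR #4080.
final class UploadBufferSizing {

  // Amazon S3 API docs: each part must be at least 5 MB, except the last part.
  static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;

  // Larger buffer used when the JVM has enough memory (value taken from the diff).
  static final int LARGE_UPLOAD_BUFFER_SIZE_BYTES = 64 * 1024 * 1024;

  // Threshold from the diff, written as a long to match Runtime.maxMemory().
  static final long LOW_MEMORY_THRESHOLD_BYTES = 512L * 1024 * 1024;

  // Returns the minimum part size on small heaps and 64 MB otherwise.
  static int defaultUploadBufferSizeBytes(long maxMemoryBytes) {
    return maxMemoryBytes < LOW_MEMORY_THRESHOLD_BYTES
        ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
        : LARGE_UPLOAD_BUFFER_SIZE_BYTES;
  }

  public static void main(String[] args) {
    long maxMemory = Runtime.getRuntime().maxMemory();
    System.out.println(
        "default upload buffer: " + defaultUploadBufferSizeBytes(maxMemory) + " bytes");
  }
}
```

   Passing the memory limit as a parameter, instead of reading it inside the
   method, keeps the rule testable on any machine.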


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> 

[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270069#comment-16270069
 ] 

ASF GitHub Bot commented on BEAM-2500:
--

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 
FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153687178
 
 

 ##
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsOptions.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.options;
+
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options used to configure Amazon Web Services specific options such as credentials and region.
+ */
+public interface AwsOptions extends PipelineOptions {
+
+  @Description("AWS access key ID")
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Assignee: Jacob Marble
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3041) Add portable Python SDK container setup support

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270045#comment-16270045
 ] 

ASF GitHub Bot commented on BEAM-3041:
--

herohde commented on a change in pull request #4192: [BEAM-3041] preinstall 
various packages for better startup performance and reliability
URL: https://github.com/apache/beam/pull/4192#discussion_r153684774
 
 

 ##
 File path: sdks/python/container/Dockerfile
 ##
 @@ -19,8 +19,64 @@
 FROM python:2
 MAINTAINER "Apache Beam "
 
-# TODO(herohde): preinstall various packages for better startup
-# performance and reliability.
+# Install native bindings required for dependencies.
+RUN apt-get update && \
+apt-get install -y \
+   # These packages are needed for "pip install python-snappy" below.
+   libsnappy-dev \
+   # This package is needed for "pip install pyyaml" below to have c bindings.
+   libyaml-dev
 
 Review comment:
   You should remove the apt-get package lists as part of this command: rm -rf 
/var/lib/apt/lists/*


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add portable Python SDK container setup support
> ---
>
> Key: BEAM-3041
> URL: https://issues.apache.org/jira/browse/BEAM-3041
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Henning Rohde
>Assignee: Ahmet Altay
>  Labels: portability
>
> The minimal python container setup should be brought up to par with SDK 
> features:
>  - requirements.txt
>  - main session
>  - extra packages
> The name of the SDK package in boot.go should also not be hardcoded.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3041) Add portable Python SDK container setup support

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270044#comment-16270044
 ] 

ASF GitHub Bot commented on BEAM-3041:
--

herohde commented on issue #4192: [BEAM-3041] preinstall various packages for 
better startup performance and reliability
URL: https://github.com/apache/beam/pull/4192#issuecomment-347740080
 
 
   LGTM
   
   In the future, we should consider using multiple Dockerfiles, so that 
customizations like picking a different version of tensorflow are easier. But, 
for now, a single container image is fine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add portable Python SDK container setup support
> ---
>
> Key: BEAM-3041
> URL: https://issues.apache.org/jira/browse/BEAM-3041
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Henning Rohde
>Assignee: Ahmet Altay
>  Labels: portability
>
> The minimal python container setup should be brought up to par with SDK 
> features:
>  - requirements.txt
>  - main session
>  - extra packages
> The name of the SDK package in boot.go should also not be hardcoded.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is back to stable : beam_PostCommit_Java_ValidatesRunner_Dataflow #4431

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-1920) Add Spark 2.x support in Spark runner

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269979#comment-16269979
 ] 

ASF GitHub Bot commented on BEAM-1920:
--

amitsela commented on issue #3808: [BEAM-1920] Add a Spark 2.x support in the 
Spark runner
URL: https://github.com/apache/beam/pull/3808#issuecomment-347734773
 
 
   @jbonofre maybe a new PR for this? This one has too much noise already, and 
the PR should be something like "upgrade Spark to Spark 2.1.0" or something, no?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Spark 2.x support in Spark runner
> -
>
> Key: BEAM-1920
> URL: https://issues.apache.org/jira/browse/BEAM-1920
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>
> I have a branch working with both Spark 1 and Spark 2 backend.
> I'm preparing a pull request about that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam-site] 01/03: Capabilities Matrix: Stateful processing support in Apex Runner

2017-11-28 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit e6dfdd3cd81db509ca4af1507600b3eec147cd26
Author: Thomas Weise 
AuthorDate: Wed Nov 15 06:07:42 2017 -0800

Capabilities Matrix: Stateful processing support in Apex Runner
---
 src/_data/capability-matrix.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/_data/capability-matrix.yml b/src/_data/capability-matrix.yml
index ff6d302..acac0ad 100644
--- a/src/_data/capability-matrix.yml
+++ b/src/_data/capability-matrix.yml
@@ -387,9 +387,9 @@ categories:
 l2: not implemented
 l3: Spark supports per-key state with mapWithState() so support should be straightforward.
   - class: apex
-l1: 'No'
-l2: not implemented
-l3: Apex supports per-key state, so adding support for this should be easy.
+l1: 'Partially'
+l2: non-merging windows
+l3: State is supported for non-merging windows. SetState and MapState are not yet supported.
   - class: gearpump
 l1: 'No'
 l2: not implemented

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 03/03: This closes #351

2017-11-28 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit e49377a416d66c7179afe64ab0c7e9c97b62825e
Merge: da72cf2 f49ffbb
Author: Ahmet Altay 
AuthorDate: Tue Nov 28 18:40:13 2017 -0800

This closes #351

 content/documentation/runners/capability-matrix/index.html | 4 ++--
 src/_data/capability-matrix.yml| 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] branch asf-site updated (da72cf2 -> e49377a)

2017-11-28 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from da72cf2  Prepare repository for deployment.
 new e6dfdd3  Capabilities Matrix: Stateful processing support in Apex 
Runner
 new f49ffbb  Regenerate website
 new e49377a  This closes #351

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/documentation/runners/capability-matrix/index.html | 4 ++--
 src/_data/capability-matrix.yml| 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 02/03: Regenerate website

2017-11-28 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit f49ffbbbae606cf8763d4bbc83de6a10fdcea622
Author: Ahmet Altay 
AuthorDate: Tue Nov 28 18:40:13 2017 -0800

Regenerate website
---
 content/documentation/runners/capability-matrix/index.html | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/content/documentation/runners/capability-matrix/index.html 
b/content/documentation/runners/capability-matrix/index.html
index 143e801..1b87a9f 100644
--- a/content/documentation/runners/capability-matrix/index.html
+++ b/content/documentation/runners/capability-matrix/index.html
@@ -643,7 +643,7 @@
 
 
 
-
+~
 
 
 
@@ -2177,7 +2177,7 @@
 
 
 
-No: not 
implementedApex supports per-key state, so adding support 
for this should be easy.
+Partially: 
non-merging windowsState is supported for non-merging 
windows. SetState and MapState are not yet supported.
 
 
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] branch mergebot updated (b057ee9 -> ee36748)

2017-11-28 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from b057ee9  This closes #356
 add da72cf2  Prepare repository for deployment.
 new 2505570  Capabilities Matrix: Stateful processing support in Apex 
Runner
 new ee36748  This closes #351

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/contribute/ptransform-style-guide/index.html | 4 ++--
 src/_data/capability-matrix.yml  | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)



[jira] [Updated] (BEAM-3267) Return file names from TFRecordIO write

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Description: 
In TFRecordIO we cannot actually return the filename set while doing the write. It 
prevents us from creating pipelines that write and then read in the same 
pipeline. It was accomplished in TextIO - there's even an example use case in 
TextIOIT. Maybe we can do the same in TFRecordIO?

See: 
https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114

  was:In TFRecordIO we cannot actually return the filename set while doing the 
write. It prevents us from creating pipelines that write and then read in the 
same pipeline. It was accomplished in TextIO - there's even an example use case 
in TextIOIT. Maybe we can do the same in TFRecordIO?


> Return file names from TFRecordIO write
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> In TFRecordIO we cannot actually return the filename set while doing the write. 
> It prevents us from creating pipelines that write and then read in the same 
> pipeline. It was accomplished in TextIO - there's even an example use case in 
> TextIOIT. Maybe we can do the same in TFRecordIO?
> See: 
> https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam-site] 02/02: This closes #351

2017-11-28 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit ee36748a5975f140e3466abd131f5bf75e75e850
Merge: da72cf2 2505570
Author: Mergebot 
AuthorDate: Wed Nov 29 02:28:25 2017 +

This closes #351

 src/_data/capability-matrix.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)



[jira] [Updated] (BEAM-3267) Return file names from TFRecordIO write

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Description: In TFRecordIO we cannot actually return the filename set while 
doing the write. It prevents us from creating pipelines that write and then 
read in the same pipeline. It was accomplished in TextIO - there's even an 
example use case in 
[https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114](TextIOIT).
 Maybe we can do the same in TFRecordIO?  (was: In TFRecordIO we cannot 
actually return the filename set while doing the write. It prevents us from 
creating pipelines that write and then read in the same pipeline. It was 
accomplished in TextIO - there's even an example use case in 
[TextIOIT](https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114).
 Maybe we can do the same in TFRecordIO?)

> Return file names from TFRecordIO write
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> In TFRecordIO we cannot actually return the filename set while doing the write. 
> It prevents us from creating pipelines that write and then read in the same 
> pipeline. It was accomplished in TextIO - there's even an example use case in 
> [https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114](TextIOIT).
>  Maybe we can do the same in TFRecordIO?





[beam-site] 01/02: Capabilities Matrix: Stateful processing support in Apex Runner

2017-11-28 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 250557073393627a4682bb9a277455cd7962b373
Author: Thomas Weise 
AuthorDate: Wed Nov 15 06:07:42 2017 -0800

Capabilities Matrix: Stateful processing support in Apex Runner
---
 src/_data/capability-matrix.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/_data/capability-matrix.yml b/src/_data/capability-matrix.yml
index ff6d302..acac0ad 100644
--- a/src/_data/capability-matrix.yml
+++ b/src/_data/capability-matrix.yml
@@ -387,9 +387,9 @@ categories:
 l2: not implemented
 l3: Spark supports per-key state with mapWithState() so 
support should be straightforward.
   - class: apex
-l1: 'No'
-l2: not implemented
-l3: Apex supports per-key state, so adding support for this should 
be easy.
+l1: 'Partially'
+l2: non-merging windows
+l3: State is supported for non-merging windows. SetState and 
MapState are not yet supported.
   - class: gearpump
 l1: 'No'
 l2: not implemented



[jira] [Updated] (BEAM-3267) Return file names from TFRecordIO write

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Description: In TFRecordIO we cannot actually return the filename set while 
doing the write. It prevents us from creating pipelines that write and then 
read in the same pipeline. It was accomplished in TextIO - there's even an 
example use case in 
[TextIOIT](https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114).
 Maybe we can do the same in TFRecordIO?  (was: I mean an analogous version of 
the TextIO's method of the same name. Currently there's no way to perform write 
and then read in one pipeline. A method like this would allow creating such 
pipelines easily. )

> Return file names from TFRecordIO write
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> In TFRecordIO we cannot actually return the filename set while doing the write. 
> It prevents us from creating pipelines that write and then read in the same 
> pipeline. It was accomplished in TextIO - there's even an example use case in 
> [TextIOIT](https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114).
>  Maybe we can do the same in TFRecordIO?





[jira] [Updated] (BEAM-3267) Return file names from TFRecordIO write

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Description: In TFRecordIO we cannot actually return the filename set while 
doing the write. It prevents us from creating pipelines that write and then 
read in the same pipeline. It was accomplished in TextIO - there's even an 
example use case in TextIOIT. Maybe we can do the same in TFRecordIO?  (was: In 
TFRecordIO we cannot actually return the filename set while doing the write. It 
prevents us from creating pipelines that write and then read in the same 
pipeline. It was accomplished in TextIO - there's even an example use case in 
[https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java#L114](TextIOIT).
 Maybe we can do the same in TFRecordIO?)

> Return file names from TFRecordIO write
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> In TFRecordIO we cannot actually return the filename set while doing the write. 
> It prevents us from creating pipelines that write and then read in the same 
> pipeline. It was accomplished in TextIO - there's even an example use case in 
> TextIOIT. Maybe we can do the same in TFRecordIO?





[jira] [Updated] (BEAM-3267) Return file names from TFRecordIO write

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Summary: Return file names from TFRecordIO write  (was: Add 
getPerDestinationOutputFilenames() method in TFRecordIO)

> Return file names from TFRecordIO write
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> I mean an analogous version of the TextIO's method of the same name. 
> Currently there's no way to perform write and then read in one pipeline. A 
> method like this would allow creating such pipelines easily. 





[jira] [Updated] (BEAM-3267) Add getPerDestinationOutputFilenames() method in TFRecordIO

2017-11-28 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-3267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Łukasz Gajowy updated BEAM-3267:

Description: I mean an analogous version of the TextIO's method of the same 
name. Currently there's no way to perform write and then read in one pipeline. 
A method like this would allow creating such pipelines easily.   (was: I mean 
an analogous version of the TextIO's method of the same name. It would let us 
for example perform write and then read in one pipeline instead of two or any 
similar operations. )

> Add getPerDestinationOutputFilenames() method in TFRecordIO
> ---
>
> Key: BEAM-3267
> URL: https://issues.apache.org/jira/browse/BEAM-3267
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Łukasz Gajowy
>Assignee: Kenneth Knowles
>Priority: Minor
>
> I mean an analogous version of the TextIO's method of the same name. 
> Currently there's no way to perform write and then read in one pipeline. A 
> method like this would allow creating such pipelines easily. 





Jenkins build is back to normal : beam_PerformanceTests_Python #615

2017-11-28 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3579

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-3164) Capture stderr logs during gen proto

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269922#comment-16269922
 ] 

ASF GitHub Bot commented on BEAM-3164:
--

robertwb commented on issue #4126: [BEAM-3164] Flush stderr on failures
URL: https://github.com/apache/beam/pull/4126#issuecomment-347726752
 
 
   Jenkins: retest this please
   
   Were there logs missing without these flushes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Capture stderr logs during gen proto
> 
>
> Key: BEAM-3164
> URL: https://issues.apache.org/jira/browse/BEAM-3164
> Project: Beam
>  Issue Type: Bug
>  Components: build-system, sdk-py-core
>Reporter: holdenk
>Assignee: Holden Karau
>
> Currently python PRs are failing with gen-proto failures, but these are 
> difficult to debug because we don't capture the information (see 
> https://builds.apache.org/job/beam_PreCommit_Python_MavenInstall/727/console 
> ).
> cc [~altay] [~robertwb]





Jenkins build is back to normal : beam_PerformanceTests_Spark #1059

2017-11-28 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_Java_MavenInstall #5330

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269877#comment-16269877
 ] 

Eugene Kirpichov commented on BEAM-3268:


cc: [~reuvenlax]

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Reuven Lax
> Attachments: comparison.png
>
>
> While running filebased-io-test we found the Dataflow runner misbehaving. We run 
> tests using a single pipeline, and without a Reshuffle between writing and 
> reading, Dataflow jobs fail because the runner tries to access 
> files that have not been created yet. 
> The picture shows the difference in how the write is executed. On 
> the left is a working example with Reshuffle added, and on the right one 
> without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the cloud console; the job should fail.
> Now add Reshuffling to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and trigger the previously used Maven command to see it working in the 
> console.





[jira] [Commented] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269876#comment-16269876
 ] 

Eugene Kirpichov commented on BEAM-3268:


Yeah this is a bug, because the transforms that produce 
perDestinationOutputFilenames produce them before the files are actually 
copied, e.g. 
https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L942

One fix is to reorder that code (and the respective code in 
FinalizeWindowedFn). Another fix is to insert a reshuffle somewhere around 
https://github.com/apache/beam/blob/f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L816
 , which is less brittle - I would prefer the latter.
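
[Editorial illustration] To make the ordering issue concrete, here is a minimal, self-contained Java sketch of the first option mentioned above (finish the copy first, only then emit the names). It is not the WriteFiles code: the class and method names (FinalizeOrderSketch, finalizeThenEmit) are hypothetical, and plain java.nio stands in for Beam's file system layer. It only shows that a downstream reader can rely on the files existing once names are returned after the move.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FinalizeOrderSketch {

  /** Moves each temp file into finalDir and only afterwards returns the final names. */
  static List<Path> finalizeThenEmit(List<Path> tempFiles, Path finalDir) throws IOException {
    List<Path> finalNames = new ArrayList<>();
    for (Path temp : tempFiles) {
      // Hypothetical naming rule for this sketch: drop the ".tmp" suffix.
      Path dest = finalDir.resolve(temp.getFileName().toString().replace(".tmp", ""));
      Files.move(temp, dest, StandardCopyOption.REPLACE_EXISTING);
      finalNames.add(dest);
    }
    return finalNames;
  }

  /** Writes one temp shard, finalizes it, and reports whether every emitted name exists. */
  static boolean demo() {
    try {
      Path tempDir = Files.createTempDirectory("sketch-temp");
      Path finalDir = Files.createTempDirectory("sketch-final");
      Path shard = tempDir.resolve("shard-00000.tmp");
      Files.write(shard, "hello".getBytes(StandardCharsets.UTF_8));
      List<Path> emitted = finalizeThenEmit(Collections.singletonList(shard), finalDir);
      for (Path p : emitted) {
        if (!Files.exists(p)) {
          return false;
        }
      }
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("all emitted files exist: " + demo());
  }
}
```

The reshuffle option preferred in the comment above addresses the same ordering problem at the pipeline level, and this sketch does not model it.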

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Reuven Lax
> Attachments: comparison.png
>
>
> While running filebased-io-test we found the Dataflow runner misbehaving. We run 
> tests using a single pipeline, and without a Reshuffle between writing and 
> reading, Dataflow jobs fail because the runner tries to access 
> files that have not been created yet. 
> The picture shows the difference in how the write is executed. On 
> the left is a working example with Reshuffle added, and on the right one 
> without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the cloud console; the job should fail.
> Now add Reshuffling to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and trigger the previously used Maven command to see it working in the 
> console.





[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269863#comment-16269863
 ] 

Eugene Kirpichov commented on BEAM-3030:


Fix in https://github.com/apache/beam/pull/4190

> watchForNewFiles() can emit a file multiple times if it's growing
> -
>
> Key: BEAM-3030
> URL: https://issues.apache.org/jira/browse/BEAM-3030
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.3.0
>
>
> TextIO and AvroIO watchForNewFiles(), as well as 
> FileIO.match().continuously(), use Watch transform under the hood, and watch 
> the set of Metadata matching a filepattern.
> Two Metadata's with the same filename but different size are not considered 
> equal, so if these transforms observe the same file multiple times with 
> different sizes, they'll read the file multiple times.
> This is likely not yet a problem for production users, because these features 
> require SDF, it's supported only in Dataflow runner, and users of the 
> Dataflow runner are likely to use only files on GCS which doesn't support 
> appends. However, this needs to be fixed still.
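
[Editorial illustration] The equality problem described above can be reproduced with a few lines of plain Java. This is a sketch under stated assumptions: FileMeta is a hypothetical stand-in for a metadata value whose equality covers both name and size, and keying by file name is just one possible way to avoid the duplicate. It is not necessarily what the fix for this issue does.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class WatchDedupSketch {

  /** Hypothetical stand-in for file metadata: equality covers name AND size. */
  static final class FileMeta {
    final String name;
    final long sizeBytes;

    FileMeta(String name, long sizeBytes) {
      this.name = name;
      this.sizeBytes = sizeBytes;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof FileMeta)) {
        return false;
      }
      FileMeta other = (FileMeta) o;
      return name.equals(other.name) && sizeBytes == other.sizeBytes;
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, sizeBytes);
    }
  }

  /** Counts emissions when "already seen" is decided on the whole metadata value. */
  static int emissionsKeyedByMetadata(List<FileMeta> observations) {
    Set<FileMeta> seen = new HashSet<>();
    int emitted = 0;
    for (FileMeta m : observations) {
      if (seen.add(m)) {
        emitted++;
      }
    }
    return emitted;
  }

  /** Counts emissions when "already seen" is decided on the file name only. */
  static int emissionsKeyedByName(List<FileMeta> observations) {
    Set<String> seen = new HashSet<>();
    int emitted = 0;
    for (FileMeta m : observations) {
      if (seen.add(m.name)) {
        emitted++;
      }
    }
    return emitted;
  }

  /** Two polls see the same file at different sizes, plus one unrelated file. */
  static List<FileMeta> growingFileObservations() {
    return Arrays.asList(
        new FileMeta("logs/a.txt", 100L),
        new FileMeta("logs/a.txt", 250L),
        new FileMeta("logs/b.txt", 10L));
  }

  public static void main(String[] args) {
    List<FileMeta> obs = growingFileObservations();
    System.out.println("keyed by metadata: " + emissionsKeyedByMetadata(obs));
    System.out.println("keyed by name:     " + emissionsKeyedByName(obs));
  }
}
```

With the sample observations, the growing file is emitted twice when the whole metadata value is the key and once when only the name is the key.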





[beam] branch master updated: Updating dataflow API version to newer release.

2017-11-28 Thread bchambers
This is an automated email from the ASF dual-hosted git repository.

bchambers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f8d8ff1  Updating dataflow API version to newer release.
f8d8ff1 is described below

commit f8d8ff14c49e4dfb15541f4b73aa66513c9a9d23
Author: Pablo 
AuthorDate: Mon Nov 27 12:59:55 2017 -0800

Updating dataflow API version to newer release.
---
 build.gradle | 2 +-
 pom.xml  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.gradle b/build.gradle
index d4181f4..7b25227 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,7 @@ ext.library = [
 google_api_services_bigquery: 
"com.google.apis:google-api-services-bigquery:v2-rev355-$google_clients_version",
 google_api_services_clouddebugger: 
"com.google.apis:google-api-services-clouddebugger:v2-rev8-$google_clients_version",
 google_api_services_cloudresourcemanager: 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev6-$google_clients_version",
-google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev218-$google_clients_version",
+google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev221-$google_clients_version",
 google_api_services_pubsub: 
"com.google.apis:google-api-services-pubsub:v1-rev10-$google_clients_version",
 google_api_services_storage: 
"com.google.apis:google-api-services-storage:v1-rev71-$google_clients_version",
 google_auth_library_credentials: 
"com.google.auth:google-auth-library-credentials:$google_auth_version",
diff --git a/pom.xml b/pom.xml
index a071d36..efddbeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
 v1-rev6-1.22.0
 0.1.18
 v2-rev8-1.22.0
-v1b3-rev218-1.22.0
+v1b3-rev221-1.22.0
 0.5.160222
 1.4.0
 1.3.0



[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269819#comment-16269819
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153664972
 
 

 ##
 File path: sdks/python/apache_beam/testing/util.py
 ##
 @@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
 
 Review comment:
   done.




> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>






[jira] [Commented] (BEAM-2524) Update Google Cloud Console URL returned by DataflowRunner to support regions.

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269818#comment-16269818
 ] 

ASF GitHub Bot commented on BEAM-2524:
--

tgroh commented on a change in pull request #4144: [BEAM-2524] Update the 
gcloud cancel command to include the --region flag.
URL: https://github.com/apache/beam/pull/4144#discussion_r153664913
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
 ##
 @@ -214,8 +214,8 @@ public static String 
getGcloudCancelCommand(DataflowPipelineOptions options, Str
 }
 
 // Assemble cancel command from optional prefix and project/job parameters.
-return String.format("%s%s jobs --project=%s cancel %s",
-dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, 
options.getProject(), jobId);
+return String.format("%s%s jobs --project=%s cancel %s --region=%s",
 
 Review comment:
   I'm generically confused about this ordering; is the region part of the 
cancel command, or can it be in the same location as the `project` flag (which 
is preferable to me)
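
[Editorial illustration] The two flag placements discussed in this review comment can be compared side by side with a small standalone Java sketch. The constant value "gcloud beta dataflow" and the argument lists are assumptions, since the diff above shows only the format string; the class and method names are hypothetical. The sketch only shows the strings each ordering produces and does not establish which placements gcloud accepts.

```java
public class CancelCommandSketch {

  // Assumed value: the diff references GCLOUD_DATAFLOW_PREFIX without showing it.
  static final String GCLOUD_DATAFLOW_PREFIX = "gcloud beta dataflow";

  /** Region flag after the job id, matching the format string in the diff. */
  static String regionAfterCancel(
      String overridePrefix, String project, String jobId, String region) {
    return String.format(
        "%s%s jobs --project=%s cancel %s --region=%s",
        overridePrefix, GCLOUD_DATAFLOW_PREFIX, project, jobId, region);
  }

  /** Region flag next to the project flag, the ordering the reviewer asks about. */
  static String regionNextToProject(
      String overridePrefix, String project, String jobId, String region) {
    return String.format(
        "%s%s jobs --project=%s --region=%s cancel %s",
        overridePrefix, GCLOUD_DATAFLOW_PREFIX, project, region, jobId);
  }

  public static void main(String[] args) {
    System.out.println(regionAfterCancel("", "my-project", "job-123", "us-central1"));
    System.out.println(regionNextToProject("", "my-project", "job-123", "us-central1"));
  }
}
```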




> Update Google Cloud Console URL returned by DataflowRunner to support regions.
> --
>
> Key: BEAM-2524
> URL: https://issues.apache.org/jira/browse/BEAM-2524
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Robert Burke
>Assignee: Robert Burke
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Both the Java and Python Dataflow Runners need to be updated with a 
> regionalized form of the Google Cloud Console URL to support multiple 
> Dataflow Regions.
> The new URL format will be:
> https://console.cloud.corp.google.com/dataflow/jobsDetail/locations//jobs/?project=





[jira] [Commented] (BEAM-2524) Update Google Cloud Console URL returned by DataflowRunner to support regions.

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269814#comment-16269814
 ] 

ASF GitHub Bot commented on BEAM-2524:
--

lostluck commented on issue #4144: [BEAM-2524] Update the gcloud cancel command 
to include the --region flag.
URL: https://github.com/apache/beam/pull/4144#issuecomment-347711694
 
 
   @tgroh has reviewed my runner changes previously, sorry for lagging on 
fixing the pre-commit tests, I've had other priorities to handle.




> Update Google Cloud Console URL returned by DataflowRunner to support regions.
> --
>
> Key: BEAM-2524
> URL: https://issues.apache.org/jira/browse/BEAM-2524
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Robert Burke
>Assignee: Robert Burke
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Both the Java and Python Dataflow Runners need to be updated with a 
> regionalized form of the Google Cloud Console URL to support multiple 
> Dataflow Regions.
> The new URL format will be:
> https://console.cloud.corp.google.com/dataflow/jobsDetail/locations//jobs/?project=





[jira] [Commented] (BEAM-3008) BigtableIO should use ValueProviders

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269811#comment-16269811
 ] 

ASF GitHub Bot commented on BEAM-3008:
--

chamikaramj commented on issue #4171: [BEAM-3008] Extends API for BigtableIO 
Read and Write by adding withInstanceId  and withProjectId 
URL: https://github.com/apache/beam/pull/4171#issuecomment-347711204
 
 
   Thanks. LGTM.
   
   I'll merge.




> BigtableIO should use ValueProviders 
> -
>
> Key: BEAM-3008
> URL: https://issues.apache.org/jira/browse/BEAM-3008
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-gcp
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>
> [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO 
> templatization.  This Issue is a request to get a fully featured template for 
> BigtableIO.





[jira] [Commented] (BEAM-2345) Version configuration of plugins / dependencies in root pom.xml is inconsistent

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269803#comment-16269803
 ] 

ASF GitHub Bot commented on BEAM-2345:
--

dhalperi commented on issue #3205: [BEAM-2345] Make versioning in root pom 
consistent.
URL: https://github.com/apache/beam/pull/3205#issuecomment-344022571
 
 
   
   
   
   
   Reviewed 1 of 1 files at r2.
   Review status: all files reviewed at latest revision, all discussions 
resolved, some commit checks failed.
   
   ---
   
   
   
   *Comments from 
[Reviewable](https://reviewable.io:443/reviews/apache/beam/3205)*
   
   




> Version configuration of plugins / dependencies in root pom.xml is 
> inconsistent
> ---
>
> Key: BEAM-2345
> URL: https://issues.apache.org/jira/browse/BEAM-2345
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Jason Kuster
>Assignee: Jason Kuster
>Priority: Minor
> Fix For: 2.3.0
>
>
> Versioning in root pom.xml in some places is controlled by the properties 
> section, sometimes is just inline. Move all versioning of plugins / 
> dependencies to properties section.





[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269802#comment-16269802
 ] 

Eugene Kirpichov commented on BEAM-3030:


This also happens in FileIOTest: 
https://builds.apache.org/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-direct-java/5317/testReport/junit/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/

> watchForNewFiles() can emit a file multiple times if it's growing
> -
>
> Key: BEAM-3030
> URL: https://issues.apache.org/jira/browse/BEAM-3030
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.3.0
>
>
> TextIO and AvroIO watchForNewFiles(), as well as 
> FileIO.match().continuously(), use Watch transform under the hood, and watch 
> the set of Metadata matching a filepattern.
> Two Metadata's with the same filename but different size are not considered 
> equal, so if these transforms observe the same file multiple times with 
> different sizes, they'll read the file multiple times.
> This is likely not yet a problem for production users, because these features 
> require SDF, it's supported only in Dataflow runner, and users of the 
> Dataflow runner are likely to use only files on GCS which doesn't support 
> appends. However, this needs to be fixed still.





[jira] [Commented] (BEAM-3182) [Nexmark][SQL] Implement supported queries

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269779#comment-16269779
 ] 

ASF GitHub Bot commented on BEAM-3182:
--

akedin commented on issue #4129: [BEAM-3182][Nexmark][SQL] Implement query 1
URL: https://github.com/apache/beam/pull/4129#issuecomment-347707122
 
 
   @aaltay next step is to finish the [previous 
PR](https://github.com/apache/beam/pull/4128). After that this one can be 
rebased and merged as well. 
   
   Probably this PR  can be reviewed independently, it only has 1 commit 
related to it, other commits are from the previous PR.




> [Nexmark][SQL] Implement supported queries
> --
>
> Key: BEAM-3182
> URL: https://issues.apache.org/jira/browse/BEAM-3182
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Anton Kedin
>
> Implement all queries which can be run with current SQL features and Nexmark 
> infrastructure.





Jenkins build became unstable: beam_PostCommit_Java_ValidatesRunner_Dataflow #4430

2017-11-28 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-3261) Apex runner does not detect pipeline failure

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269755#comment-16269755
 ] 

Eugene Kirpichov commented on BEAM-3261:


It looks like a separate issue to me. I filed 
https://issues.apache.org/jira/browse/BEAM-3269.

> Apex runner does not detect pipeline failure
> 
>
> Key: BEAM-3261
> URL: https://issues.apache.org/jira/browse/BEAM-3261
> Project: Beam
>  Issue Type: Bug
>  Components: runner-apex
>Reporter: Eugene Kirpichov
>Assignee: Thomas Weise
>Priority: Blocker
>
> I was looking at https://github.com/apache/beam/pull/4074/files and asked 
> myself "Don't we already have a ValidatesRunner test for this?"
> Turns out we do: 
> https://github.com/apache/beam/blob/3b79b6298e84711528b5ad1302200cb8acbac07e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java#L793
> I ran this test with TestApexRunner, and I observed the same exception as in 
> https://stackoverflow.com/questions/46982827/error-when-using-side-input-withsideinputs-method-not-accepting-kv-type-as-inp?noredirect=1#comment81040223_46982827
>  , however the test passed.
> It seems that it is passing because ApexRunnerResult.waitUntilFinish() 
> detects only PAssert assertion errors, but not any other errors: 
> https://github.com/apache/beam/blob/3b79b6298e84711528b5ad1302200cb8acbac07e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java#L62
> This seems very problematic, as it means that 1) the status of Apex 
> ValidatesRunner tests cannot be trusted: if they fail in any way other 
> than a PAssert error, the failure will go undetected, and 2) in general, a 
> user cannot trust a successful pipeline.run() from the Apex runner.
> For tests in particular, some other TestXRunner's guard against such a 
> failure mode by verifying not only that there were no assertion failures, but 
> also that all assertions succeeded - using metrics: e.g. 
> https://github.com/apache/beam/blob/3b79b6298e84711528b5ad1302200cb8acbac07e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java#L248
>  . Fixing this would be optimal, but meanwhile, the runner should at least 
> fail the pipeline in case of error.
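
The difference between the two checks described in the issue can be sketched in plain Java. This is not the code of ApexRunnerResult or TestDataflowRunner: `RunSummary`, `passesLenient` and `passesStrict` are hypothetical names, and the counters are assumed to come from whatever success and failure metrics a test runner exposes.

```java
public class AssertionVerificationSketch {
  /** Hypothetical summary of a finished test pipeline run. */
  static final class RunSummary {
    final boolean pipelineFailed;   // any error surfaced by the runner
    final long assertionsSucceeded; // assumed to be read from a success counter
    final long assertionsFailed;    // assumed to be read from a failure counter

    RunSummary(boolean pipelineFailed, long assertionsSucceeded, long assertionsFailed) {
      this.pipelineFailed = pipelineFailed;
      this.assertionsSucceeded = assertionsSucceeded;
      this.assertionsFailed = assertionsFailed;
    }
  }

  /** Lenient check: only looks for assertion failures, so other errors slip through. */
  static boolean passesLenient(RunSummary run) {
    return run.assertionsFailed == 0;
  }

  /** Strict check: no pipeline error, no failed assertion, and every expected assertion ran. */
  static boolean passesStrict(RunSummary run, long expectedAssertions) {
    return !run.pipelineFailed
        && run.assertionsFailed == 0
        && run.assertionsSucceeded == expectedAssertions;
  }

  public static void main(String[] args) {
    // The pipeline crashed (for example with a ClassCastException) before any assertion ran.
    RunSummary crashed = new RunSummary(true, 0, 0);
    System.out.println(passesLenient(crashed));   // prints true: the failure goes unnoticed
    System.out.println(passesStrict(crashed, 1)); // prints false
  }
}
```

The strict variant also catches a run in which the assertions silently never executed, since the success count then falls short of the expected count.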





[jira] [Created] (BEAM-3269) Occasional ClassCastException in ParDoTranslatorTest.testAssertionFailure

2017-11-28 Thread Eugene Kirpichov (JIRA)
Eugene Kirpichov created BEAM-3269:
--

 Summary: Occasional ClassCastException in 
ParDoTranslatorTest.testAssertionFailure
 Key: BEAM-3269
 URL: https://issues.apache.org/jira/browse/BEAM-3269
 Project: Beam
  Issue Type: Bug
  Components: runner-apex
Reporter: Eugene Kirpichov
Assignee: Thomas Weise


A postcommit build failed: 

https://builds.apache.org/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-apex/5326/testReport/junit/org.apache.beam.runners.apex.translation/ParDoTranslatorTest/testAssertionFailure/

because the following exception happened instead of the expected PAssert 
assertion failure exception:

{code}
85212 [1/PAssert$2/GroupGlobally/Create.Values:ApexReadUnboundedInputOperator] INFO  com.datatorrent.stram.StramLocalCluster  - container-10 msg: Stopped running due to an exception. java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
at org.apache.beam.runners.apex.translation.utils.ApexStreamTuple$ApexStreamTupleCoder.encode(ApexStreamTuple.java:179)
at org.apache.beam.runners.apex.translation.utils.ApexStreamTuple$ApexStreamTupleCoder.encode(ApexStreamTuple.java:152)
at org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec.toByteArray(CoderAdapterStreamCodec.java:61)
at com.datatorrent.stram.stream.BufferServerPublisher.put(BufferServerPublisher.java:140)
at com.datatorrent.api.DefaultOutputPort.emit(DefaultOutputPort.java:59)
at org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator.emitTuples(ApexReadUnboundedInputOperator.java:157)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:124)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
{code}

I wasn't able to reproduce this issue by running the test locally >100 times, 
but it looks alarming nevertheless.





[jira] [Commented] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269746#comment-16269746
 ] 

Chamikara Jayalath commented on BEAM-3268:
--

cc: [~jkff]

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Chamikara Jayalath
> Attachments: comparison.png
>
>
> While running filebased-io-test we found the dataflow-runner misbehaving. We run 
> the tests in a single pipeline, and without a Reshuffle between writing and 
> reading, the Dataflow jobs fail because the runner tries to access 
> files that have not been created yet. 
> The picture shows the difference in how the write is executed: on 
> the left is the working example with the Reshuffle added, and on the right 
> the one without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add a Reshuffle to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and rerun the Maven command above to see the job succeed in the console.





[jira] [Assigned] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Kamil Szewczyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamil Szewczyk reassigned BEAM-3268:


Assignee: Chamikara Jayalath  (was: Thomas Groh)

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Chamikara Jayalath
> Attachments: comparison.png
>
>
> While running filebased-io-test we found the dataflow-runner misbehaving. We run 
> the tests in a single pipeline, and without a Reshuffle between writing and 
> reading, the Dataflow jobs fail because the runner tries to access 
> files that have not been created yet. 
> The picture shows the difference in how the write is executed: on 
> the left is the working example with the Reshuffle added, and on the right 
> the one without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add a Reshuffle to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and rerun the Maven command above to see the job succeed in the console.





[jira] [Updated] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Kamil Szewczyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamil Szewczyk updated BEAM-3268:
-
Attachment: comparison.png

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Thomas Groh
> Attachments: comparison.png
>
>
> While running filebased-io-test we found the dataflow-runner misbehaving. We run 
> the tests in a single pipeline, and without a Reshuffle between writing and 
> reading, the Dataflow jobs fail because the runner tries to access 
> files that have not been created yet. 
> The picture shows the difference in how the write is executed: on 
> the left is the working example with the Reshuffle added, and on the right 
> the one without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add a Reshuffle to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and rerun the Maven command above to see the job succeed in the console.





[jira] [Created] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2017-11-28 Thread Kamil Szewczyk (JIRA)
Kamil Szewczyk created BEAM-3268:


 Summary: getPerDestinationOutputFilenames() is getting processed 
before write is finished on dataflow runner
 Key: BEAM-3268
 URL: https://issues.apache.org/jira/browse/BEAM-3268
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Affects Versions: 2.3.0
Reporter: Kamil Szewczyk
Assignee: Thomas Groh


While running filebased-io-test we found the dataflow-runner misbehaving. We run 
the tests in a single pipeline, and without a Reshuffle between writing and 
reading, the Dataflow jobs fail because the runner tries to access 
files that have not been created yet. 

The picture shows the difference in how the write is executed: on 
the left is the working example with the Reshuffle added, and on the right 
the one without it.
!comparison.png|thumbnail!

Steps to reproduce: substitute your-bucket-name with your valid bucket.

{code:java}
mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
-DintegrationTestPipelineOptions='["--runner=dataflow", 
"--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
{code}

Then look at the Cloud Console; the job should fail.
Now add a Reshuffle to 
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 as in the example.
{code:java}
.getPerDestinationOutputFilenames().apply(Values.create())
.apply(Reshuffle.viaRandomKey());

PCollection consolidatedHashcode = testFilenames
{code}

and rerun the Maven command above to see the job succeed in the console.






[jira] [Commented] (BEAM-2524) Update Google Cloud Console URL returned by DataflowRunner to support regions.

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269724#comment-16269724
 ] 

ASF GitHub Bot commented on BEAM-2524:
--

aaltay commented on issue #4144: [BEAM-2524] Update the gcloud cancel command 
to include the --region flag.
URL: https://github.com/apache/beam/pull/4144#issuecomment-347701692
 
 
   cc: @ThatRfernand any suggestions for reviewers?




> Update Google Cloud Console URL returned by DataflowRunner to support regions.
> --
>
> Key: BEAM-2524
> URL: https://issues.apache.org/jira/browse/BEAM-2524
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Robert Burke
>Assignee: Robert Burke
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Both the Java and Python Dataflow Runners need to be updated with a 
> regionalized form of the Google Cloud Console URL to support multiple 
> Dataflow Regions.
> The new URL format will be:
> https://console.cloud.corp.google.com/dataflow/jobsDetail/locations//jobs/?project=





[jira] [Commented] (BEAM-3060) Add performance tests for commonly used file-based I/O PTransforms

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269723#comment-16269723
 ] 

ASF GitHub Bot commented on BEAM-3060:
--

chamikaramj commented on issue #4189: [BEAM-3060] add TFRecordIOIT
URL: https://github.com/apache/beam/pull/4189#issuecomment-347701609
 
 
   R: @jkff can you take this ?




> Add performance tests for commonly used file-based I/O PTransforms
> --
>
> Key: BEAM-3060
> URL: https://issues.apache.org/jira/browse/BEAM-3060
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-core
>Reporter: Chamikara Jayalath
>Assignee: Szymon Nieradka
>
> We recently added a performance testing framework [1] that can be used to do 
> the following:
> (1) Execute Beam tests using PerfkitBenchmarker
> (2) Manage Kubernetes-based deployments of data stores.
> (3) Easily publish benchmark results. 
> I think it will be useful to add performance tests for commonly used 
> file-based I/O PTransforms using this framework. I suggest looking into 
> the following formats initially:
> (1) AvroIO
> (2) TextIO
> (3) Compressed text using TextIO
> (4) TFRecordIO
> It should be possible to run these tests easily for various Beam runners (Direct, 
> Dataflow, Flink, Spark, etc.) and file systems (GCS, local, HDFS, etc.).
> In the initial version, tests can be made manually triggerable for PRs 
> through Jenkins. Later, we could make some of these tests run periodically 
> and publish benchmark results (to BigQuery) through PerfkitBenchmarker.
> [1] https://beam.apache.org/documentation/io/testing/





[jira] [Commented] (BEAM-3247) Sample.any memory constraint

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269722#comment-16269722
 ] 

ASF GitHub Bot commented on BEAM-3247:
--

asfgit closed pull request #4175: [BEAM-3247] fix Sample.any performance
URL: https://github.com/apache/beam/pull/4175
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index f3bd07a27ac..bda30816f6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -27,12 +27,10 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * {@code PTransform}s for taking samples of the elements in a
@@ -57,10 +55,6 @@
* If limit is greater than or equal to the size of the input
* {@code PCollection}, then all the input's elements will be selected.
*
-   * All of the elements of the output {@code PCollection} should fit into
-   * main memory of a single worker machine.  This operation does not
-   * run in parallel.
-   *
* Example of use:
*  {@code
* PCollection input = ...;
@@ -149,11 +143,9 @@ private Any(long limit) {
 
 @Override
 public PCollection expand(PCollection in) {
-  PCollectionView iterableView = 
in.apply(View.asIterable());
-  return in.getPipeline()
-  .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-  .apply(ParDo.of(new SampleAnyDoFn<>(limit, 
iterableView)).withSideInputs(iterableView))
-  .setCoder(in.getCoder());
+  return in
+  .apply(Combine.globally(new SampleAnyCombineFn(limit)))
+  .apply(Flatten.iterables());
 }
 
 @Override
@@ -209,25 +201,45 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input 
PCollection.
+   * A {@link CombineFn} that combines into a {@link List} of up to limit 
elements.
*/
-  private static class SampleAnyDoFn extends DoFn {
-long limit;
-final PCollectionView iterableView;
+  private static class SampleAnyCombineFn extends CombineFn {
+private final long limit;
 
-public SampleAnyDoFn(long limit, PCollectionView 
iterableView) {
+private SampleAnyCombineFn(long limit) {
   this.limit = limit;
-  this.iterableView = iterableView;
 }
 
-@ProcessElement
-public void processElement(ProcessContext c) {
-  for (T i : c.sideInput(iterableView)) {
-if (limit-- <= 0) {
-  break;
+@Override
+public List createAccumulator() {
+  return new ArrayList<>();
+}
+
+@Override
+public List addInput(List accumulator, T input) {
+  if (accumulator.size() < limit) {
+accumulator.add(input);
+  }
+  return accumulator;
+}
+
+@Override
+public List mergeAccumulators(Iterable accumulators) {
+  List merged = new ArrayList<>();
+  for (List accumulator : accumulators) {
+for (T t : accumulator) {
+  merged.add(t);
+  if (merged.size() >= limit) {
+return merged;
+  }
 }
-c.output(i);
   }
+  return merged;
+}
+
+@Override
+public Iterable extractOutput(List accumulator) {
+  return accumulator;
 }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 80f361f1c8a..0677a7919b7 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -40,7 +40,12 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import 

[beam] branch master updated: [BEAM-3247] fix Sample.any performance

2017-11-28 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f309e4  [BEAM-3247] fix Sample.any performance
 new 1ef170e  This closes #4175: [BEAM-3247] fix Sample.any performance
6f309e4 is described below

commit 6f309e462e48cd2b928564b2d3ed70f53a26b76b
Author: Neville Li 
AuthorDate: Fri Nov 24 16:50:30 2017 -0500

[BEAM-3247] fix Sample.any performance
---
 .../org/apache/beam/sdk/transforms/Sample.java | 63 ++--
 .../org/apache/beam/sdk/transforms/SampleTest.java | 87 +-
 2 files changed, 125 insertions(+), 25 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index f3bd07a..2eb12d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -27,12 +28,10 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * {@code PTransform}s for taking samples of the elements in a
@@ -57,10 +56,6 @@ public class Sample {
* If limit is greater than or equal to the size of the input
* {@code PCollection}, then all the input's elements will be selected.
*
-   * All of the elements of the output {@code PCollection} should fit into
-   * main memory of a single worker machine.  This operation does not
-   * run in parallel.
-   *
* Example of use:
*  {@code
* PCollection input = ...;
@@ -149,11 +144,9 @@ public class Sample {
 
 @Override
 public PCollection expand(PCollection in) {
-  PCollectionView iterableView = 
in.apply(View.asIterable());
-  return in.getPipeline()
-  .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-  .apply(ParDo.of(new SampleAnyDoFn<>(limit, 
iterableView)).withSideInputs(iterableView))
-  .setCoder(in.getCoder());
+  return in
+  .apply(Combine.globally(new 
SampleAnyCombineFn(limit)).withoutDefaults())
+  .apply(Flatten.iterables());
 }
 
 @Override
@@ -209,25 +202,49 @@ public class Sample {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input 
PCollection.
+   * A {@link CombineFn} that combines into a {@link List} of up to limit 
elements.
*/
-  private static class SampleAnyDoFn extends DoFn {
-long limit;
-final PCollectionView iterableView;
+  private static class SampleAnyCombineFn extends CombineFn {
+private final long limit;
 
-public SampleAnyDoFn(long limit, PCollectionView 
iterableView) {
+private SampleAnyCombineFn(long limit) {
   this.limit = limit;
-  this.iterableView = iterableView;
 }
 
-@ProcessElement
-public void processElement(ProcessContext c) {
-  for (T i : c.sideInput(iterableView)) {
-if (limit-- <= 0) {
-  break;
+@Override
+public List createAccumulator() {
+  return new ArrayList((int) limit);
+}
+
+@Override
+public List addInput(List accumulator, T input) {
+  if (accumulator.size() < limit) {
+accumulator.add(input);
+  }
+  return accumulator;
+}
+
+@Override
+public List mergeAccumulators(Iterable accumulators) {
+  Iterator iter = accumulators.iterator();
+  if (!iter.hasNext()) {
+return createAccumulator();
+  }
+  List res = iter.next();
+  while (iter.hasNext()) {
+for (T t : iter.next()) {
+  res.add(t);
+  if (res.size() >= limit) {
+return res;
+  }
 }
-c.output(i);
   }
+  return res;
+}
+
+@Override
+public Iterable extractOutput(List accumulator) {
+  return accumulator;
 }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 80f361f..357f256 100644
--- 
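
The accumulator logic in the diff above can be exercised outside Beam with a plain-Java sketch. `BoundedSampleSketch` is a hypothetical class that mirrors the createAccumulator / addInput / mergeAccumulators steps without depending on Beam's CombineFn, and the generic type parameters are written out as an assumption, since they do not appear in the diff as archived. One deliberate difference: this sketch checks the bound before each add during merging, so a first accumulator that is already full is never grown past the limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BoundedSampleSketch<T> {
  private final long limit;

  public BoundedSampleSketch(long limit) {
    this.limit = limit;
  }

  /** Starts an empty partial sample. */
  public List<T> createAccumulator() {
    return new ArrayList<>();
  }

  /** Keeps the input only while the partial sample is below the limit. */
  public List<T> addInput(List<T> accumulator, T input) {
    if (accumulator.size() < limit) {
      accumulator.add(input);
    }
    return accumulator;
  }

  /** Merges partial samples into the first one, stopping once the limit is reached. */
  public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    Iterator<List<T>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return createAccumulator();
    }
    List<T> res = iter.next();
    while (iter.hasNext()) {
      for (T t : iter.next()) {
        if (res.size() >= limit) {
          return res;
        }
        res.add(t);
      }
    }
    return res;
  }

  public static void main(String[] args) {
    BoundedSampleSketch<Integer> sampler = new BoundedSampleSketch<>(3);

    // Two partial samples, as if built on two different workers.
    List<Integer> first = sampler.createAccumulator();
    first = sampler.addInput(first, 1);
    List<Integer> second = sampler.createAccumulator();
    for (int i = 5; i <= 8; i++) {
      second = sampler.addInput(second, i); // 8 is dropped: the limit is 3
    }

    System.out.println(sampler.mergeAccumulators(Arrays.asList(first, second))); // prints [1, 5, 6]
  }
}
```

Because each partial sample holds at most `limit` elements, no single worker has to materialise the whole input, which is the point of replacing the side-input approach with a combiner.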

[jira] [Commented] (BEAM-3060) Add performance tests for commonly used file-based I/O PTransforms

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269717#comment-16269717
 ] 

ASF GitHub Bot commented on BEAM-3060:
--

lgajowy opened a new pull request #4189: [BEAM-3060] add TFRecordIOIT
URL: https://github.com/apache/beam/pull/4189
 
 
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   ---
   
   Another test for the BEAM-3060 task. This one uses two pipelines (there seems to 
be no other way yet). I filed a JIRA issue for that: 
https://issues.apache.org/jira/browse/BEAM-3267
   
   @chamikaramj could you take a look?




> Add performance tests for commonly used file-based I/O PTransforms
> --
>
> Key: BEAM-3060
> URL: https://issues.apache.org/jira/browse/BEAM-3060
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-core
>Reporter: Chamikara Jayalath
>Assignee: Szymon Nieradka
>
> We recently added a performance testing framework [1] that can be used to do 
> the following:
> (1) Execute Beam tests using PerfkitBenchmarker
> (2) Manage Kubernetes-based deployments of data stores.
> (3) Easily publish benchmark results. 
> I think it will be useful to add performance tests for commonly used 
> file-based I/O PTransforms using this framework. I suggest looking into 
> the following formats initially:
> (1) AvroIO
> (2) TextIO
> (3) Compressed text using TextIO
> (4) TFRecordIO
> It should be possible to run these tests easily for various Beam runners (Direct, 
> Dataflow, Flink, Spark, etc.) and file systems (GCS, local, HDFS, etc.).
> In the initial version, tests can be made manually triggerable for PRs 
> through Jenkins. Later, we could make some of these tests run periodically 
> and publish benchmark results (to BigQuery) through PerfkitBenchmarker.
> [1] https://beam.apache.org/documentation/io/testing/





[jira] [Commented] (BEAM-2345) Version configuration of plugins / dependencies in root pom.xml is inconsistent

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269702#comment-16269702
 ] 

ASF GitHub Bot commented on BEAM-2345:
--

aaltay commented on issue #3205: [BEAM-2345] Make versioning in root pom 
consistent.
URL: https://github.com/apache/beam/pull/3205#issuecomment-347699072
 
 
   What are the next steps for this PR?




> Version configuration of plugins / dependencies in root pom.xml is 
> inconsistent
> ---
>
> Key: BEAM-2345
> URL: https://issues.apache.org/jira/browse/BEAM-2345
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Jason Kuster
>Assignee: Jason Kuster
>Priority: Minor
> Fix For: 2.3.0
>
>
> Versioning in the root pom.xml is controlled by the properties section in 
> some places and specified inline in others. Move all versioning of plugins / 
> dependencies to the properties section.





[jira] [Commented] (BEAM-2852) Add support for Kafka as source/sink on Nexmark

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269703#comment-16269703
 ] 

ASF GitHub Bot commented on BEAM-2852:
--

aaltay commented on issue #3937: [BEAM-2852] Nexmark Kafka source sink
URL: https://github.com/apache/beam/pull/3937#issuecomment-347699198
 
 
   @vectorijk any updates?




> Add support for Kafka as source/sink on Nexmark
> ---
>
> Key: BEAM-2852
> URL: https://issues.apache.org/jira/browse/BEAM-2852
> Project: Beam
>  Issue Type: Improvement
>  Components: testing
>Reporter: Ismaël Mejía
>Assignee: Kai Jiang
>Priority: Minor
>  Labels: newbie, nexmark, starter
>






[jira] [Commented] (BEAM-3142) Fix proto generation in Python 3

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269698#comment-16269698
 ] 

ASF GitHub Bot commented on BEAM-3142:
--

aaltay commented on issue #4077: [BEAM-3142][WIP] Apply futurize on gen_protos
URL: https://github.com/apache/beam/pull/4077#issuecomment-347698925
 
 
   Hey @holdenk, sorry I missed the question earlier. Do you still have the 
issue?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix proto generation in Python 3
> 
>
> Key: BEAM-3142
> URL: https://issues.apache.org/jira/browse/BEAM-3142
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: holdenk
>Assignee: Ahmet Altay
>
> The generated Python code uses relative imports; fix this so that it is 
> usable in Python 3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3164) Capture stderr logs during gen proto

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269695#comment-16269695
 ] 

ASF GitHub Bot commented on BEAM-3164:
--

aaltay commented on issue #4126: [BEAM-3164] Flush stderr on failures
URL: https://github.com/apache/beam/pull/4126#issuecomment-347698695
 
 
   R: @robertwb 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Capture stderr logs during gen proto
> 
>
> Key: BEAM-3164
> URL: https://issues.apache.org/jira/browse/BEAM-3164
> Project: Beam
>  Issue Type: Bug
>  Components: build-system, sdk-py-core
>Reporter: holdenk
>Assignee: Holden Karau
>
> Currently python PRs are failing with gen-proto failures, but these are 
> difficult to debug because we don't capture the information (see 
> https://builds.apache.org/job/beam_PreCommit_Python_MavenInstall/727/console 
> ).
> cc [~altay] [~robertwb]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3182) [Nexmark][SQL] Implement supported queries

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269694#comment-16269694
 ] 

ASF GitHub Bot commented on BEAM-3182:
--

aaltay commented on issue #4129: [BEAM-3182][Nexmark][SQL] Implement query 1
URL: https://github.com/apache/beam/pull/4129#issuecomment-347698445
 
 
   What is the next action for this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [Nexmark][SQL] Implement supported queries
> --
>
> Key: BEAM-3182
> URL: https://issues.apache.org/jira/browse/BEAM-3182
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Anton Kedin
>
> Implement all queries which can be run with current SQL features and Nexmark 
> infrastructure.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269680#comment-16269680
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653663
 
 

 ##
 File path: sdks/python/apache_beam/testing/util.py
 ##
 @@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Ideally one should be able to use any callable, e.g. hamcrest matchers, 
rather than have to implement a windowed variant of each. I misspoke about 
having a windowed_equals_to; we should have an `assert_that_windowed(pcoll, 
equal_to([WindowedValue(...), ...]))`, or `assert_that(pcoll, 
equal_to([WindowedValue(...), ...]), reify_windows=True)`.
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269683#comment-16269683
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653670
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > As mentioned, rename this ReshufflePerKey, and add a Reshuffle that 
appends then strips a random key (e.g. random.getrandbits(32))
   
   
   Done.
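
The "append then strip a random key" idea from the review comment can be illustrated outside of Beam. The following is only a plain-Python sketch, not the code from the pull request: `reshuffle_with_random_keys` and `key_bits` are hypothetical names, and a dictionary grouping stands in for the GroupByKey step.

```python
import random
from collections import defaultdict


def reshuffle_with_random_keys(elements, key_bits=32):
  """Pair each element with a random key, group by key, then strip the key.

  Hypothetical helper for illustration only; it is not part of Beam.
  """
  # Append: attach a random integer key to every element.
  keyed = [(random.getrandbits(key_bits), element) for element in elements]

  # Group: a dict of lists stands in for a GroupByKey over the random keys.
  groups = defaultdict(list)
  for key, element in keyed:
    groups[key].append(element)

  # Strip: drop the keys and flatten the groups back into plain elements.
  return [element for group in groups.values() for element in group]


result = reshuffle_with_random_keys(['a', 'b', 'c', 'd'])
```

The output holds the same elements as the input, but their order is not guaranteed, since it depends on the random keys.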


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269687#comment-16269687
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653668
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Just use AfterCount(1).
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269679#comment-16269679
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653661
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > A DoFn with nothing but a process method can be more simply implemented 
via beam.Map or beam.FlatMap.
   
   
   Done.
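
To make the suggestion concrete: a DoFn whose only method is `process` is really just a function, so the function can be handed to a flat-map step directly. The sketch below is plain Python rather than Beam code; `expand_iterable` and `flat_map` are hypothetical names, with `flat_map` standing in for what `beam.FlatMap` would do with the same function.

```python
from itertools import chain


def expand_iterable(element):
  """Turn one (key, iterable_of_values) pair into a list of (key, value)."""
  key, values = element
  return [(key, value) for value in values]


def flat_map(fn, elements):
  """Stand-in for a FlatMap step: apply fn to each element, then flatten."""
  return list(chain.from_iterable(fn(element) for element in elements))


grouped = [('k1', [1, 2]), ('k2', [3])]
expanded = flat_map(expand_iterable, grouped)
```

In a pipeline, the same function could presumably be passed as `beam.FlatMap(expand_iterable)` in place of `ParDo(ExpandIterableDoFn())`, which removes the wrapper class.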


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3174) Master python sdk seems broken with test_harness_override_present_in_dataflow_distributions on Py 2.7.6

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269678#comment-16269678
 ] 

ASF GitHub Bot commented on BEAM-3174:
--

aaltay commented on issue #4125: [BEAM-3174] If parse_version just gives us a 
regular tuple use that for formatting
URL: https://github.com/apache/beam/pull/4125#issuecomment-347698007
 
 
   Is this ready for review?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Master python sdk seems broken with 
> test_harness_override_present_in_dataflow_distributions on Py 2.7.6
> ---
>
> Key: BEAM-3174
> URL: https://issues.apache.org/jira/browse/BEAM-3174
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: holdenk
>Assignee: Ahmet Altay
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269685#comment-16269685
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653669
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
 
 Review comment:
   
   
   > **robertwb** wrote:
   > It should be able to infer.
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269689#comment-16269689
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653665
 
 

 ##
 File path: sdks/python/apache_beam/pvalue.py
 ##
 @@ -124,6 +124,11 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  # TODO(ehudm): Make this internal.
+  @windowing.setter
 
 Review comment:
   
   
   > **robertwb** wrote:
   > As discussed, remove this setter.
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269682#comment-16269682
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653664
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Rename this window_coder?
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269684#comment-16269684
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653671
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
+    pcoll_intermediate = (pcoll
+        | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+        | 'IdentityWindow' >> WindowInto(
+            IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+            trigger=TriggerForEveryElement(),
+            accumulation_mode=AccumulationMode.DISCARDING,
+            # TODO: timestamp_combiner=
+            )
+        | 'GroupByKey' >> GroupByKey()
+        | 'ExpandIterable' >> ParDo(ExpandIterableDoFn()))
+    pcoll_intermediate.windowing = windowing_saved
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Nit: I'd probably apply assign windowing as the very last thing rather 
than on this intermediate.
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>

[jira] [Commented] (BEAM-3076) support TIMESTAMP in BeamRecordType

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269676#comment-16269676
 ] 

ASF GitHub Bot commented on BEAM-3076:
--

aaltay commented on issue #4036: [BEAM-3076] support TIMESTAMP in BeamRecordType
URL: https://github.com/apache/beam/pull/4036#issuecomment-347697876
 
 
   Hey @XuMingmin, any updates on this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> support TIMESTAMP in BeamRecordType
> ---
>
> Key: BEAM-3076
> URL: https://issues.apache.org/jira/browse/BEAM-3076
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Shayang Zang
>Assignee: Shayang Zang
>Priority: Minor
>
> Data of the Timestamp type was mapped to Date.class and also shared 
> DateCoder() during BeamSql execution. We want it to be supported in 
> BeamRecordType as a stand-alone data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

