Build failed in Jenkins: beam_PerformanceTests_Python #644

2017-12-05 Thread Apache Jenkins Server
See 


Changes:

[wcn] Revert "Introduce a property to influence output location."

[wcn] Introduce a property to influence output location.

[kirpichov] enforce fixed sharding

[kirpichov] Merges Writer.openWindowed/Unwindowed and removes result of close()

[kirpichov] non-null window/pane in FileResult

[kirpichov] remove ShardAssignment

[kirpichov] consolidates windowed/unwindowed finalize fns somewhat

[kirpichov] Unifies windowed and unwindowed finalize.

[kirpichov] Refactors WriteFiles into sub-transforms

[kirpichov] Converts WriteFiles to AutoValue

[kirpichov] Makes checkstyle and findbugs happy

[kirpichov] Renames spilled back to unwritten

[kirpichov] Fixes tests

[kirpichov] Reintroduces dynamic sharding with windowed writes for bounded

[kirpichov] Adds a deduplication key to Watch, and uses it to handle growing 
files

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam6 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision df326cbd2b9eb3f48ba16458da55061af7998d1a (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f df326cbd2b9eb3f48ba16458da55061af7998d1a
Commit message: "This closes #4190: [BEAM-3030] Adds a deduplication key to 
Watch, and uses it to handle growing files in FileIO.match"
 > git rev-list fd8ad273061717d8d0140c7e905f9478ed0a9d6c # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins6636818159495431500.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins8787497610165193776.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins1269355828541315744.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe>=0.23 in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))

Build failed in Jenkins: beam_PerformanceTests_Spark #1088

2017-12-05 Thread Apache Jenkins Server
See 


Changes:

[wcn] Revert "Introduce a property to influence output location."

[wcn] Introduce a property to influence output location.

[kirpichov] enforce fixed sharding

[kirpichov] Merges Writer.openWindowed/Unwindowed and removes result of close()

[kirpichov] non-null window/pane in FileResult

[kirpichov] remove ShardAssignment

[kirpichov] consolidates windowed/unwindowed finalize fns somewhat

[kirpichov] Unifies windowed and unwindowed finalize.

[kirpichov] Refactors WriteFiles into sub-transforms

[kirpichov] Converts WriteFiles to AutoValue

[kirpichov] Makes checkstyle and findbugs happy

[kirpichov] Renames spilled back to unwritten

[kirpichov] Fixes tests

[kirpichov] Reintroduces dynamic sharding with windowed writes for bounded

[kirpichov] Adds a deduplication key to Watch, and uses it to handle growing 
files

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision df326cbd2b9eb3f48ba16458da55061af7998d1a (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f df326cbd2b9eb3f48ba16458da55061af7998d1a
Commit message: "This closes #4190: [BEAM-3030] Adds a deduplication key to 
Watch, and uses it to handle growing files in FileIO.match"
 > git rev-list fd8ad273061717d8d0140c7e905f9478ed0a9d6c # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins2557893741131265302.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins6026153501747707360.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins1110195438892032166.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))

[jira] [Commented] (BEAM-3040) Python precommit timed out after 150 minutes

2017-12-05 Thread Valentyn Tymofieiev (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279681#comment-16279681
 ] 

Valentyn Tymofieiev commented on BEAM-3040:
---

cc: [~robertwb]
Another recent error:
test_errors (apache_beam.runners.portability.universal_local_runner_test.UniversalLocalRunnerTest) ... Traceback (most recent call last):
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/portability/universal_local_runner.py", line 243, in run
    ).run_via_runner_api(self._pipeline_proto)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 204, in run_via_runner_api
    return self.run_stages(*self.create_stages(pipeline_proto))
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 670, in run_stages
    pcoll_buffers, safe_coders).process_bundle.metrics
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 763, in run_stage
    controller.control_handler.push(process_bundle)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 871, in push
    response = self.worker.do_instruction(request)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 126, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 144, in process_bundle
    processor.process_bundle(instruction_id)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 293, in process_bundle
    op.start()
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 233, in start
    self.output(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 156, in output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 87, in receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 371, in process
    self.dofn_receiver.receive(o)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 382, in receive
    self.process(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 390, in process
    self._reraise_augmented(exn)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 388, in process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 190, in invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 480, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 87, in receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/worker/operations.py", line 371, in process
    self.dofn_receiver.receive(o)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 382, in receive
    self.process(windowed_value)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 390, in process
    self._reraise_augmented(exn)
  File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_MavenInstall/src/sdks/python/apache_beam/runners/common.py", line 388, in 

[jira] [Commented] (BEAM-2750) Read whole files as one PCollection element each

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279615#comment-16279615
 ] 

ASF GitHub Bot commented on BEAM-2750:
--

jkff closed pull request #3717: [BEAM-2750][BEAM-2751] Implement WholeFileIO
URL: https://github.com/apache/beam/pull/3717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java
new file mode 100644
index 000..8560e052fe7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY;
+import static 
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+import static org.apache.beam.sdk.util.MimeTypes.BINARY;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for reading and writing files as {@link KV} pairs of 
+ * filename {@link String}s and byte arrays.
+ *
+ * <p>To read a {@link PCollection} of one or more files as {@link KV}s, use
+ * {@code WholeFileIO.read()} to instantiate a transform and use
+ * {@link WholeFileIO.Read#from(String)} to specify the path of the file(s) to 
+ * be read. Alternatively, if the filenames to be read are themselves in a 
+ * {@link PCollection}, apply {@link WholeFileIO#readAll()}.
+ *
+ * <p>Method {@link #read} returns a {@link PCollection} of 
+ * {@code KV<String, byte[]>}s, each corresponding to one file's filename and 
+ * contents.
+ *
+ * <p>The filepatterns are expanded only once.
+ *
+ * <p>Example 1: reading a file or filepattern (or file glob).
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // A Read of a local file (only runs locally):
+ * PCollection<KV<String, byte[]>> oneFile = p.apply(
+ *     WholeFileIO.read().from("/local/path/to/file.txt"));
+ *
+ * // A Read of local files in a directory (only runs locally):
+ * PCollection<KV<String, byte[]>> manyFiles = p.apply(
+ *     WholeFileIO.read().from("/local/path/to/files/*"));
+ *
+ * // A Read of local files in nested directories (only runs locally):
+ * PCollection<KV<String, byte[]>> manyFiles = p.apply(
+ *     WholeFileIO.read().from("/local/path/to/nested/dirs/**"));
+ * // ^ The KV's String corresponding to filename retains only the last term 
+ * //   of the file path (i.e. it retains the filename and ignores intermediate 

[jira] [Commented] (BEAM-607) Add DistributedLog IO

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279614#comment-16279614
 ] 

ASF GitHub Bot commented on BEAM-607:
-

jkff closed pull request #1464: BEAM-607: Add a bounded source for Apache 
DistributedLog (incubating)
URL: https://github.com/apache/beam/pull/1464
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/distributedlog/README.md 
b/sdks/java/io/distributedlog/README.md
new file mode 100644
index 000..c3b025821ca
--- /dev/null
+++ b/sdks/java/io/distributedlog/README.md
@@ -0,0 +1,24 @@
+
+
+# DistributedLog IO
+
+This library provides sources and sinks to make it possible to read and
+write Apache DistributedLog (incubating) streams in bounded and unbounded
+ways from Apache Beam pipelines.
diff --git a/sdks/java/io/distributedlog/pom.xml 
b/sdks/java/io/distributedlog/pom.xml
new file mode 100644
index 000..59bedce9e9c
--- /dev/null
+++ b/sdks/java/io/distributedlog/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.4.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-distributedlog</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: DistributedLog</name>
+  <description>Library to read DistributedLog streams.</description>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>findbugs-maven-plugin</artifactId>
+          <configuration>
+            <skip>true</skip>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>false</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>distributedlog-core_2.11</artifactId>
+      <version>0.3.51-RC1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
new file mode 100644
index 000..1a3cff559b3
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import 

[jira] [Commented] (BEAM-3297) StartBundle/FinishBundle should be called only if there are elements

2017-12-05 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279610#comment-16279610
 ] 

Kenneth Knowles commented on BEAM-3297:
---

BTW a bundle can have timers but no elements.

> StartBundle/FinishBundle should be called only if there are elements 
> -
>
> Key: BEAM-3297
> URL: https://issues.apache.org/jira/browse/BEAM-3297
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Minor
>
> I believe the semantics is lazy. Exec currently calls SB/FB eagerly 
> irrespective of whether any elements are emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3297) StartBundle/FinishBundle should be called only if there are elements

2017-12-05 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279603#comment-16279603
 ] 

Kenneth Knowles commented on BEAM-3297:
---

Empty bundles are permitted, and I think it is permissible for an SDK harness 
to call StartBundle and FinishBundle with no intervening elements. It is 
pointless, of course, and might have a cost.

> StartBundle/FinishBundle should be called only if there are elements 
> -
>
> Key: BEAM-3297
> URL: https://issues.apache.org/jira/browse/BEAM-3297
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Minor
>
> I believe the semantics is lazy. Exec currently calls SB/FB eagerly 
> irrespective of whether any elements are emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
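
For context, a minimal sketch of the bundle lifecycle under discussion, as it
looks in the Java SDK (the issue itself concerns the Go SDK's exec layer; this
DoFn and its names are illustrative, not code from the issue):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class BufferingFn extends DoFn<String, String> {
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    // A runner may start a bundle that ends up containing no elements
    // (or only timer firings), so per-bundle setup should stay cheap.
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // Under eager semantics this runs even for an empty bundle; the lazy
    // semantics proposed in this issue would skip it when no elements arrived.
    buffer.clear();
  }
}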


[jira] [Updated] (BEAM-3300) Portable flattens in Go SDK Harness

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde updated BEAM-3300:

Priority: Minor  (was: Major)

> Portable flattens in Go SDK Harness
> ---
>
> Key: BEAM-3300
> URL: https://issues.apache.org/jira/browse/BEAM-3300
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Minor
>  Labels: portability
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3309) Multi-bundle execution in Go runtime

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3309:
---

 Summary:  Multi-bundle execution in Go runtime
 Key: BEAM-3309
 URL: https://issues.apache.org/jira/browse/BEAM-3309
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


The prototype runtime code executes bundles serially for debuggability. It 
should parallelize bundle execution and re-use bundle descriptors (when 
supported by the runner).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3308) Improve Go exec runtime error handling

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3308:
---

 Summary: Improve Go exec runtime error handling
 Key: BEAM-3308
 URL: https://issues.apache.org/jira/browse/BEAM-3308
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Move the Go exec layer to unit failures instead of panics. We should perhaps 
also catch user-code panics to fail the bundle more gracefully.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3307) Harden Go harness runtime logging

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3307:
---

 Summary: Harden Go harness runtime logging
 Key: BEAM-3307
 URL: https://issues.apache.org/jira/browse/BEAM-3307
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Add buffering and bulk send. We should also flush logs on Critical messages 
and on harness.Main panic.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3306) Consider: Go coder registry

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3306:
---

 Summary: Consider: Go coder registry
 Key: BEAM-3306
 URL: https://issues.apache.org/jira/browse/BEAM-3306
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Add a coder registry to make it easier to override default coders. We may 
also allow otherwise un-encodable types, but that would require function 
analysis to depend on the registry.

If we're hardcoding support for proto/avro, then there may be little need for 
such a feature. Conversely, this may be how we implement such support.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-12-05 Thread Eugene Kirpichov (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov closed BEAM-3030.
--
Resolution: Fixed

> watchForNewFiles() can emit a file multiple times if it's growing
> -
>
> Key: BEAM-3030
> URL: https://issues.apache.org/jira/browse/BEAM-3030
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.3.0
>
>
> TextIO and AvroIO watchForNewFiles(), as well as 
> FileIO.match().continuously(), use the Watch transform under the hood, and 
> watch the set of Metadata matching a filepattern.
> Two Metadata entries with the same filename but different sizes are not 
> considered equal, so if these transforms observe the same file multiple 
> times with different sizes, they'll read the file multiple times.
> This is likely not yet a problem for production users: these features 
> require SDF, which is supported only in the Dataflow runner, and users of 
> the Dataflow runner are likely to use only files on GCS, which doesn't 
> support appends. However, this still needs to be fixed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
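
For reference, a sketch of the kind of pipeline this bug affects (the
filepattern, poll interval, and termination condition are illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WatchForNewFilesExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Continuously watch a filepattern for new files. Before this fix, a
    // file observed twice with different sizes could be read more than once.
    PCollection<String> lines =
        p.apply(TextIO.read()
            .from("gs://my-bucket/input/*.txt")
            .watchForNewFiles(
                Duration.standardSeconds(30),
                Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardHours(1))));
    p.run().waitUntilFinish();
  }
}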


[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279443#comment-16279443
 ] 

ASF GitHub Bot commented on BEAM-3030:
--

asfgit closed pull request #4190: [BEAM-3030] Adds a deduplication key to 
Watch, and uses it to handle growing files in FileIO.match
URL: https://github.com/apache/beam/pull/4190
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index a244c070129..4e7124af8a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -33,13 +33,17 @@
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -69,6 +73,11 @@
* By default, a filepattern matching no resources is treated according 
to {@link
* EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
* Match#withEmptyMatchTreatment}.
+   *
+   * Returned {@link MatchResult.Metadata} are deduplicated by filename. 
For example, if this
+   * transform observes a file with the same name several times with different 
metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time 
this file is observed,
+   * and will ignore future changes to this file.
*/
   public static Match match() {
 return new AutoValue_FileIO_Match.Builder()
@@ -317,13 +326,17 @@ public MatchAll continuously(
               "Match filepatterns",
               ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment())));
     } else {
-      res = input
-          .apply(
-              "Continuously match filepatterns",
-              Watch.growthOf(new MatchPollFn())
-                  .withPollInterval(getConfiguration().getWatchInterval())
-                  .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
-          .apply(Values.create());
+      res =
+          input
+              .apply(
+                  "Continuously match filepatterns",
+                  Watch.growthOf(
+                          Contextful.<PollFn<String, MatchResult.Metadata>>of(
+                              new MatchPollFn(), Requirements.empty()),
+                          new ExtractFilenameFn())
+                      .withPollInterval(getConfiguration().getWatchInterval())
+                      .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+              .apply(Values.create());
     }
     return res.apply(Reshuffle.viaRandomKey());
   }
@@ -346,7 +359,7 @@ public void process(ProcessContext c) throws Exception {
       }
     }
 
-    private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
       @Override
       public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
           throws Exception {
@@ -354,6 +367,14 @@ public void process(ProcessContext c) throws Exception {
             Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
       }
     }
+
+    private static class ExtractFilenameFn
+        implements SerializableFunction<MatchResult.Metadata, String> {
+      @Override
+      public String apply(MatchResult.Metadata input) {
+        return input.resourceId().toString();
+      }
+    }
   }
 
   /** Implementation of {@link #readMatches}. */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 75c2fe45b80..4b31ae71333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ 

[beam] branch master updated: Adds a deduplication key to Watch, and uses it to handle growing files in FileIO.match

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 38d94c8  Adds a deduplication key to Watch, and uses it to handle 
growing files in FileIO.match
 new df326cb  This closes #4190: [BEAM-3030] Adds a deduplication key to 
Watch, and uses it to handle growing files in FileIO.match
38d94c8 is described below

commit 38d94c8e3900a5e59ae8e3be772583da6cb2c8c6
Author: Eugene Kirpichov 
AuthorDate: Tue Nov 28 17:02:06 2017 -0800

Adds a deduplication key to Watch, and uses it to handle growing files in 
FileIO.match
---
 .../main/java/org/apache/beam/sdk/io/FileIO.java   |  37 +++-
 .../java/org/apache/beam/sdk/transforms/Watch.java | 204 ++---
 .../org/apache/beam/sdk/transforms/WatchTest.java  | 119 +---
 3 files changed, 259 insertions(+), 101 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index a244c07..4e7124a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -33,13 +33,17 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -69,6 +73,11 @@ public class FileIO {
* By default, a filepattern matching no resources is treated according 
to {@link
* EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
* Match#withEmptyMatchTreatment}.
+   *
+   * Returned {@link MatchResult.Metadata} are deduplicated by filename. 
For example, if this
+   * transform observes a file with the same name several times with different 
metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time 
this file is observed,
+   * and will ignore future changes to this file.
*/
   public static Match match() {
 return new AutoValue_FileIO_Match.Builder()
@@ -317,13 +326,17 @@ public class FileIO {
               "Match filepatterns",
               ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment())));
     } else {
-      res = input
-          .apply(
-              "Continuously match filepatterns",
-              Watch.growthOf(new MatchPollFn())
-                  .withPollInterval(getConfiguration().getWatchInterval())
-                  .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
-          .apply(Values.create());
+      res =
+          input
+              .apply(
+                  "Continuously match filepatterns",
+                  Watch.growthOf(
+                          Contextful.<PollFn<String, MatchResult.Metadata>>of(
+                              new MatchPollFn(), Requirements.empty()),
+                          new ExtractFilenameFn())
+                      .withPollInterval(getConfiguration().getWatchInterval())
+                      .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+              .apply(Values.create());
     }
     return res.apply(Reshuffle.viaRandomKey());
   }
@@ -346,7 +359,7 @@ public class FileIO {
       }
     }
 
-    private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
       @Override
       public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
           throws Exception {
@@ -354,6 +367,14 @@ public class FileIO {
             Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
       }
     }
+
+    private static class ExtractFilenameFn
+        implements SerializableFunction<MatchResult.Metadata, String> {
+      @Override
+      public String apply(MatchResult.Metadata input) {
+        return input.resourceId().toString();
+      }
+    }
   }
 
   /** Implementation of {@link 
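
A sketch of the user-facing API this commit changes (the filepattern and poll
interval are illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ContinuousMatchExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // After this commit, continuously matched metadata is deduplicated by
    // filename, so a file that keeps growing is emitted only the first time
    // it is observed rather than once per observed size.
    PCollection<MatchResult.Metadata> matches =
        p.apply(FileIO.match()
            .filepattern("gs://my-bucket/logs/*.json")
            .continuously(
                Duration.standardMinutes(1),
                Watch.Growth.<String>never()));
    p.run().waitUntilFinish();
  }
}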

[jira] [Created] (BEAM-3305) Consider: Go ViewFn support

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3305:
---

 Summary: Consider: Go ViewFn support
 Key: BEAM-3305
 URL: https://issues.apache.org/jira/browse/BEAM-3305
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Priority: Minor


We can add it as an optional field to beam.SideInput (see: pkg/beam/option.go). 
The execution side needs to support it as well. However, with the various side 
input forms, it's not clear how valuable such a feature would be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3304) Go triggering support

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3304:
---

 Summary: Go triggering support
 Key: BEAM-3304
 URL: https://issues.apache.org/jira/browse/BEAM-3304
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Add support for triggers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam] 01/13: enforce fixed sharding

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 90402e4f21dbbf60ed498a67a5dfc3e276dcb07c
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 15 18:06:39 2017 -0800

enforce fixed sharding
---
 sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 8328d7b..c99abce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -854,7 +854,8 @@ public class WriteFiles
 } else if (numShardsProvider != null) {
   fixedNumShards = numShardsProvider.get();
 } else {
-  fixedNumShards = null;
+  throw new IllegalStateException(
+  "When finalizing a windowed write, should have set fixed 
sharding");
 }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam] branch master updated (b059664 -> 761ec1a)

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from b059664  Merge pull request #4218: Build output redirection option
 new 90402e4  enforce fixed sharding
 new 5795c32  Merges Writer.openWindowed/Unwindowed and removes result of 
close()
 new b2d0671  non-null window/pane in FileResult
 new 54eacf4  remove ShardAssignment
 new 97df5e7  consolidates windowed/unwindowed finalize fns somewhat
 new c615438  Unifies windowed and unwindowed finalize.
 new 2f73a95  Refactors WriteFiles into sub-transforms
 new 5b600da  Converts WriteFiles to AutoValue
 new 3ecf13b  Makes checkstyle and findbugs happy
 new 83837eb  Renames spilled back to unwritten
 new 060f05c  Fixes tests
 new d314339  Reintroduces dynamic sharding with windowed writes for 
bounded collections
 new 761ec1a  This closes #4145: Many simplifications to WriteFiles

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/examples/WindowedWordCount.java|5 +-
 .../examples/common/WriteOneFilePerWindow.java |   12 +-
 .../apache/beam/examples/WindowedWordCountIT.java  |8 +
 .../beam/runners/apex/examples/WordCountTest.java  |2 +-
 .../core/construction/WriteFilesTranslation.java   |7 +-
 .../construction/WriteFilesTranslationTest.java|   13 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java  |2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java|5 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |  281 +++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 1061 
 .../java/org/apache/beam/sdk/transforms/Reify.java |   73 +-
 .../apache/beam/sdk/values/TypeDescriptors.java|4 +
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |   66 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java |   27 +-
 14 files changed, 745 insertions(+), 821 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[beam] 11/13: Fixes tests

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 060f05c659920c3a48dbc67c38db770788802d06
Author: Eugene Kirpichov 
AuthorDate: Mon Nov 27 11:41:25 2017 -0800

Fixes tests
---
 .../core/construction/WriteFilesTranslation.java   |  2 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 32 -
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 17 +++--
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 42 --
 4 files changed, 55 insertions(+), 38 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index a6dd55c..90f6453 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -303,7 +303,7 @@ public class WriteFilesTranslation {
 public Map
 getTransformPayloadTranslators() {
   return Collections.singletonMap(
-  WriteFiles.class, new WriteFilesTranslator());
+  WriteFiles.CONCRETE_CLASS, new WriteFilesTranslator());
 }
 
 @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 12c4555..48d7521 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -699,7 +699,10 @@ public abstract class FileBasedSink
   // if set.
   Set missingShardNums;
   if (numShards == null) {
-missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+missingShardNums =
+existingResults.isEmpty()
+? ImmutableSet.of(UNKNOWN_SHARDNUM)
+: ImmutableSet.of();
   } else {
 missingShardNums = Sets.newHashSet();
 for (int i = 0; i < numShards; ++i) {
@@ -726,8 +729,9 @@ public abstract class FileBasedSink
   String uuid = UUID.randomUUID().toString();
   LOG.info("Opening empty writer {} for destination {}", uuid, dest);
   Writer writer = createWriter();
+  writer.setDestination(dest);
   // Currently this code path is only called in the unwindowed case.
-  writer.open(uuid, dest);
+  writer.open(uuid);
   writer.close();
   completeResults.add(
   new FileResult<>(
@@ -760,8 +764,8 @@ public abstract class FileBasedSink
 List> 
resultsToFinalFilenames) throws IOException {
   int numFiles = resultsToFinalFilenames.size();
   LOG.debug("Copying {} files.", numFiles);
-  List srcFiles = new 
ArrayList<>(resultsToFinalFilenames.size());
-  List dstFiles = new 
ArrayList<>(resultsToFinalFilenames.size());
+  List srcFiles = new ArrayList<>();
+  List dstFiles = new ArrayList<>();
   for (KV entry : 
resultsToFinalFilenames) {
 srcFiles.add(entry.getKey().getTempFilename());
 dstFiles.add(entry.getValue());
@@ -923,22 +927,14 @@ public abstract class FileBasedSink
 protected void finishWrite() throws Exception {}
 
 /**
- * Performs bundle initialization. For example, creates a temporary file 
for writing or
- * initializes any state that will be used across calls to {@link 
Writer#write}.
+ * Opens a uniquely named temporary file and initializes the writer using 
{@link #prepareWrite}.
  *
  * The unique id that is given to open should be used to ensure that 
the writer's output does
  * not interfere with the output of other Writers, as a bundle may be 
executed many times for
  * fault tolerance.
- *
- * The window and paneInfo arguments are populated when windowed writes 
are requested. shard
- * id populated for the case of static sharding. In cases where the runner 
is dynamically
- * picking sharding, shard might be set to -1.
  */
-public final void open(
-String uId, DestinationT destination)
-throws Exception {
+public final void open(String uId) throws Exception {
   this.id = uId;
-  this.destination = destination;
   ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
   outputFile = tempDirectory.resolve(id, 
StandardResolveOptions.RESOLVE_FILE);
   verifyNotNull(
@@ -1040,6 +1036,10 

[beam] 06/13: Unifies windowed and unwindowed finalize.

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6154382263a68d7eca893c7da3617d177e4c1df
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 15 20:19:09 2017 -0800

Unifies windowed and unwindowed finalize.
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 232 -
 .../java/org/apache/beam/sdk/transforms/Reify.java |  73 ++-
 .../apache/beam/sdk/values/TypeDescriptors.java|   4 +
 3 files changed, 163 insertions(+), 146 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 9cfabfe..87459e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -42,11 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -55,14 +55,13 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reify;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -653,6 +652,9 @@ public class WriteFiles
   .discardingFiredPanes());
 }
 
+final FileBasedSink.DynamicDestinations 
destinations =
+writeOperation.getSink().getDynamicDestinations();
+
 // Perform the per-bundle writes as a ParDo on the input PCollection (with 
the
 // WriteOperation as a side input) and collect the results of the writes 
in a
 // PCollection. There is a dependency between this ParDo and the first (the
@@ -663,19 +665,6 @@ public class WriteFiles
 List shardingSideInputs = numShardsView == null
 ? ImmutableList.of()
 : ImmutableList.of(numShardsView);
-SerializableFunction getFixedNumShards =
-new SerializableFunction() {
-  @Override
-  public Integer apply(DoFn.ProcessContext c) {
-if (numShardsView != null) {
-  return c.sideInput(numShardsView);
-} else if (numShardsProvider != null) {
-  return numShardsProvider.get();
-} else {
-  return null;
-}
-  }
-};
 
 @SuppressWarnings("unchecked")
 Coder shardedWindowCoder =
@@ -683,13 +672,12 @@ public class WriteFiles
 final Coder destinationCoder;
 try {
   destinationCoder =
-  sink.getDynamicDestinations()
-  
.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+  
destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
   destinationCoder.verifyDeterministic();
 } catch (CannotProvideCoderException | NonDeterministicException e) {
   throw new RuntimeException(e);
 }
-FileResultCoder fileResultCoder =
+final FileResultCoder fileResultCoder =
 FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
 PCollection results;
@@ -749,155 +737,109 @@ public class WriteFiles
 }
 results.setCoder(fileResultCoder);
 
-PCollection> outputFilenames;
+PCollection> fileResultBundles;
 if (windowedWrites) {
-  // We need to materialize the FileResult's before the renaming stage: 
this can be done either
-  // via a side input or via a GBK. However, when processing streaming 
windowed 

[beam] 03/13: non-null window/pane in FileResult

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b2d0671185fa1bd7f100853c7921e555c84578e7
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 15 18:25:14 2017 -0800

non-null window/pane in FileResult
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 10 --
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 38 ++
 2 files changed, 24 insertions(+), 24 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2108253..c8bdbfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -655,6 +655,7 @@ public abstract class FileBasedSink
 checkArgument(
 result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard 
number on %s", result);
 ResourceId finalFilename = result.getDestinationFile(
+windowedWrites,
 getSink().getDynamicDestinations(),
 effectiveNumShards,
 getSink().getWritableByteChannelFactory());
@@ -984,7 +985,7 @@ public abstract class FileBasedSink
   public static final class FileResult {
 private final ResourceId tempFilename;
 private final int shard;
-private final @Nullable BoundedWindow window;
+private final BoundedWindow window;
 private final PaneInfo paneInfo;
 private final DestinationT destination;
 
@@ -992,9 +993,11 @@ public abstract class FileBasedSink
 public FileResult(
 ResourceId tempFilename,
 int shard,
-@Nullable BoundedWindow window,
+BoundedWindow window,
 PaneInfo paneInfo,
 DestinationT destination) {
+  checkArgument(window != null);
+  checkArgument(paneInfo != null);
   this.tempFilename = tempFilename;
   this.shard = shard;
   this.window = window;
@@ -1029,13 +1032,14 @@ public abstract class FileBasedSink
 
 @Experimental(Kind.FILESYSTEM)
 public ResourceId getDestinationFile(
+boolean windowedWrites,
 DynamicDestinations dynamicDestinations,
 int numShards,
 OutputFileHints outputFileHints) {
   checkArgument(getShard() != UNKNOWN_SHARDNUM);
   checkArgument(numShards > 0);
   FilenamePolicy policy = 
dynamicDestinations.getFilenamePolicy(destination);
-  if (getWindow() != null) {
+  if (windowedWrites) {
 return policy.windowedFilename(
 getShard(), numShards, getWindow(), getPaneInfo(), 
outputFileHints);
   } else {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 35b28a1..19457e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -387,7 +388,6 @@ public class WriteFiles
   private class WriteBundles extends DoFn {
 private final TupleTag> unwrittenRecordsTag;
 private final Coder destinationCoder;
-private final boolean windowedWrites;
 
 // Initialized in startBundle()
 private @Nullable Map> writers;
@@ -395,10 +395,8 @@ public class WriteFiles
 private int spilledShardNum = UNKNOWN_SHARDNUM;
 
 WriteBundles(
-boolean windowedWrites,
 TupleTag> unwrittenRecordsTag,
 Coder destinationCoder) {
-  this.windowedWrites = windowedWrites;
   this.unwrittenRecordsTag = unwrittenRecordsTag;
   this.destinationCoder = destinationCoder;
 }
@@ -466,13 +464,11 @@ public class WriteFiles
   throw e;
 }
 BoundedWindow window = key.window;
-FileResult res =
-windowedWrites
-? new FileResult<>(
-writer.getOutputFile(), UNKNOWN_SHARDNUM, window, 
key.paneInfo, key.destination)
-: new FileResult<>(
-writer.getOutputFile(), 

[beam] 07/13: Refactors WriteFiles into sub-transforms

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2f73a9534a709dd1fe775ab31e0598d2d89f123c
Author: Eugene Kirpichov 
AuthorDate: Fri Nov 17 12:25:45 2017 -0800

Refactors WriteFiles into sub-transforms
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 630 +++--
 1 file changed, 322 insertions(+), 308 deletions(-)
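
For orientation, a hedged caller-side sketch of the transform being restructured
(the sub-transform split below is internal, so usage of this form is unchanged;
'sink' and 'input' are assumed to exist):

    // IO transforms such as TextIO normally wrap WriteFiles; applying it
    // directly looks like this. Void is the destination type for a sink
    // without dynamic destinations.
    WriteFilesResult<Void> result =
        input.apply(WriteFiles.to(sink).withNumShards(3));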

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 87459e9..0a538b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -37,7 +36,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -176,53 +175,12 @@ public class WriteFiles
 return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
-  @Override
-  public WriteFilesResult expand(PCollection input) {
-if (input.isBounded() == IsBounded.UNBOUNDED) {
-  checkArgument(windowedWrites,
-  "Must use windowed writes when applying %s to an unbounded 
PCollection",
-  WriteFiles.class.getSimpleName());
-}
-if (windowedWrites) {
-  // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-  // and similar behavior in other runners.
-  checkArgument(
-  computeNumShards != null || numShardsProvider != null,
-  "When using windowed writes, must specify number of output shards 
explicitly",
-  WriteFiles.class.getSimpleName());
-}
-this.writeOperation = sink.createWriteOperation();
-this.writeOperation.setWindowedWrites(windowedWrites);
-return createWrite(input);
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-super.populateDisplayData(builder);
-builder
-.add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles 
Sink"))
-.include("sink", sink);
-if (getSharding() != null) {
-  builder.include("sharding", getSharding());
-} else {
-  builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
-  .withLabel("Fixed Number of Shards"));
-}
-  }
-
   /** Returns the {@link FileBasedSink} associated with this PTransform. */
   public FileBasedSink getSink() {
 return sink;
   }
 
-  /**
-   * Returns whether or not to perform windowed writes.
-   */
+  /** Returns whether or not to perform windowed writes. */
   public boolean isWindowedWrites() {
 return windowedWrites;
   }
@@ -339,50 +297,189 @@ public class WriteFiles
 sink, computeNumShards, numShardsProvider, true, 
maxNumWritersPerBundle, sideInputs);
   }
 
-  private static class WriterKey {
-private final BoundedWindow window;
-private final PaneInfo paneInfo;
-private final DestinationT destination;
+  @Override
+  public void validate(PipelineOptions options) {
+sink.validate(options);
+  }
 
-WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT 
destination) {
-  this.window = window;
-  this.paneInfo = paneInfo;
-  this.destination = destination;
+  @Override
+  public WriteFilesResult expand(PCollection input) {
+if (input.isBounded() == IsBounded.UNBOUNDED) {
+  checkArgument(
+  windowedWrites,
+  "Must use windowed writes when applying %s to an unbounded 
PCollection",
+  WriteFiles.class.getSimpleName());
+}
+if (windowedWrites) {
+  // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+  // and similar 

[beam] 12/13: Reintroduces dynamic sharding with windowed writes for bounded collections

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d314339ed2f8d5ce385c7b40705ef13f6ea43b45
Author: Eugene Kirpichov 
AuthorDate: Thu Nov 30 13:05:00 2017 -0800

Reintroduces dynamic sharding with windowed writes for bounded collections
---
 .../apache/beam/examples/WindowedWordCount.java|  5 ++--
 .../examples/common/WriteOneFilePerWindow.java | 12 ++
 .../apache/beam/examples/WindowedWordCountIT.java  |  8 +++
 .../beam/runners/apex/examples/WordCountTest.java  |  2 +-
 .../construction/WriteFilesTranslationTest.java|  1 +
 .../beam/runners/spark/io/AvroPipelineTest.java|  5 ++--
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 27 +++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 27 +++---
 .../org/apache/beam/sdk/io/WriteFilesTest.java |  5 ++--
 9 files changed, 41 insertions(+), 51 deletions(-)
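
In effect, this commit lets a windowed write over a bounded collection omit the
shard count again; a hedged sketch (assuming a PCollection<String> named 'lines'
and standard Beam imports):

    PCollection<String> windowed =
        lines.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    // No withNumShards(...): sharding is runner-determined, which this
    // commit re-enables for windowed writes on bounded input.
    windowed.apply(TextIO.write().to("out").withWindowedWrites());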

diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java 
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 21cfed8..b31ce4a 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -162,9 +162,8 @@ public class WindowedWordCount {
 void setMaxTimestampMillis(Long value);
 
 @Description("Fixed number of shards to produce per window")
-@Default.Integer(3)
-int getNumShards();
-void setNumShards(int numShards);
+Integer getNumShards();
+void setNumShards(Integer numShards);
   }
 
   public static void main(String[] args) throws IOException {
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index a5c84f6..abd14b7 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
@@ -45,9 +46,10 @@ import org.joda.time.format.ISODateTimeFormat;
 public class WriteOneFilePerWindow extends PTransform {
   private static final DateTimeFormatter FORMATTER = 
ISODateTimeFormat.hourMinute();
   private String filenamePrefix;
-  private int numShards;
+  @Nullable
+  private Integer numShards;
 
-  public WriteOneFilePerWindow(String filenamePrefix, int numShards) {
+  public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
 this.filenamePrefix = filenamePrefix;
 this.numShards = numShards;
   }
@@ -59,8 +61,10 @@ public class WriteOneFilePerWindow extends 
PTransform

[beam] 04/13: remove ShardAssignment

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 54eacf4b79993f40b9034bf429e387faeffdbdba
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 15 18:58:28 2017 -0800

remove ShardAssignment
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 118 +
 1 file changed, 48 insertions(+), 70 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 19457e6..28ac1a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -478,19 +479,12 @@ public class WriteFiles
 }
   }
 
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
   /*
* Like {@link WriteBundles}, but where the elements for each shard have 
been collected into a
* single iterable.
*/
   private class WriteShardedBundles
   extends DoFn, 
FileResult> {
-ShardAssignment shardNumberAssignment;
-WriteShardedBundles(ShardAssignment shardNumberAssignment) {
-  this.shardNumberAssignment = shardNumberAssignment;
-}
-
 @ProcessElement
 public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
   sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -527,13 +521,8 @@ public class WriteFiles
   writer.cleanup();
   throw e;
 }
-int shardNumber =
-shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-? c.element().getKey().getShardNumber()
-: UNKNOWN_SHARDNUM;
-c.output(
-new FileResult<>(
-writer.getOutputFile(), shardNumber, window, c.pane(), 
entry.getKey()));
+int shard = c.element().getKey().getShardNumber();
+c.output(new FileResult<>(writer.getOutputFile(), shard, window, 
c.pane(), entry.getKey()));
   }
 }
 
@@ -672,8 +661,11 @@ public class WriteFiles
 // PCollection. There is a dependency between this ParDo and the first (the
 // WriteOperation PCollection as a side input), so this will happen after 
the
 // initial ParDo.
-PCollection results;
-final PCollectionView numShardsView;
+PCollectionView numShardsView =
+(computeNumShards == null) ? null : input.apply(computeNumShards);
+List shardingSideInputs = numShardsView == null
+? ImmutableList.of()
+: ImmutableList.of(numShardsView);
 @SuppressWarnings("unchecked")
 Coder shardedWindowCoder =
 (Coder) 
input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -686,74 +678,65 @@ public class WriteFiles
 } catch (CannotProvideCoderException | NonDeterministicException e) {
   throw new RuntimeException(e);
 }
+FileResultCoder fileResultCoder =
+FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
+PCollection results;
 if (computeNumShards == null && numShardsProvider == null) {
-  numShardsView = null;
   TupleTag writtenRecordsTag =
   new TupleTag<>("writtenRecordsTag");
-  TupleTag> unwrittedRecordsTag =
-  new TupleTag<>("unwrittenRecordsTag");
+  TupleTag> spilledRecordsTag =
+  new TupleTag<>("spilledRecordsTag");
   String writeName = windowedWrites ? "WriteWindowedBundles" : 
"WriteBundles";
   PCollectionTuple writeTuple =
   input.apply(
   writeName,
-  ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
+  ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
   .withSideInputs(sideInputs)
-  .withOutputTags(writtenRecordsTag, 
TupleTagList.of(unwrittedRecordsTag)));
+  .withOutputTags(writtenRecordsTag, 
TupleTagList.of(spilledRecordsTag)));
   PCollection writtenBundleFiles =
-  writeTuple
-  .get(writtenRecordsTag)
-  .setCoder(FileResultCoder.of(shardedWindowCoder, 
destinationCoder));
+  writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
   // Any "spilled" elements are written using 

[beam] 05/13: consolidates windowed/unwindowed finalize fns somewhat

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 97df5e703d4a891ab63a40b46c4e87d7c373168b
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 15 19:51:10 2017 -0800

consolidates windowed/unwindowed finalize fns somewhat
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 138 +
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 221 ++---
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |  19 +-
 3 files changed, 171 insertions(+), 207 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c8bdbfc..5bc84be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -28,10 +28,12 @@ import static 
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,6 +41,7 @@ import java.io.Serializable;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -46,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -72,6 +76,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -103,10 +108,9 @@ import org.slf4j.LoggerFactory;
  *
  * In order to ensure fault-tolerance, a bundle may be executed multiple 
times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these 
executions will have its
- * result passed to the finalize method. Each call to {@link Writer#open} or 
{@link
- * Writer#openUnwindowed} is passed a unique bundle id when it is 
called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of 
identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#open} is 
passed a unique
+ * bundle id when it is called by the WriteFiles transform, so even 
redundant or retried
+ * bundles will have a unique way of identifying their output.
  *
  * The bundle id should be used to guarantee that a bundle's output is 
unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, 
the name of the file
@@ -447,7 +451,7 @@ public abstract class FileBasedSink
* written,
*
* 
-   *   {@link WriteOperation#finalize} is given a list of the temporary 
files containing the
+   *   {@link WriteOperation#finalizeDestination} is given a list of the 
temporary files containing the
*   output bundles.
*   During finalize, these temporary files are copied to final output 
locations and named
*   according to a file naming template.
@@ -577,17 +581,22 @@ public abstract class FileBasedSink
  * not be cleaned up. Note that {@link WriteFiles} does attempt clean up 
files if exceptions
  * are thrown, however there are still some scenarios where temporary 
files might be left.
  */
-public void removeTemporaryFiles(Set filenames) throws 
IOException {
+public void removeTemporaryFiles(Collection filenames) throws 
IOException {
   removeTemporaryFiles(filenames, !windowedWrites);
 }
 
 @Experimental(Kind.FILESYSTEM)
-protected final List> 
buildOutputFilenames(
+protected final List> 
finalizeDestination(
 @Nullable DestinationT dest,
 @Nullable BoundedWindow window,
 @Nullable Integer numShards,
-Iterable writerResults) {
-  for 

[beam] 02/13: Merges Writer.openWindowed/Unwindowed and removes result of close()

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5795c32c91a6de02b6731eacb5eef8ae55f069f5
Author: Eugene Kirpichov 
AuthorDate: Tue Oct 17 17:06:43 2017 -0700

Merges Writer.openWindowed/Unwindowed and removes result of close()
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 94 ++
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 69 +---
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 15 ++--
 3 files changed, 71 insertions(+), 107 deletions(-)
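
A hedged caller-side sketch of the merged lifecycle (variable names are
hypothetical, context assumed from the diff below): a single open() now serves
both windowed and unwindowed writes, and close() no longer returns a result.

    // 'writer' is assumed to be the FileBasedSink.Writer from this diff.
    writer.open(bundleId, destination);   // window/pane/shard no longer passed here
    for (String value : bundleElements) {
      writer.write(value);                // called for each value in the bundle
    }
    writer.close();                       // return value of close() removed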

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d4cb57d..2108253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
  *
  * In order to ensure fault-tolerance, a bundle may be executed multiple 
times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these 
executions will have its
- * result passed to the finalize method. Each call to {@link 
Writer#openWindowed} or {@link
+ * result passed to the finalize method. Each call to {@link Writer#open} or 
{@link
  * Writer#openUnwindowed} is passed a unique bundle id when it is 
called by the WriteFiles
  * transform, so even redundant or retried bundles will have a unique way of 
identifying their
  * output.
@@ -805,10 +805,7 @@ public abstract class FileBasedSink
 /** Unique id for this output bundle. */
 private @Nullable String id;
 
-private @Nullable BoundedWindow window;
-private @Nullable PaneInfo paneInfo;
-private int shard = -1;
-private @Nullable DestinationT destination;
+private DestinationT destination;
 
 /** The output file for this bundle. May be null if opening failed. */
 private @Nullable ResourceId outputFile;
@@ -868,53 +865,10 @@ public abstract class FileBasedSink
  * id populated for the case of static sharding. In cases where the runner 
is dynamically
  * picking sharding, shard might be set to -1.
  */
-public final void openWindowed(
-String uId, BoundedWindow window, PaneInfo paneInfo, int shard, 
DestinationT destination)
-throws Exception {
-  if (!getWriteOperation().windowedWrites) {
-throw new IllegalStateException("openWindowed called a non-windowed 
sink.");
-  }
-  open(uId, window, paneInfo, shard, destination);
-}
-
-/** Called for each value in the bundle. */
-public abstract void write(OutputT value) throws Exception;
-
-/**
- * Similar to {@link #openWindowed} however for the case where unwindowed 
writes were requested.
- */
-public final void openUnwindowed(String uId, int shard, DestinationT 
destination)
-throws Exception {
-  if (getWriteOperation().windowedWrites) {
-throw new IllegalStateException("openUnwindowed called a windowed 
sink.");
-  }
-  open(uId, null, null, shard, destination);
-}
-
-// Helper function to close a channel, on exception cases.
-// Always throws prior exception, with any new closing exception 
suppressed.
-private static void closeChannelAndThrow(
-WritableByteChannel channel, ResourceId filename, Exception prior) 
throws Exception {
-  try {
-channel.close();
-  } catch (Exception e) {
-LOG.error("Closing channel for {} failed.", filename, e);
-prior.addSuppressed(e);
-throw prior;
-  }
-}
-
-private void open(
-String uId,
-@Nullable BoundedWindow window,
-@Nullable PaneInfo paneInfo,
-int shard,
-DestinationT destination)
+public final void open(
+String uId, DestinationT destination)
 throws Exception {
   this.id = uId;
-  this.window = window;
-  this.paneInfo = paneInfo;
-  this.shard = shard;
   this.destination = destination;
   ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
   outputFile = tempDirectory.resolve(id, 
StandardResolveOptions.RESOLVE_FILE);
@@ -925,15 +879,6 @@ public abstract class FileBasedSink
   getWriteOperation().getSink().writableByteChannelFactory;
   // The factory may force a MIME type or it may return null, indicating 
to use the sink's MIME.
   String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-  LOG.info(
-  "Opening temporary file {} with MIME type {} "
-  + "to write destination {} shard {} window {} pane {}",
-  outputFile,
-  channelMimeType,
-  destination,
-  shard,
-  window,
- 

[beam] 08/13: Converts WriteFiles to AutoValue

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5b600da86a21e2c4339261d73fa1c2588cb3ab8d
Author: Eugene Kirpichov 
AuthorDate: Fri Nov 17 12:38:15 2017 -0800

Converts WriteFiles to AutoValue
---
 .../core/construction/WriteFilesTranslation.java   |   5 +-
 .../construction/WriteFilesTranslationTest.java|  12 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |   4 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 187 +
 .../org/apache/beam/sdk/io/WriteFilesTest.java |  22 +--
 5 files changed, 104 insertions(+), 126 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index d0b2182..a6dd55c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -85,12 +85,13 @@ public class WriteFilesTranslation {
 
   @Override
   public boolean isWindowedWrites() {
-return transform.isWindowedWrites();
+return transform.getWindowedWrites();
   }
 
   @Override
   public boolean isRunnerDeterminedSharding() {
-return transform.getNumShards() == null && transform.getSharding() 
== null;
+return transform.getNumShardsProvider() == null
+&& transform.getComputeNumShards() == null;
   }
 },
 components);
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index ccb366e..2d45681 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -80,9 +80,11 @@ public class WriteFilesTranslationTest {
 
   assertThat(
   payload.getRunnerDeterminedSharding(),
-  equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+  equalTo(
+  writeFiles.getNumShardsProvider() == null
+  && writeFiles.getComputeNumShards() == null));
 
-  assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.isWindowedWrites()));
+  assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.getWindowedWrites()));
 
   assertThat(
   (FileBasedSink)
@@ -102,11 +104,13 @@ public class WriteFilesTranslationTest {
 
   assertThat(
   WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
-  equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+  equalTo(
+  writeFiles.getNumShardsProvider() == null
+  && writeFiles.getComputeNumShards() == null));
 
   assertThat(
   WriteFilesTranslation.isWindowedWrites(appliedPTransform),
-  equalTo(writeFiles.isWindowedWrites()));
+  equalTo(writeFiles.getWindowedWrites()));
   assertThat(
   WriteFilesTranslation.getSink(appliedPTransform),
   equalTo(writeFiles.getSink()));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 5bc84be..20d2a27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -451,8 +451,8 @@ public abstract class FileBasedSink
* written,
*
* 
-   *   {@link WriteOperation#finalizeDestination} is given a list of the 
temporary files containing the
-   *   output bundles.
+   *   {@link WriteOperation#finalizeDestination} is given a list of the 
temporary files
+   *   containing the output bundles.
*   During finalize, these temporary files are copied to final output 
locations and named
*   according to a file naming template.
*   Finally, any temporary files that were created during the write are 
removed.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 0a538b1..d6c5788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ 

[beam] 09/13: Makes checkstyle and findbugs happy

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ecf13b9fc889f1978efef57f14f610766242901
Author: Eugene Kirpichov 
AuthorDate: Fri Nov 17 15:41:20 2017 -0800

Makes checkstyle and findbugs happy
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java  | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 8 
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 3467d53..edf513b 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1344,7 +1344,7 @@ public class DataflowRunnerTest implements Serializable {
 (WriteFiles)
 
factory.getReplacementTransform(originalApplication).getTransform();
 assertThat(replacement, not(equalTo((Object) original)));
-assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
+assertThat(replacement.getNumShardsProvider().get(), 
equalTo(expectedNumShards));
   }
 
   private static class TestSink extends FileBasedSink {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 20d2a27..12c4555 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -874,7 +874,7 @@ public abstract class FileBasedSink
 /** Unique id for this output bundle. */
 private @Nullable String id;
 
-private DestinationT destination;
+private @Nullable DestinationT destination;
 
 /** The output file for this bundle. May be null if opening failed. */
 private @Nullable ResourceId outputFile;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index d6c5788..7e04332 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -170,10 +170,10 @@ public abstract class WriteFiles
 FileBasedSink sink);
 
 abstract Builder setComputeNumShards(
-PTransform 
computeNumShards);
+@Nullable PTransform 
computeNumShards);
 
 abstract Builder setNumShardsProvider(
-ValueProvider numShardsProvider);
+@Nullable ValueProvider numShardsProvider);
 
 abstract Builder setWindowedWrites(boolean 
windowedWrites);
 
@@ -604,12 +604,12 @@ public abstract class WriteFiles
   extends PTransform> {
 private final Coder destinationCoder;
 private final Coder fileResultCoder;
-private final PCollectionView numShardsView;
+private final @Nullable PCollectionView numShardsView;
 
 private WriteShardedBundlesToTempFiles(
 Coder destinationCoder,
 Coder fileResultCoder,
-PCollectionView numShardsView) {
+@Nullable PCollectionView numShardsView) {
   this.destinationCoder = destinationCoder;
   this.fileResultCoder = fileResultCoder;
   this.numShardsView = numShardsView;

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam] 10/13: Renames spilled back to unwritten

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 83837eb20c1e1e4793f8410de1dc6d5864586f7f
Author: Eugene Kirpichov 
AuthorDate: Wed Nov 22 16:34:46 2017 -0800

Renames spilled back to unwritten
---
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java| 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e04332..12f5cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -403,22 +403,23 @@ public abstract class WriteFiles
 @Override
 public PCollection expand(PCollection 
input) {
   TupleTag writtenRecordsTag = new 
TupleTag<>("writtenRecords");
-  TupleTag> spilledRecordsTag = new 
TupleTag<>("spilledRecords");
+  TupleTag> unwrittenRecordsTag =
+  new TupleTag<>("unwrittenRecords");
   PCollectionTuple writeTuple =
   input.apply(
   "WriteUnshardedBundles",
   ParDo.of(
   new WriteUnshardedTempFilesWithSpillingFn(
-  spilledRecordsTag, destinationCoder))
+  unwrittenRecordsTag, destinationCoder))
   .withSideInputs(getSideInputs())
-  .withOutputTags(writtenRecordsTag, 
TupleTagList.of(spilledRecordsTag)));
+  .withOutputTags(writtenRecordsTag, 
TupleTagList.of(unwrittenRecordsTag)));
   PCollection writtenBundleFiles =
   writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
   // Any "spilled" elements are written using WriteShardedBundles. Assign 
shard numbers in
   // finalize to stay consistent with what WriteWindowedBundles does.
   PCollection writtenSpilledFiles =
   writeTuple
-  .get(spilledRecordsTag)
+  .get(unwrittenRecordsTag)
   .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), 
input.getCoder()))
   // Here we group by a synthetic shard number in the range [0, 
spill factor),
   // just for the sake of getting some parallelism within each 
destination when
@@ -426,9 +427,9 @@ public abstract class WriteFiles
   // number assigned at all. Drop the shard number on the spilled 
records so that
   // shard numbers are assigned together to both the spilled and 
non-spilled files in
   // finalize.
-  .apply("GroupSpilled", GroupByKey.create())
+  .apply("GroupUnwritten", GroupByKey.create())
   .apply(
-  "WriteSpilled",
+  "WriteUnwritten",
   ParDo.of(new 
WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
   .setCoder(fileResultCoder)
   .apply(

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam] 13/13: This closes #4145: Many simplifications to WriteFiles

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 761ec1af410f0ee153893f6e7082db85d0fdc3e7
Merge: b059664 d314339
Author: Eugene Kirpichov 
AuthorDate: Tue Dec 5 16:27:17 2017 -0800

This closes #4145: Many simplifications to WriteFiles

 .../apache/beam/examples/WindowedWordCount.java|5 +-
 .../examples/common/WriteOneFilePerWindow.java |   12 +-
 .../apache/beam/examples/WindowedWordCountIT.java  |8 +
 .../beam/runners/apex/examples/WordCountTest.java  |2 +-
 .../core/construction/WriteFilesTranslation.java   |7 +-
 .../construction/WriteFilesTranslationTest.java|   13 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java  |2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java|5 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |  281 +++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java| 1061 
 .../java/org/apache/beam/sdk/transforms/Reify.java |   73 +-
 .../apache/beam/sdk/values/TypeDescriptors.java|4 +
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |   66 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java |   27 +-
 14 files changed, 745 insertions(+), 821 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[jira] [Created] (BEAM-3303) Go windowing support

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3303:
---

 Summary: Go windowing support
 Key: BEAM-3303
 URL: https://issues.apache.org/jira/browse/BEAM-3303
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Add support for Window.into and windowing strategies on Node. Implement the 
various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.
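
For reference, the existing Java SDK form that BEAM-3303 asks the Go SDK to
support an analogue of (this is the Java API, not a proposed Go API):

    PCollection<String> windowed =
        input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));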



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam] 01/01: Merge pull request #4218: Build output redirection option

2017-12-05 Thread kenn
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b0596642561c386ceb4b28f3d83befd2757d63be
Merge: fd8ad27 5bc69dd
Author: Kenn Knowles 
AuthorDate: Tue Dec 5 16:25:49 2017 -0800

Merge pull request #4218: Build output redirection option

 pom.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)


-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[jira] [Created] (BEAM-3302) Go CoGBK support

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3302:
---

 Summary: Go CoGBK support
 Key: BEAM-3302
 URL: https://issues.apache.org/jira/browse/BEAM-3302
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Consider using the Java approach and implementing this as a union coder over GBK,
given that runners may not have native CoGBK support.
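
A hedged sketch of the Java approach referenced above (existing Java SDK API;
the tag and collection names are hypothetical): inputs are tagged, unioned per
key, and grouped into a CoGbkResult.

    TupleTag<Integer> ordersTag = new TupleTag<Integer>();
    TupleTag<String> usersTag = new TupleTag<String>();
    // orders : PCollection<KV<String, Integer>>, users : PCollection<KV<String, String>>
    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(ordersTag, orders)
            .and(usersTag, users)
            .apply(CoGroupByKey.<String>create());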



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: beam_PerformanceTests_Python #643

2017-12-05 Thread Apache Jenkins Server
See 


Changes:

[msaul] Added loose failure mode to allow individual VCF record reads to fail

[kirpichov] Fixes a bug in Sample.Any

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam7 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision fd8ad273061717d8d0140c7e905f9478ed0a9d6c (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f fd8ad273061717d8d0140c7e905f9478ed0a9d6c
Commit message: "This closes #4216: Fixes a bug in Sample.Any"
 > git rev-list 6474616ff1a2078f35cce60c9e6cd6991b95b3dd # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins7047280778716289921.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins2563229367503599425.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Python] $ /bin/bash -xe 
/tmp/jenkins2112765337655753221.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe>=0.23 in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: colorama; extra == "windows" in 
/usr/lib/python2.7/dist-packages (from colorlog[windows]==2.6.0->-r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: xmltodict in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests-ntlm>=0.3.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests>=2.9.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: ntlm-auth>=1.0.2 in 

[jira] [Created] (BEAM-3301) Go SplittableDoFn support

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3301:
---

 Summary: Go SplittableDoFn support
 Key: BEAM-3301
 URL: https://issues.apache.org/jira/browse/BEAM-3301
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


SDFs will be the only way to add streaming and liquid-sharded IO for Go.

Design doc: https://s.apache.org/splittable-do-fn



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3300) Portable flattens in Go SDK Harness

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3300:
---

 Summary: Portable flattens in Go SDK Harness
 Key: BEAM-3300
 URL: https://issues.apache.org/jira/browse/BEAM-3300
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3299) Go SDK support for portable progress reporting

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3299:
---

 Summary: Go SDK support for portable progress reporting
 Key: BEAM-3299
 URL: https://issues.apache.org/jira/browse/BEAM-3299
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-go
Reporter: Henning Rohde






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3299) Go SDK support for portable progress reporting

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde updated BEAM-3299:

Priority: Minor  (was: Major)

> Go SDK support for portable progress reporting
> --
>
> Key: BEAM-3299
> URL: https://issues.apache.org/jira/browse/BEAM-3299
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-go
>Reporter: Henning Rohde
>Priority: Minor
>  Labels: portability
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3298) Go SDK support for portable user state

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde updated BEAM-3298:

Labels: portability  (was: )

> Go SDK support for portable user state
> --
>
> Key: BEAM-3298
> URL: https://issues.apache.org/jira/browse/BEAM-3298
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-go
>Reporter: Henning Rohde
>Priority: Minor
>  Labels: portability
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3298) Go SDK support for portable user state

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde reassigned BEAM-3298:
---

Assignee: (was: Henning Rohde)

> Go SDK support for portable user state
> --
>
> Key: BEAM-3298
> URL: https://issues.apache.org/jira/browse/BEAM-3298
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-go
>Reporter: Henning Rohde
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3298) Go SDK support for portable user state

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3298:
---

 Summary: Go SDK support for portable user state
 Key: BEAM-3298
 URL: https://issues.apache.org/jira/browse/BEAM-3298
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: beam_PerformanceTests_Spark #1087

2017-12-05 Thread Apache Jenkins Server
See 


Changes:

[msaul] Added loose failure mode to allow individual VCF record reads to fail

[kirpichov] Fixes a bug in Sample.Any

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision fd8ad273061717d8d0140c7e905f9478ed0a9d6c (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f fd8ad273061717d8d0140c7e905f9478ed0a9d6c
Commit message: "This closes #4216: Fixes a bug in Sample.Any"
 > git rev-list 6474616ff1a2078f35cce60c9e6cd6991b95b3dd # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins6931077741878248247.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins6106531060924756386.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins2982872821786694055.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: colorama; extra == "windows" in 
/usr/lib/python2.7/dist-packages (from colorlog[windows]==2.6.0->-r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: xmltodict in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests-ntlm>=0.3.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests>=2.9.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: ntlm-auth>=1.0.2 in 

[jira] [Commented] (BEAM-413) Mean$CountSum tests for floating point equality

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279330#comment-16279330
 ] 

ASF GitHub Bot commented on BEAM-413:
-

huygaa11 opened a new pull request #4219: [BEAM-413] Created local annotation 
for floating point equality warning.
URL: https://github.com/apache/beam/pull/4219
 
 
   The equals method of CountSum is used in a test that makes sure that encoding 
and decoding a CountSum object results in an equal object. Since the 
coderDecodeEncodeEqual method below takes an object and does the equality check 
automatically, it is easier to keep the equals() method with the annotation than 
to remove it and write a custom check in place of coderDecodeEncodeEqual. 
   
   CoderProperties.coderDecodeEncodeEqual(coder, countSumObject);
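
A hedged sketch of the suppression approach (the PR introduces its own local
annotation, whose exact name may differ; @SuppressFBWarnings from the
findbugs-annotations artifact is the comparable off-the-shelf mechanism, and
CountSumLike is a hypothetical stand-in for Mean.CountSum):

    import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

    class CountSumLike {
      private final long count;
      private final double sum;

      CountSumLike(long count, double sum) {
        this.count = count;
        this.sum = sum;
      }

      @SuppressFBWarnings(
          value = "FE_FLOATING_POINT_EQUALITY",
          justification = "Coder round-trip tests require exact field equality")
      @Override
      public boolean equals(Object o) {
        if (!(o instanceof CountSumLike)) {
          return false;
        }
        CountSumLike that = (CountSumLike) o;
        return count == that.count && sum == that.sum; // exact comparison is intentional
      }

      @Override
      public int hashCode() {
        return java.util.Objects.hash(count, sum);
      }
    }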


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mean$CountSum tests for floating point equality
> ---
>
> Key: BEAM-413
> URL: https://issues.apache.org/jira/browse/BEAM-413
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Scott Wegner
>Assignee: Batkhuyag Batsaikhan
>Priority: Minor
>  Labels: findbugs, newbie, starter
>
> [FindBugs 
> FE_FLOATING_POINT_EQUALITY|https://github.com/apache/incubator-beam/blob/58a029a06aea1030279e5da8f9fa3114f456c1db/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml#L298]:
>  Test for floating point equality
> Applies to: 
> [Mean$CountSum.equals|https://github.com/apache/incubator-beam/blob/58a029a06aea1030279e5da8f9fa3114f456c1db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java#L165].
> This is a good starter bug. When fixing, please remove the corresponding 
> entries from 
> [findbugs-filter.xml|https://github.com/apache/incubator-beam/blob/master/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml]
>  and verify the build passes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is back to stable : beam_PostCommit_Java_ValidatesRunner_Spark #3631

2017-12-05 Thread Apache Jenkins Server
See 




[beam] branch master updated: Fixes a bug in Sample.Any

2017-12-05 Thread jkff
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f3f80d8  Fixes a bug in Sample.Any
 new fd8ad27  This closes #4216: Fixes a bug in Sample.Any
f3f80d8 is described below

commit f3f80d841897f4481fdb51609659d76c3fe7e550
Author: Eugene Kirpichov 
AuthorDate: Mon Dec 4 19:08:14 2017 -0800

Fixes a bug in Sample.Any

(I introduced the bug while merging a different PR...)
Also adds a test for the combine fn and exposes the fn.
Documents the difference between any() and fixedSize().
---
 .../java/org/apache/beam/sdk/testing/CombineFnTester.java |  2 +-
 .../main/java/org/apache/beam/sdk/transforms/Sample.java  | 15 +--
 .../java/org/apache/beam/sdk/transforms/SampleTest.java   | 15 +++
 3 files changed, 29 insertions(+), 3 deletions(-)
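
A hedged usage sketch of the any()/fixedSize() distinction this commit
documents (assuming a PCollection<String> named 'lines'):

    // Fast, but with no uniformity guarantee:
    PCollection<String> some = lines.apply(Sample.<String>any(100));
    // Uniformly random sample, emitted as a single Iterable:
    PCollection<Iterable<String>> uniform =
        lines.apply(Sample.<String>fixedSizeGlobally(100));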

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
index efd2af3..896d955 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
@@ -95,7 +95,7 @@ public class CombineFnTester {
   CombineFn fn,
   List> shards,
   Matcher matcher) {
-AccumT accumulator = null;
+AccumT accumulator = shards.isEmpty() ? fn.createAccumulator() : null;
 for (AccumT inputAccum : combineInputs(fn, shards)) {
   if (accumulator == null) {
 accumulator = inputAccum;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 2eb12d6..d7cba7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,17 +38,28 @@ import org.apache.beam.sdk.values.PCollection;
  * {@code PCollection}, or samples of the values associated with each
  * key in a {@code PCollection} of {@code KV}s.
  *
+ * {@link #fixedSizeGlobally(int)} and {@link #fixedSizePerKey(int)} compute 
uniformly random
+ * samples. {@link #any(long)} is faster, but provides no uniformity 
guarantees.
+ *
  * {@link #combineFn} can also be used manually, in combination with state 
and with the
  * {@link Combine} transform.
  */
 public class Sample {
 
-  /** Returns a {@link CombineFn} that computes a fixed-sized sample of its 
inputs. */
+  /** Returns a {@link CombineFn} that computes a fixed-sized uniform sample 
of its inputs. */
   public static  CombineFn combineFn(int sampleSize) {
 return new FixedSizedSampleFn<>(sampleSize);
   }
 
   /**
+   * Returns a {@link CombineFn} that computes a fixed-sized potentially 
non-uniform sample of its
+   * inputs.
+   */
+  public static  CombineFn anyCombineFn(int sampleSize) {
+return new SampleAnyCombineFn<>(sampleSize);
+  }
+
+  /**
* {@code Sample#any(long)} takes a {@code PCollection} and a limit, and
* produces a new {@code PCollection} containing up to limit
* elements of the input {@code PCollection}.
@@ -233,10 +244,10 @@ public class Sample {
   List res = iter.next();
   while (iter.hasNext()) {
 for (T t : iter.next()) {
-  res.add(t);
   if (res.size() >= limit) {
 return res;
   }
+  res.add(t);
 }
   }
   return res;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 357f256..ed6905d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -20,6 +20,9 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -37,6 +40,7 @@ import java.util.TreeSet;
 import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CombineFnTester;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -46,6 +50,7 @@ import 

[jira] [Commented] (BEAM-3297) StartBundle/FinishBundle should be called only if there are elements

2017-12-05 Thread Henning Rohde (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279249#comment-16279249
 ] 

Henning Rohde commented on BEAM-3297:
-

[~kenn] is my understanding correct here?

> StartBundle/FinishBundle should be called only if there are elements 
> -
>
> Key: BEAM-3297
> URL: https://issues.apache.org/jira/browse/BEAM-3297
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Minor
>
> I believe the semantics are lazy. Exec currently calls SB/FB eagerly, 
> irrespective of whether any elements are emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3297) StartBundle/FinishBundle should be called only if there are elements

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3297:
---

 Summary: StartBundle/FinishBundle should be called only if there 
are elements 
 Key: BEAM-3297
 URL: https://issues.apache.org/jira/browse/BEAM-3297
 Project: Beam
  Issue Type: Bug
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


I believe the semantics are lazy. Exec currently calls SB/FB eagerly, 
irrespective of whether any elements are emitted.
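To make the cost of eager semantics concrete, here is a minimal Java-style sketch (ExpensiveSink is a hypothetical client); under lazy semantics, an empty bundle would skip both calls:

{code:java}
class WriteFn extends DoFn<String, Void> {
  private transient ExpensiveSink sink;  // hypothetical per-bundle client

  @StartBundle
  public void startBundle() {
    // Eager semantics: the connection is opened even for empty bundles.
    sink = ExpensiveSink.connect();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    sink.write(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // Eager semantics: flushed and closed even when nothing was written.
    sink.flushAndClose();
  }
}
{code}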



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3295) Consider: make KV more convenient

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3295:
---

 Summary: Consider: make KV more convenient
 Key: BEAM-3295
 URL: https://issues.apache.org/jira/browse/BEAM-3295
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Priority: Minor


The KV design makes it implicit (and hence a second-class value). We currently 
need to shim a KV into a struct for certain operations that work without such 
a shim in Java (where KV is a first-class value). This is a tax on users. Maybe 
we should, say:

   (1) make utilities for pair predicates, etc., and have top.Largest and 
filter.Include accept KV input and multi-arity functions? 
   (2) automatically generate KV types implicitly, plus helpers to generate 
component-wise operations on such types? top.Largest would then have to be 
changed.
   (3) add nested KV in some cases, and either disallow runtime user 
manipulation (via beam.T, say) or allow it via a nestable func() (A, B). Less 
obvious is a good emitter form. 
   (4) something else? (or do nothing)

Approach 1 is essentially to embrace the second-class nature of KVs and make it 
more convenient to manage the different cases (such as in debug.Printf), whereas 
approach 2 is to coerce KVs into first-class values easily/on demand and add 
utilities to help work with these values. Option 3 would make KV more -- but 
not fully -- first class.
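For contrast, a minimal sketch of the first-class KV value in the Java SDK (this uses the real org.apache.beam.sdk.values.KV API):

{code:java}
import org.apache.beam.sdk.values.KV;

// In Java, KV is an ordinary value: constructed, passed around, and
// destructured directly, with no struct shim.
KV<String, Integer> pair = KV.of("apple", 3);
String key = pair.getKey();      // "apple"
Integer count = pair.getValue(); // 3
{code}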



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3294) Move to graph.External and remove Source/Sink

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3294:
---

 Summary: Move to graph.External and remove Source/Sink
 Key: BEAM-3294
 URL: https://issues.apache.org/jira/browse/BEAM-3294
 Project: Beam
  Issue Type: Task
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Cleanup: eliminate Source/Sink and promote beam.External to the graph level.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3293) Add lazy map side input form

2017-12-05 Thread Henning Rohde (JIRA)
Henning Rohde created BEAM-3293:
---

 Summary: Add lazy map side input form
 Key: BEAM-3293
 URL: https://issues.apache.org/jira/browse/BEAM-3293
 Project: Beam
  Issue Type: Improvement
  Components: sdk-go
Reporter: Henning Rohde
Assignee: Henning Rohde
Priority: Minor


Add InputKinds LazyMap and LazyMultiMap that allow map lookups without reading 
everything into memory. They will be accessed through functions such as:

func(k string) int
func(k string) []int
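For comparison, the Java SDK exposes map-shaped side inputs via View.asMap() and View.asMultimap(), with per-element lookups; whether the map is loaded lazily is runner-dependent. A minimal sketch using the Java APIs ({{kvs}} and {{keys}} are assumed PCollections):

{code:java}
final PCollectionView<Map<String, Integer>> mapView = kvs.apply(View.asMap());

PCollection<Integer> values = keys.apply(
    ParDo.of(new DoFn<String, Integer>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Look up the side-input map once per element.
        Map<String, Integer> m = c.sideInput(mapView);
        c.output(m.get(c.element()));
      }
    }).withSideInputs(mapView));
{code}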



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2083) Develop a Go SDK for Beam

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde updated BEAM-2083:

Description: 
Allow users of the Go programming language (https://golang.org/) to write Beam 
pipelines in this language. The effort is focusing on a full-fledged SDK that 
leverages the Beam Fn API to bootstrap a native Go experience.

Initial design:

https://s.apache.org/beam-go-sdk-design-rfc

Development in the go-sdk branch. Work in progress. YMMV.

  was:
Allow users of the Go programming language (https://golang.org/) to write Beam 
pipelines in this language. The effort is focusing on a full-fledged SDK that 
leverages the Beam Fn API to bootstrap a native Go experience.

Initial design:

https://s.apache.org/beam-go-sdk-design-rfc

Development in the go-sdk branch (pending PR#4200). Work in progress. YMMV.


> Develop a Go SDK for Beam
> -
>
> Key: BEAM-2083
> URL: https://issues.apache.org/jira/browse/BEAM-2083
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-go
>Reporter: Bill Neubauer
>Assignee: Henning Rohde
>
> Allow users of the Go programming language (https://golang.org/) to write 
> Beam pipelines in this language. The effort is focusing on a full-fledged SDK 
> that leverages the Beam Fn API to bootstrap a native Go experience.
> Initial design:
> https://s.apache.org/beam-go-sdk-design-rfc
> Development in the go-sdk branch. Work in progress. YMMV.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2083) Develop a Go SDK for Beam

2017-12-05 Thread Henning Rohde (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henning Rohde updated BEAM-2083:

Due Date: (was: 31/Oct/17)

> Develop a Go SDK for Beam
> -
>
> Key: BEAM-2083
> URL: https://issues.apache.org/jira/browse/BEAM-2083
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-go
>Reporter: Bill Neubauer
>Assignee: Henning Rohde
>
> Allow users of the Go programming language (https://golang.org/) to write 
> Beam pipelines in this language. The effort is focusing on a full-fledged SDK 
> that leverages the Beam Fn API to bootstrap a native Go experience.
> Initial design:
> https://s.apache.org/beam-go-sdk-design-rfc
> Development in the go-sdk branch (pending PR#4200). Work in progress. YMMV.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2774) Add I/O source for VCF files (python)

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279144#comment-16279144
 ] 

ASF GitHub Bot commented on BEAM-2774:
--

chamikaramj closed pull request #4157: [BEAM-2774] Added loose failure mode to 
allow individual VCF record reads to fail
URL: https://github.com/apache/beam/pull/4157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/io/vcfio.py 
b/sdks/python/apache_beam/io/vcfio.py
index b877a32d01b..80f4631e462 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -22,6 +22,8 @@
 
 from __future__ import absolute_import
 
+import logging
+import traceback
 from collections import namedtuple
 
 import vcf
@@ -33,8 +35,8 @@
 from apache_beam.io.textio import _TextSource as TextSource
 from apache_beam.transforms import PTransform
 
-__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo']
-
+__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo',
+   'MalformedVcfRecord']
 
 # Stores data about variant INFO fields. The type of 'data' is specified in the
 # VCF headers. 'field_count' is a string that specifies the number of fields
@@ -45,6 +47,10 @@
 #   - 'G': one value for each possible genotype.
 #   - 'R': one value for each possible allele (including the reference).
 VariantInfo = namedtuple('VariantInfo', ['data', 'field_count'])
+# Stores data about failed VCF record reads. `line` is the text line that
+# caused the failed read and `file_name` is the name of the file that the read
+# failed in.
+MalformedVcfRecord = namedtuple('MalformedVcfRecord', ['file_name', 'line'])
 MISSING_FIELD_VALUE = '.'  # Indicates field is missing in VCF record.
 PASS_FILTER = 'PASS'  # Indicates that all filters have been passed.
 END_INFO_KEY = 'END'  # The info key that explicitly specifies end of a record.
@@ -223,7 +229,8 @@ def __init__(self,
file_pattern,
compression_type=CompressionTypes.AUTO,
buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE,
-   validate=True):
+   validate=True,
+   allow_malformed_records=False):
 super(_VcfSource, self).__init__(file_pattern,
  compression_type=compression_type,
  validate=validate)
@@ -231,6 +238,7 @@ def __init__(self,
 self._header_lines_per_file = {}
 self._compression_type = compression_type
 self._buffer_size = buffer_size
+self._allow_malformed_records = allow_malformed_records
 
   def read_records(self, file_name, range_tracker):
 record_iterator = _VcfSource._VcfRecordIterator(
@@ -238,6 +246,7 @@ def read_records(self, file_name, range_tracker):
 range_tracker,
 self._pattern,
 self._compression_type,
+self._allow_malformed_records,
 buffer_size=self._buffer_size,
 skip_header_lines=0)
 
@@ -253,10 +262,12 @@ def __init__(self,
  range_tracker,
  file_pattern,
  compression_type,
+ allow_malformed_records,
  **kwargs):
   self._header_lines = []
   self._last_record = None
   self._file_name = file_name
+  self._allow_malformed_records = allow_malformed_records
 
   text_source = TextSource(
   file_pattern,
@@ -274,7 +285,9 @@ def __init__(self,
   try:
 self._vcf_reader = vcf.Reader(fsock=self._create_generator())
   except SyntaxError as e:
-        raise ValueError('Invalid VCF header %s' % str(e))
+        raise ValueError('An exception was raised when reading header from VCF '
+                         'file %s: %s' % (self._file_name,
+                                          traceback.format_exc(e)))
 
 def _store_header_lines(self, header_lines):
   self._header_lines = header_lines
@@ -301,7 +314,18 @@ def next(self):
 return self._convert_to_variant_record(record, self._vcf_reader.infos,
self._vcf_reader.formats)
   except (LookupError, ValueError) as e:
-        raise ValueError('Invalid record in VCF file. Error: %s' % str(e))
+        if self._allow_malformed_records:
+          logging.warning(
+              'An exception was raised when reading record from VCF file '
+              '%s. Invalid record was %s: %s',
+              self._file_name, self._last_record, traceback.format_exc(e))
+          return MalformedVcfRecord(self._file_name, self._last_record)
+
+raise ValueError('An exception was raised when reading record from VCF 
'
+ 'file %s. Invalid record was 

[beam] branch master updated (6474616 -> cf38ed3)

2017-12-05 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 6474616  Merge pull request #4085: Introduce a property to influence 
Maven output location.
 add 5792602  Added loose failure mode to allow individual VCF record reads 
to fail
 new cf38ed3  Merge pull request #4157 from mhsaul/vcf-record-loose-mode

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/io/vcfio.py  |  46 +--
 sdks/python/apache_beam/io/vcfio_test.py | 133 +++
 2 files changed, 119 insertions(+), 60 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[beam] 01/01: Merge pull request #4157 from mhsaul/vcf-record-loose-mode

2017-12-05 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cf38ed3fc8c03ecdb28c22ca32ae079b7f22d526
Merge: 6474616 5792602
Author: Chamikara Jayalath 
AuthorDate: Tue Dec 5 12:28:02 2017 -0800

Merge pull request #4157 from mhsaul/vcf-record-loose-mode

[BEAM-2774] Added loose failure mode to allow individual VCF record reads 
to fail

 sdks/python/apache_beam/io/vcfio.py  |  46 +--
 sdks/python/apache_beam/io/vcfio_test.py | 133 +++
 2 files changed, 119 insertions(+), 60 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[jira] [Commented] (BEAM-2721) Augment BeamRecordType to do slicing and concatenation.

2017-12-05 Thread Anton Kedin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279131#comment-16279131
 ] 

Anton Kedin commented on BEAM-2721:
---

After looking further into this, we can likely just remove the whole 
BeamRecordSqlType if we find a way to bypass the java.sql.Types ints: 
https://issues.apache.org/jira/browse/BEAM-3292

> Augment BeamRecordType to do slicing and concatenation.
> ---
>
> Key: BEAM-2721
> URL: https://issues.apache.org/jira/browse/BEAM-2721
> Project: Beam
>  Issue Type: Bug
>  Components: dsl-sql
>Reporter: Robert Bradshaw
>Assignee: Anton Kedin
>
> Currently in several places we cast to BeamSqlRecordType, extract the field 
> type ints, do the slicing, and then reconstruct a new BeamSqlRecordType. If 
> BeamRecordType had polymorphic methods to slice/concat this would be cleaner 
> and more flexible. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3292) Remove BeamRecordSqlType

2017-12-05 Thread Anton Kedin (JIRA)
Anton Kedin created BEAM-3292:
-

 Summary: Remove BeamRecordSqlType
 Key: BEAM-3292
 URL: https://issues.apache.org/jira/browse/BEAM-3292
 Project: Beam
  Issue Type: Bug
  Components: dsl-sql
Reporter: Anton Kedin
Assignee: Anton Kedin


[BeamRecordType|https://github.com/apache/beam/blob/39e66e953b0f8e16435acb038cad364acf2b3a57/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java]
 is implemented as 2 lists: the list of field names, and the list of the coders 
for those fields. Both lists are ordered.

[BeamRecordSqlType|https://github.com/apache/beam/blob/2eb7de0fe6e96da9805fc827294da1e1329ff716/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java]
 additionally has a list of 
[java.sql.Types|https://docs.oracle.com/javase/7/docs/api/java/sql/Types.html] 
ints to define types of those fields. It is used to map between Java types, 
Calcite types, and Beam Coders.

This information is not used for anything except for that mapping, which in 
turn is only used to create records and map back to Calcite types.

But because of this indirect mapping we cannot rely on core BeamRecordType and 
are forced to have BeamRecordSqlType. This introduces additional complexity, 
when, for example, generating record types based on pojo classes.

If we could find another mechanism to map Calcite types and java classes to 
Beam Coders bypassing java.sql.Types then we can just use the core 
BeamRecordType and remove the BeamRecordSqlType functionality.

One approach is to have a predefined set of coders which are then used like 
types, e.g.:
{code:java}
public static class SqlCoders {
   public static final Coder<Integer> INTEGER = VarIntCoder.of();
   public static final Coder<String> VARCHAR = StringUtf8Coder.of();
   public static final Coder<Date> TIMESTAMP = DateCoder.of();
}
{code}

The problem with that approach is establishing coder identity. That is, when a 
coder is serialized and then deserialized, it becomes a different instance, so 
we need a mechanism to establish the identity, or maybe just the equality, of 
coders. If this is solved, then replacing java.sql.Types with predefined SQL 
coders like the above becomes trivial.
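A minimal sketch of that round-trip problem (assuming Beam's SerializableUtils helper and the value equality that stock coders like VarIntCoder already provide):

{code:java}
Coder<Integer> original = VarIntCoder.of();
byte[] bytes = SerializableUtils.serializeToByteArray(original);
Coder<?> roundTripped =
    (Coder<?>) SerializableUtils.deserializeFromByteArray(bytes, "coder");
// Deserialization yields a different instance, so reference identity is
// lost; we must rely on (or define) value equality instead.
assert original != roundTripped;
assert original.equals(roundTripped);
{code}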

A few links on this:
 - https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L56
 - https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java#L34
 - https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L391




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3189) Python Fnapi - Worker speedup

2017-12-05 Thread Ankur Goenka (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated BEAM-3189:
---
External issue URL: 
https://docs.google.com/document/d/1mHFaNgHA71RVGLVNrGrHIlWHgJb4tKCJ2qQzS13REY8

> Python Fnapi - Worker speedup
> -
>
> Key: BEAM-3189
> URL: https://issues.apache.org/jira/browse/BEAM-3189
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Affects Versions: 2.3.0
>Reporter: Ankur Goenka
>Assignee: Ankur Goenka
>Priority: Minor
>  Labels: performance, portability
>
> The Beam Python SDK is a couple of orders of magnitude slower than the Java 
> SDK when it comes to stream processing.
> There are two related issues:
> # Given a single core, we are currently not fully utilizing it because the 
> single thread spends a lot of time on IO. This is more a limitation of our 
> implementation than a limitation of Python.
> # Given a machine with multiple cores, a single Python process can only 
> utilize one core.
> In this task we will only address (1); (2) is a good candidate for future 
> optimization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2083) Develop a Go SDK for Beam

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279088#comment-16279088
 ] 

ASF GitHub Bot commented on BEAM-2083:
--

herohde opened a new pull request #4217: [BEAM-2083] Add Go SDK README
URL: https://github.com/apache/beam/pull/4217
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Develop a Go SDK for Beam
> -
>
> Key: BEAM-2083
> URL: https://issues.apache.org/jira/browse/BEAM-2083
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-go
>Reporter: Bill Neubauer
>Assignee: Henning Rohde
>
> Allow users of the Go programming language (https://golang.org/) to write 
> Beam pipelines in this language. The effort is focusing on a full-fledged SDK 
> that leverages the Beam Fn API to bootstrap a native Go experience.
> Initial design:
> https://s.apache.org/beam-go-sdk-design-rfc
> Development in the go-sdk branch (pending PR#4200). Work in progress. YMMV.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3629

2017-12-05 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Spark #1086

2017-12-05 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 6474616ff1a2078f35cce60c9e6cd6991b95b3dd (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 6474616ff1a2078f35cce60c9e6cd6991b95b3dd
Commit message: "Merge pull request #4085: Introduce a property to influence 
Maven output location."
 > git rev-list 6474616ff1a2078f35cce60c9e6cd6991b95b3dd # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins8893985875063206031.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins90588913037948700.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins7847866367273003253.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: colorama; extra == "windows" in 
/usr/lib/python2.7/dist-packages (from colorlog[windows]==2.6.0->-r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: xmltodict in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests-ntlm>=0.3.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests>=2.9.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: ntlm-auth>=1.0.2 in 
/home/jenkins/.local/lib/python2.7/site-packages (from 
requests-ntlm>=0.3.0->pywinrm->-r PerfKitBenchmarker/requirements.txt (line 25))

[jira] [Commented] (BEAM-2400) Null pointer exception when creating a template

2017-12-05 Thread Tobi Vollebregt (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278936#comment-16278936
 ] 

Tobi Vollebregt commented on BEAM-2400:
---

My request: change PipelineResult / waitUntilFinish so you can safely call 
{{p.run().waitUntilFinish()}}, both when staging a template and when running a 
job.

That is, I think it would be nice if {{waitUntilFinish}} is a no-op when 
staging a template.

> Null pointer exception when creating a template
> ---
>
> Key: BEAM-2400
> URL: https://issues.apache.org/jira/browse/BEAM-2400
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Melissa Pashniak
>Priority: Minor
>
> The template is successfully created, but is then followed by a null pointer 
> exception.
> Command:
> mvn compile exec:java  -Dexec.mainClass=com.example.WordCount  
> -Dexec.args="--runner=DataflowRunner \
>   --project=my-project \
>   --stagingLocation=gs://my-bucket/staging \
>   --output=gs://my-bucket/output/outputfile \
>   --templateLocation=gs://my-bucket/templates/mytemplate"
> INFO: Template successfully created.
> [WARNING] 
> java.lang.reflect.InvocationTargetException
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.getJobWithRetries(DataflowPipelineJob.java:489)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.getStateWithRetries(DataflowPipelineJob.java:465)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:304)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:240)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:193)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:186)
>   at com.example.WordCount.main(WordCount.java:184)
>   ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2400) Null pointer exception when creating a template

2017-12-05 Thread Tobi Vollebregt (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278934#comment-16278934
 ] 

Tobi Vollebregt commented on BEAM-2400:
---

I'm running into this as well.

Turns out that when staging a template with the Dataflow runner, you cannot 
call {{PipelineResult.waitUntilFinish()}}. So if you have the common 
{{p.run().waitUntilFinish();}} then your code throws a NullPointerException when 
staging a template.

Experimentally I found that even something like this:

{code}
PipelineResult result = p.run();
if (!result.getState().isTerminal()) {
  result.waitUntilFinish();
}
{code}

will throw, but in this case it gives a more meaningful error:

{code}
Exception in thread "main" java.lang.UnsupportedOperationException: The result 
of template creation should not be used.
{code}

The only way I've found that will call {{waitUntilFinish}} when running the 
pipeline, and not call it when staging a template, is:

{code}
PipelineResult result = p.run();
if (options.getTemplateLocation() == null) {
  result.waitUntilFinish();
}
{code}

> Null pointer exception when creating a template
> ---
>
> Key: BEAM-2400
> URL: https://issues.apache.org/jira/browse/BEAM-2400
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Melissa Pashniak
>Priority: Minor
>
> The template is successfully created, but is then followed by a null pointer 
> exception.
> Command:
> mvn compile exec:java  -Dexec.mainClass=com.example.WordCount  
> -Dexec.args="--runner=DataflowRunner \
>   --project=my-project \
>   --stagingLocation=gs://my-bucket/staging \
>   --output=gs://my-bucket/output/outputfile \
>   --templateLocation=gs://my-bucket/templates/mytemplate"
> INFO: Template successfully created.
> [WARNING] 
> java.lang.reflect.InvocationTargetException
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.getJobWithRetries(DataflowPipelineJob.java:489)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.getStateWithRetries(DataflowPipelineJob.java:465)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:304)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:240)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:193)
>   at 
> org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:186)
>   at com.example.WordCount.main(WordCount.java:184)
>   ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3070) Add support for windowed filenames in Python SDK

2017-12-05 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278890#comment-16278890
 ] 

Eugene Kirpichov commented on BEAM-3070:


Thanks! However, I strongly recommend waiting until 
https://github.com/apache/beam/pull/3817 lands, and then mimicking its 
(simpler and more general) design instead.

> Add support for windowed filenames in Python SDK
> 
>
> Key: BEAM-3070
> URL: https://issues.apache.org/jira/browse/BEAM-3070
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Vilhelm von Ehrenheim
>Assignee: Vilhelm von Ehrenheim
>
> The Python SDK's FileBasedSink does not support windowed filenames. Currently 
> the only filename template available is based on shards.
> The Java SDK has this feature in `FilenamePolicy.windowedFilename()`. The 
> Python SDK only has a somewhat simpler implementation using a text 
> template and `FileBasedSink._template_to_format()`, which does not support 
> windows.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-3070) Add support for windowed filenames in Python SDK

2017-12-05 Thread Vilhelm von Ehrenheim (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278786#comment-16278786
 ] 

Vilhelm von Ehrenheim edited comment on BEAM-3070 at 12/5/17 4:42 PM:
--

Sure! I'd be happy to. Might need some pointers though. Maybe I'll ask for help 
on slack in that case. 


was (Author: while):
Sure! I'd be happy to. Might need some pointers though. Maybe I'll ask for help 
in slack in that case. 

> Add support for windowed filenames in Python SDK
> 
>
> Key: BEAM-3070
> URL: https://issues.apache.org/jira/browse/BEAM-3070
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Vilhelm von Ehrenheim
>Assignee: Vilhelm von Ehrenheim
>
> The Python SDK's FileBasedSink does not support windowed filenames. Currently 
> the only filename template available is based on shards.
> The Java SDK has this feature in `FilenamePolicy.windowedFilename()`. The 
> Python SDK only has a somewhat simpler implementation using a text 
> template and `FileBasedSink._template_to_format()`, which does not support 
> windows.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3070) Add support for windowed filenames in Python SDK

2017-12-05 Thread Vilhelm von Ehrenheim (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vilhelm von Ehrenheim reassigned BEAM-3070:
---

Assignee: Vilhelm von Ehrenheim

> Add support for windowed filenames in Python SDK
> 
>
> Key: BEAM-3070
> URL: https://issues.apache.org/jira/browse/BEAM-3070
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Vilhelm von Ehrenheim
>Assignee: Vilhelm von Ehrenheim
>
> The Python SDK's FileBasedSink does not support windowed filenames. Currently 
> the only filename template available is based on shards.
> The Java SDK has this feature in `FilenamePolicy.windowedFilename()`. The 
> Python SDK only has a somewhat simpler implementation using a text 
> template and `FileBasedSink._template_to_format()`, which does not support 
> windows.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3070) Add support for windowed filenames in Python SDK

2017-12-05 Thread Vilhelm von Ehrenheim (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278786#comment-16278786
 ] 

Vilhelm von Ehrenheim commented on BEAM-3070:
-

Sure! I'd be happy to. Might need some pointers though. Maybe I'll ask for help 
in slack in that case. 

> Add support for windowed filenames in Python SDK
> 
>
> Key: BEAM-3070
> URL: https://issues.apache.org/jira/browse/BEAM-3070
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Vilhelm von Ehrenheim
>
> The Python SDK's FileBasedSink does not support windowed filenames. Currently 
> the only filename template available is based on shards.
> The Java SDK has this feature in `FilenamePolicy.windowedFilename()`. The 
> Python SDK only has a somewhat simpler implementation using a text 
> template and `FileBasedSink._template_to_format()`, which does not support 
> windows.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3291) Add Kinesis Sink

2017-12-05 Thread JIRA
Ismaël Mejía created BEAM-3291:
--

 Summary: Add Kinesis Sink
 Key: BEAM-3291
 URL: https://issues.apache.org/jira/browse/BEAM-3291
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-extensions
Affects Versions: Not applicable
Reporter: Ismaël Mejía
Priority: Minor


Currently KinesisIO only has a Read transform; we need to provide a Write too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3026) Improve retrying in ElasticSearch client

2017-12-05 Thread Etienne Chauchot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278613#comment-16278613
 ] 

Etienne Chauchot commented on BEAM-3026:


Maybe an exponential backoff would be appropriate here.

> Improve retrying in ElasticSearch client
> 
>
> Key: BEAM-3026
> URL: https://issues.apache.org/jira/browse/BEAM-3026
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Tim Robertson
>Assignee: Jean-Baptiste Onofré
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses.  Perhaps something like this:
> {code}
> ElasticsearchIO.ConnectionConfiguration conn = ElasticsearchIO.ConnectionConfiguration
>   .create(new String[]{"http://...:9200"}, "test", "test")
>   .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000, TimeUnit.MILLISECONDS))
>   .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10));
> {code}
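A sketch of what such a retry loop could look like with Beam's existing FluentBackoff utility (org.apache.beam.sdk.util.FluentBackoff; performBulkRequest() is a hypothetical stand-in for the ES bulk call):

{code:java}
BackOff backoff = FluentBackoff.DEFAULT
    .withInitialBackoff(Duration.millis(1000))  // exponential backoff from 1s
    .withMaxRetries(10)
    .backoff();
while (true) {
  try {
    performBulkRequest();  // hypothetical bulk request
    break;
  } catch (IOException e) {
    // Sleeps for the next backoff interval; returns false once the retry
    // budget is exhausted.
    if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
      throw e;
    }
  }
}
{code}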



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Spark #3628

2017-12-05 Thread Apache Jenkins Server
See 




[jira] [Created] (BEAM-3290) Construct iterators directly if possible to allow spilling to disk

2017-12-05 Thread holdenk (JIRA)
holdenk created BEAM-3290:
-

 Summary: Construct iterators directly if possible to allow 
spilling to disk
 Key: BEAM-3290
 URL: https://issues.apache.org/jira/browse/BEAM-3290
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: holdenk
Assignee: Amit Sela


When you construct a collection first and convert it to an iterator, you force 
Spark to evaluate the entire input partition before it can get the first 
element off the output. This breaks some of the spilling to disk that Spark can 
do otherwise. Instead, chain operations on iterators.

This is only possible in the Java API for Spark 2 and above (and that's my 
fault from back in my work in the Spark project).
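A minimal sketch of the difference (assuming the Spark 2.x Java API and Guava; {{lines}} is a hypothetical JavaRDD<String>):

{code:java}
// Iterator chaining: elements stream through, so Spark can spill to disk.
JavaRDD<Integer> lengths = lines.mapPartitions(
    (Iterator<String> it) -> Iterators.transform(it, String::length));

// Anti-pattern: materializes the whole partition before emitting anything.
JavaRDD<Integer> lengthsEager = lines.mapPartitions((Iterator<String> it) -> {
  List<Integer> out = new ArrayList<>();
  while (it.hasNext()) {
    out.add(it.next().length());
  }
  return out.iterator();
});
{code}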



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: beam_PerformanceTests_Spark #1085

2017-12-05 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 6474616ff1a2078f35cce60c9e6cd6991b95b3dd (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 6474616ff1a2078f35cce60c9e6cd6991b95b3dd
Commit message: "Merge pull request #4085: Introduce a property to influence 
Maven output location."
 > git rev-list 6474616ff1a2078f35cce60c9e6cd6991b95b3dd # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the 
SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins199182219369561023.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins7710766486548273611.sh
+ git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git
Cloning into 'PerfKitBenchmarker'...
[beam_PerformanceTests_Spark] $ /bin/bash -xe /tmp/jenkins2273044487356752601.sh
+ pip install --user -r PerfKitBenchmarker/requirements.txt
Requirement already satisfied: absl-py in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: jinja2>=2.7 in 
/usr/local/lib/python2.7/dist-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: setuptools in /usr/lib/python2.7/dist-packages 
(from -r PerfKitBenchmarker/requirements.txt (line 16))
Requirement already satisfied: colorlog[windows]==2.6.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: blinker>=1.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 18))
Requirement already satisfied: futures>=3.0.3 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 19))
Requirement already satisfied: PyYAML==3.12 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 20))
Requirement already satisfied: pint>=0.7 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 21))
Requirement already satisfied: numpy in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 22))
Requirement already satisfied: functools32 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 23))
Requirement already satisfied: contextlib2>=0.5.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 24))
Requirement already satisfied: pywinrm in 
/home/jenkins/.local/lib/python2.7/site-packages (from -r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: six in 
/home/jenkins/.local/lib/python2.7/site-packages (from absl-py->-r 
PerfKitBenchmarker/requirements.txt (line 14))
Requirement already satisfied: MarkupSafe in 
/usr/local/lib/python2.7/dist-packages (from jinja2>=2.7->-r 
PerfKitBenchmarker/requirements.txt (line 15))
Requirement already satisfied: colorama; extra == "windows" in 
/usr/lib/python2.7/dist-packages (from colorlog[windows]==2.6.0->-r 
PerfKitBenchmarker/requirements.txt (line 17))
Requirement already satisfied: xmltodict in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests-ntlm>=0.3.0 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: requests>=2.9.1 in 
/home/jenkins/.local/lib/python2.7/site-packages (from pywinrm->-r 
PerfKitBenchmarker/requirements.txt (line 25))
Requirement already satisfied: ntlm-auth>=1.0.2 in 
/home/jenkins/.local/lib/python2.7/site-packages (from 
requests-ntlm>=0.3.0->pywinrm->-r PerfKitBenchmarker/requirements.txt (line 25))

[jira] [Commented] (BEAM-3159) DoFnTester should be deprecated in favor of TestPipeline

2017-12-05 Thread Etienne Chauchot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278304#comment-16278304
 ] 

Etienne Chauchot commented on BEAM-3159:


I agree that, since UDF execution is linked to the runner, it is more 
representative to use the DirectRunner and TestPipeline; it also lets tests 
benefit from new features as soon as they are implemented in the direct 
runner, rather than waiting for their support in DoFnTester. That said, 
consider the following example: you have a DoFn used in the write part of an 
IO. It has a complex flush mechanism that can be dynamically determined or 
parametrized by the user. If you want to test that the flush occurs at the 
correct moment based on the conditions/parameters set, how can you do that 
without lower-level test abilities on the DoFn?
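For reference, the TestPipeline style being recommended looks roughly like this (real Beam testing APIs; MyDoFn is a stand-in for the DoFn under test):

{code:java}
public class MyDoFnTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void testAddsOne() {
    PCollection<Integer> out =
        p.apply(Create.of(1, 2, 3))
         .apply(ParDo.of(new MyDoFn()));  // hypothetical add-one DoFn
    PAssert.that(out).containsInAnyOrder(2, 3, 4);
    p.run().waitUntilFinish();
  }
}
{code}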

> DoFnTester should be deprecated in favor of TestPipeline
> 
>
> Key: BEAM-3159
> URL: https://issues.apache.org/jira/browse/BEAM-3159
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Kenneth Knowles
>Priority: Minor
>
> Reasons:
> 1. The logical unit within a Beam pipeline is a transform. Either a small 
> transform like a ParDo or a larger composite transform. Unit tests should 
> focus on these units, rather than probing specific behaviors of the 
> user-defined functions.
> 2. The way that a runner interacts with a user-defined function is not 
> necessarily obvious. DoFnTester allows testing non-sensical cases that 
> wouldn't arise in practice, since it allows low-level interactions with the 
> actual UDFs.
> Instead, we should encourage the use of TestPipeline with the direct runner. 
> This allows testing a single transform (such as a ParDo running a UDF) in 
> context. It also makes it easier to test things like side-inputs and multiple 
> outputs, since you use the same techniques in the test as you would in a real 
> pipeline, rather than requiring a whole new API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)