[beam] branch master updated: Adding performance improvements to ApproximateQuantiles. (#13175)

2021-02-18 Thread tvalentyn
This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 66a8a4c  Adding performance improvements to ApproximateQuantiles. (#13175)
66a8a4c is described below

commit 66a8a4c5e97f674f594c2194da736ad63a842dfe
Author: Ihor Indyk 
AuthorDate: Fri Feb 19 01:50:04 2021 -0500

Adding performance improvements to ApproximateQuantiles. (#13175)
---
 sdks/python/apache_beam/transforms/stats.pxd |  60 ++
 sdks/python/apache_beam/transforms/stats.py  | 719 ++-
 sdks/python/apache_beam/transforms/stats_test.py | 114 +++-
 sdks/python/setup.py |   1 +
 4 files changed, 610 insertions(+), 284 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/stats.pxd b/sdks/python/apache_beam/transforms/stats.pxd
new file mode 100644
index 000..e67c012
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats.pxd
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cimport cython
+from libc.stdint cimport int64_t
+
+cdef class _QuantileSpec(object):
+  cdef readonly int64_t buffer_size
+  cdef readonly int64_t num_buffers
+  cdef readonly bint weighted
+  cdef readonly key
+  cdef readonly bint reverse
+  cdef readonly weighted_key
+  cdef readonly less_than
+
+cdef class _QuantileBuffer(object):
+  cdef readonly elements
+  cdef readonly weights
+  cdef readonly bint weighted
+  cdef readonly int64_t level
+  cdef readonly min_val
+  cdef readonly max_val
+  cdef readonly _iter
+
+cdef class _QuantileState(object):
+  cdef readonly _QuantileSpec spec
+  cdef public buffers
+  cdef public unbuffered_elements
+  cdef public unbuffered_weights
+  cdef public add_unbuffered
+  cpdef bint is_empty(self)
+  @cython.locals(num_new_buffers=int64_t, idx=int64_t)
+  cpdef _add_unbuffered(self, elements, offset_fn)
+  @cython.locals(num_new_buffers=int64_t, idx=int64_t)
+  cpdef _add_unbuffered_weighted(self, elements, offset_fn)
+  cpdef finalize(self)
+  @cython.locals(min_level=int64_t)
+  cpdef collapse_if_needed(self, offset_fn)
+
+
+@cython.locals(new_level=int64_t, new_weight=double, step=double, offset=double)
+cdef _QuantileBuffer _collapse(buffers, offset_fn, _QuantileSpec spec)
+
+@cython.locals(j=int64_t)
+cdef _interpolate(buffers, int64_t count, double step, double offset,
+                  _QuantileSpec spec)
\ No newline at end of file
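The `.pxd` above only declares types for `_collapse` and `_interpolate`; their bodies are not part of this (truncated) message. As a rough, hypothetical illustration of step/offset selection in buffer-collapsing quantile sketches, the helper below merges weighted sorted buffers and keeps the elements found at positions `offset`, `offset + step`, and so on. It assumes each buffer is a sorted list paired with an integer weight (how many inputs each element stands for) and ignores the `key`, `reverse` and `weighted` options that `_QuantileSpec` carries, so it is a sketch, not Beam's implementation.

```python
import heapq
from typing import List, Sequence, Tuple


def interpolate(buffers: Sequence[Tuple[Sequence[float], int]],
                count: int, step: float, offset: float) -> List[float]:
    """Select `count` elements from weighted, individually sorted buffers.

    Hypothetical helper: conceptually, every element is repeated `weight`
    times and all buffers are merged into one sorted sequence; the result
    holds the elements at positions offset, offset + step, ...,
    offset + (count - 1) * step of that sequence.
    """
    # Lazily merge the sorted buffers, tagging each element with its weight.
    merged = heapq.merge(*[[(elem, weight) for elem in elems]
                           for elems, weight in buffers])
    result: List[float] = []
    consumed = 0.0  # total weight of the elements merged so far
    target = offset
    for elem, weight in merged:
        consumed += weight
        # This element occupies positions [consumed - weight, consumed).
        while len(result) < count and target < consumed:
            result.append(elem)
            target += step
        if len(result) == count:
            break
    return result
```

For example, two weight-1 buffers `[1, 3, 5]` and `[2, 4, 6]` collapsed to three elements with `step=2` and `offset=1` give `[2, 4, 6]`, each now standing for two inputs; with `offset=0` the result is `[1, 3, 5]` instead, which is presumably why `_collapse` takes an `offset_fn` rather than a fixed offset.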
diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py
index 9d19bc4..bd6dc72 100644
--- a/sdks/python/apache_beam/transforms/stats.py
+++ b/sdks/python/apache_beam/transforms/stats.py
@@ -36,14 +36,12 @@ import heapq
 import itertools
 import logging
 import math
-import sys
 import typing
 from builtins import round
 from typing import Any
-from typing import Generic
-from typing import Iterable
+from typing import Callable
 from typing import List
-from typing import Sequence
+from typing import Tuple
 
 from apache_beam import coders
 from apache_beam import typehints
@@ -61,30 +59,34 @@ T = typing.TypeVar('T')
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
+
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-    return _mmh3_hash
 
-  except ImportError:
+def 

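The hunk above moves the choice between mmh3 and md5 out of a `_get_default_hash_fn()` function and into module-level code, so it is resolved once when `stats.py` is imported. A standalone sketch of the same pattern follows; the names `md5_fingerprint`, `default_hash` and `DEFAULT_HASH_TYPE` are illustrative and are not Beam's.

```python
import hashlib


def md5_fingerprint(value: bytes) -> int:
    """64-bit fingerprint taken from the first 16 hex digits of an MD5 digest."""
    # hexdigest() is 32 hex digits (128 bits); keeping 16 of them gives 64 bits.
    return int(hashlib.md5(value).hexdigest()[:16], 16)


# Select the hash function once, at import time.
try:
    import mmh3  # optional dependency; may not be installed

    def default_hash(value: bytes) -> int:
        # hash64 returns a pair of unsigned 64-bit ints; keep the first one.
        return mmh3.hash64(value, seed=0, signed=False)[0]

    DEFAULT_HASH_TYPE = "mmh3"
except ImportError:
    default_hash = md5_fingerprint
    DEFAULT_HASH_TYPE = "md5"
```

Either branch yields a deterministic unsigned 64-bit integer, so callers do not need to know which library was available.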
[beam] tag nightly-master updated (364a4a0 -> f09f2af)

2021-02-18 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/beam.git.


*** WARNING: tag nightly-master was modified! ***

from 364a4a0  (commit)
  to f09f2af  (commit)
from 364a4a0  [BEAM-11211] Update pandas and pyarrow in python container (#13987)
 add b8dd9cb  Update go version to 1.12.7 (#13996)
 add 4178d26  [BEAM-11125] bump checkerframework to 3.10.0
 add 88918f1  Merge pull request #13978: [BEAM-11125] bump checkerframework to 3.10.0
 add b6a59b3  Merge #13558. [BEAM-11494][BEAM-11821] FileIO stops overwriting files on retries
 add 3ede658  [BEAM-11801] Don't set useCachedDataPool if an emulator host is set
 add c76e835  Merge pull request #13964: [BEAM-11801] Don't set useCachedDataPool if an emulator host is set
 add cfe60aa  [BEAM-11747] Better filters for BeamCalc
 add 5828613  Merge pull request #13898: [BEAM-11747] Make BeamCalcRel safe for ZetaSQL
 add 6ab69f9  Fix IllegalArgumentException in Interval
 add 8d5b7c7  Merge pull request #13993 from baeminbo/patch-1
 add 5d3ffe7  [BEAM-11747] Disable BeamJavaUdfCalcRule by default.
 add 3dfbaaf  spotless
 add 0c08687  Merge pull request #14010 from ibzib/disable-calcrule
 add e9bdda9  [BEAM-11834] Enable arrays literals to have null values.
 add 3a0a3b4  Merge pull request #14017 from ibzib/BEAM-11834
 add 4cffb7d  [BEAM-10708] Support SqlTransform in container (#13944)
 add ef74412  Enable SideInput metrics for DF worker. These have been launched on Dataflow
 add ebafa0e  Merge pull request #14018 from pabloem/enablesim
 add fa3dddb  [BEAM-11760] Cleanup interactive cache just once.
 add 9297cf1  [BEAM-11760] Cleanup interactive cache just once.
 add 3bb529c  Merge remote-tracking branch 'origin/double_cache' into double_cache
 add f09f2af  Merge pull request #13909 [BEAM-11760] Cleanup interactive cache just once.

No new revisions were added by this update.

Summary of changes:
 buildSrc/build.gradle.kts  |   2 +-
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   4 +-
 .../dataflow/worker/DataflowWorkUnitClient.java|   8 +-
 .../runners/dataflow/worker/ExperimentContext.java |   3 +-
 .../dataflow/worker/IsmSideInputReader.java|  11 +-
 .../dataflow/worker/IsmSideInputReaderTest.java|   6 +-
 sdks/go/pkg/beam/artifact/materialize.go   |   6 +-
 sdks/go/pkg/beam/core/graph/coder/double_test.go   |   4 +-
 sdks/go/pkg/beam/pardo_test.go |   3 +-
 sdks/go/pkg/beam/transforms/stats/stats.shims.go   | 151 ++---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |   6 +-
 .../java/org/apache/beam/sdk/io/FileSystems.java   |  46 +--
 .../org/apache/beam/sdk/io/fs/MatchResult.java |   6 +
 .../org/apache/beam/sdk/io/fs/MoveOptions.java |   1 +
 sdks/java/expansion-service/build.gradle   |   2 +-
 .../sdk/extensions/gcp/storage/GcsFileSystem.java  |   3 +
 .../schemaio-expansion-service/build.gradle|   4 +-
 sdks/java/extensions/sql/build.gradle  |   2 +-
 .../extensions/sql/impl/utils/CalciteUtils.java|   2 +-
 .../sql/zetasql/ZetaSQLQueryPlanner.java   |  63 +++--
 .../zetasql/ZetaSqlCalciteTranslationUtils.java|   5 +-
 .../sql/zetasql/ZetaSqlDialectSpecTest.java|  16 +++
 .../extensions/sql/zetasql/ZetaSqlJavaUdfTest.java |  11 +-
 .../apache/beam/sdk/io/aws/s3/S3FileSystem.java|  24 ++--
 .../azure/blobstore/AzureBlobStoreFileSystem.java  |  25 ++--
 .../beam/sdk/io/gcp/bigtable/BigtableConfig.java   |   7 +-
 .../runners/interactive/interactive_environment.py |  10 +-
 .../portability/fn_api_runner/worker_handlers.py   |  11 +-
 28 files changed, 283 insertions(+), 159 deletions(-)



[beam] branch master updated (ebafa0e -> f09f2af)

2021-02-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ebafa0e  Merge pull request #14018 from pabloem/enablesim
 add fa3dddb  [BEAM-11760] Cleanup interactive cache just once.
 add 9297cf1  [BEAM-11760] Cleanup interactive cache just once.
 add 3bb529c  Merge remote-tracking branch 'origin/double_cache' into double_cache
 add f09f2af  Merge pull request #13909 [BEAM-11760] Cleanup interactive cache just once.

No new revisions were added by this update.

Summary of changes:
 .../apache_beam/runners/interactive/interactive_environment.py | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)



[beam] branch master updated: Enable SideInput metrics for DF worker. These have been launched on Dataflow

2021-02-18 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ef74412  Enable SideInput metrics for DF worker. These have been launched on Dataflow
 new ebafa0e  Merge pull request #14018 from pabloem/enablesim
ef74412 is described below

commit ef7441257d655cd8bdc5ab502acfdaced9ad6e39
Author: Pablo Estrada 
AuthorDate: Thu Feb 18 13:31:08 2021 -0800

Enable SideInput metrics for DF worker. These have been launched on Dataflow
---
 .../beam/runners/dataflow/worker/ExperimentContext.java   |  3 +--
 .../beam/runners/dataflow/worker/IsmSideInputReader.java  | 11 ++-
 .../beam/runners/dataflow/worker/IsmSideInputReaderTest.java  |  6 +-
 3 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
index 0d9e0ef..d215799 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
@@ -44,8 +44,7 @@ public class ExperimentContext {
       * operations for some IO connectors.
       */
      EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
-    IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights)
-    SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO
+    IntertransformIO("intertransform_io"); // Intertransform metrics for Shuffle IO (insights)

     private final String name;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 26adc74..37fa6fc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -56,7 +56,6 @@ import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
 import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.sdk.coders.Coder;
@@ -218,14 +217,8 @@ public class IsmSideInputReader implements SideInputReader {
       throw new Exception("unexpected kind of side input: " + sideInputKind);
     }

-    SideInputReadCounter sideInputReadCounter;
-    ExperimentContext ec = ExperimentContext.parseFrom(options);
-    if (ec.isEnabled(Experiment.SideInputIOMetrics)) {
-      sideInputReadCounter =
-          new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex);
-    } else {
-      sideInputReadCounter = new NoopSideInputReadCounter();
-    }
+    SideInputReadCounter sideInputReadCounter =
+        new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex);

     ImmutableList.Builder> builder = ImmutableList.builder();
     for (Source source : sideInputInfo.getSources()) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index 4083ee1..2367eac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import 

[beam] branch master updated (3a0a3b4 -> 4cffb7d)

2021-02-18 Thread bhulette
This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 3a0a3b4  Merge pull request #14017 from ibzib/BEAM-11834
 add 4cffb7d  [BEAM-10708] Support SqlTransform in container (#13944)

No new revisions were added by this update.

Summary of changes:
 .../runners/portability/fn_api_runner/worker_handlers.py  | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)



[beam] branch master updated: [BEAM-11834] Enable arrays literals to have null values.

2021-02-18 Thread ibzib
This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e9bdda9  [BEAM-11834] Enable arrays literals to have null values.
 new 3a0a3b4  Merge pull request #14017 from ibzib/BEAM-11834
e9bdda9 is described below

commit e9bdda9e037600dcda597fd6b105966b0ebeb073
Author: Kyle Weaver 
AuthorDate: Thu Feb 18 13:09:54 2021 -0800

[BEAM-11834] Enable arrays literals to have null values.
---
 .../beam/sdk/extensions/sql/impl/utils/CalciteUtils.java |  2 +-
 .../sql/zetasql/ZetaSqlCalciteTranslationUtils.java  |  5 -
 .../extensions/sql/zetasql/ZetaSqlDialectSpecTest.java   | 16 
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index acd6e6c..10ad199 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -234,7 +234,7 @@ public class CalciteUtils {
         return FieldType.row(toSchema(calciteType));

       default:
-        return toFieldType(calciteType.getSqlTypeName());
+        return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
     }
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 203337c..dd59c26 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -290,7 +290,10 @@ public final class ZetaSqlCalciteTranslationUtils {
 
   private static RexNode arrayValueToRexNode(Value value, RexBuilder rexBuilder) {
     return rexBuilder.makeCall(
-        toCalciteArrayType(value.getType().asArray().getElementType(), false, rexBuilder),
+        toCalciteArrayType(
+            value.getType().asArray().getElementType(),
+            value.getElementList().stream().anyMatch(v -> v.isNull()),
+            rexBuilder),
         SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
         value.getElementList().stream()
             .map(v -> toRexNode(v, rexBuilder))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index df5959e..4a07989 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2421,6 +2421,22 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testUnnestLiteralWithNullElements() {
+    String sql = "SELECT * FROM UNNEST(ARRAY['foo', NULL, 'bar']);";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    Schema schema = Schema.builder().addNullableField("str_field", FieldType.STRING).build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema).addValues("foo").build(),
+            Row.withSchema(schema).addValues((String) null).build(),
+            Row.withSchema(schema).addValues("bar").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testUNNESTParameters() {
     String sql = "SELECT * FROM UNNEST(@p0);";
     ImmutableMap params =

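The translation change marks an array literal's element type as nullable when any element is NULL, passing `anyMatch(v -> v.isNull())` to `toCalciteArrayType` where the constant `false` was passed before, and `CalciteUtils` now carries Calcite's nullability into the Beam `FieldType`. A hypothetical Python sketch of that inference rule, with a toy `FieldType` standing in for Beam's:

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class FieldType:
    """Toy stand-in for a schema field type (not Beam's FieldType)."""
    type_name: str
    nullable: bool = False


def array_literal_element_type(values: Sequence[Optional[str]]) -> FieldType:
    # Same rule as anyMatch(v -> v.isNull()): the element type is nullable
    # exactly when at least one element of the literal is NULL (None here).
    return FieldType("STRING", nullable=any(v is None for v in values))
```

With `ARRAY['foo', NULL, 'bar']`, as in the new `testUnnestLiteralWithNullElements` test, the inferred element type is nullable, matching the nullable `str_field` the test expects after `UNNEST`.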


[beam] branch master updated (8d5b7c7 -> 0c08687)

2021-02-18 Thread ibzib
This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8d5b7c7  Merge pull request #13993 from baeminbo/patch-1
 new 5d3ffe7  [BEAM-11747] Disable BeamJavaUdfCalcRule by default.
 new 3dfbaaf  spotless
 new 0c08687  Merge pull request #14010 from ibzib/disable-calcrule

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java | 6 ++
 .../apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java  | 2 ++
 2 files changed, 4 insertions(+), 4 deletions(-)



[beam] branch master updated: Fix IllegalArgumentException in Interval

2021-02-18 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ab69f9  Fix IllegalArgumentException in Interval
 new 8d5b7c7  Merge pull request #13993 from baeminbo/patch-1
6ab69f9 is described below

commit 6ab69f9e6cd6f876af325fbc13cc329fb42a068c
Author: Minbo Bae <49642083+baemi...@users.noreply.github.com>
AuthorDate: Sun Feb 14 21:36:40 2021 -0800

Fix IllegalArgumentException in Interval

If a time correction happens, `endTime` can be before `startTime`, which causes an `IllegalArgumentException` (the end instant must be greater than the start instant).

Compute the elapsed time as `endTime.getMillis() - startTime.getMillis()` instead. The result may be negative, but I think that is OK because it is only used for logging.
---
 .../beam/runners/dataflow/worker/DataflowWorkUnitClient.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index 618a451..1590225 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -48,7 +48,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
-import org.joda.time.Interval;
 import org.slf4j.Logger;
 
 /** A Dataflow WorkUnit client that fetches WorkItems from the Dataflow service. */
@@ -215,14 +214,15 @@ class DataflowWorkUnitClient implements WorkUnitClient {
         && DataflowWorkerLoggingMDC.getStageName() != null) {
       DateTime startTime = stageStartTime.get();
       if (startTime != null) {
-        // This thread should have been tagged with the stage start time during getWorkItem(),
-        Interval elapsed = new Interval(startTime, endTime);
+        // elapsed time can be negative by time correction
+        long elapsed = endTime.getMillis() - startTime.getMillis();
         int numErrors = workItemStatus.getErrors() == null ? 0 : workItemStatus.getErrors().size();
+        // This thread should have been tagged with the stage start time during getWorkItem(),
         logger.info(
             "Finished processing stage {} with {} errors in {} seconds ",
             DataflowWorkerLoggingMDC.getStageName(),
             numErrors,
-            (double) elapsed.toDurationMillis() / 1000);
+            (double) elapsed / 1000);
       }
     }
     shortIdCache.shortenIdsIfAvailable(workItemStatus.getCounterUpdates());

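Joda-Time's `Interval` constructor rejects an end instant that is before the start instant, which is what the removed `new Interval(startTime, endTime)` call ran into after a clock correction. The Python sketch below (hypothetical function names, millisecond timestamps assumed) contrasts an `Interval`-style check with the plain subtraction the commit switches to.

```python
def elapsed_seconds_strict(start_ms: int, end_ms: int) -> float:
    """Interval-style: refuse an end instant that precedes the start."""
    if end_ms < start_ms:
        raise ValueError("end instant precedes start instant")
    return (end_ms - start_ms) / 1000


def elapsed_seconds(start_ms: int, end_ms: int) -> float:
    """Plain subtraction, as in the fix: may be negative if the wall clock
    was stepped backwards, which is tolerable for a log message."""
    return (end_ms - start_ms) / 1000
```

For example, `elapsed_seconds(10_000, 9_500)` returns `-0.5` where the strict version raises. When a duration matters for more than logging, a monotonic clock (`time.monotonic()` in Python, `System.nanoTime()` in Java) is unaffected by such corrections; the commit instead keeps the wall-clock `DateTime` start time and accepts a possibly negative value.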


[beam] branch master updated (c76e835 -> 5828613)

2021-02-18 Thread apilloud
This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from c76e835  Merge pull request #13964: [BEAM-11801] Don't set useCachedDataPool if an emulator host is set
 add cfe60aa  [BEAM-11747] Better filters for BeamCalc
 add 5828613  Merge pull request #13898: [BEAM-11747] Make BeamCalcRel safe for ZetaSQL

No new revisions were added by this update.

Summary of changes:
 .../sql/zetasql/ZetaSQLQueryPlanner.java   | 57 --
 .../extensions/sql/zetasql/ZetaSqlJavaUdfTest.java |  9 ++--
 2 files changed, 59 insertions(+), 7 deletions(-)



[beam] branch master updated (b6a59b3 -> c76e835)

2021-02-18 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from b6a59b3  Merge #13558. [BEAM-11494][BEAM-11821] FileIO stops overwriting files on retries
 add 3ede658  [BEAM-11801] Don't set useCachedDataPool if an emulator host is set
 new c76e835  Merge pull request #13964: [BEAM-11801] Don't set useCachedDataPool if an emulator host is set

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java   | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)



[beam] 01/01: Merge pull request #13964: [BEAM-11801] Don't set useCachedDataPool if an emulator host is set

2021-02-18 Thread iemejia
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c76e835f69b7b2328e44a1e46e3f299748959017
Merge: b6a59b3 3ede658
Author: Ismaël Mejía 
AuthorDate: Thu Feb 18 14:46:23 2021 +0100

Merge pull request #13964: [BEAM-11801] Don't set useCachedDataPool if an emulator host is set

 .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java   | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)