Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614313097


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Optional;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to process {@link Work} by executing user DoFns for a specific computation. May be reused to
+ * process future work items owned by a computation.
+ *
+ * Should only be accessed by 1 thread at a time.
+ *
+ * @implNote Once closed, it cannot be reused.
+ */
+// TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the
+// separation of responsibilities is unclear.
+@AutoValue
+@Internal
+@NotThreadSafe
+public abstract class ComputationWorkExecutor {

Review Comment:
   @scwhittle this class is ExecutionState.java, just renamed accordingly.
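The class Javadoc above states a usage contract: one thread at a time, reusable for later work items of the same computation, and unusable once closed. A minimal sketch of that contract, assuming a plain `Consumer` stands in for the user DoFns; `ReusableWorkExecutorSketch` is a hypothetical name, not Beam's `ComputationWorkExecutor`:

```java
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not Beam's ComputationWorkExecutor) of the documented contract:
 * reusable for successive work items of one computation, accessed by one thread at a
 * time, and unusable once closed. No synchronization is used because callers are
 * expected to honor the single-thread contract.
 */
final class ReusableWorkExecutorSketch<W> implements AutoCloseable {
  private final Consumer<W> userCode; // stands in for the computation's user DoFns
  private boolean closed = false;
  private int processedCount = 0;

  ReusableWorkExecutorSketch(Consumer<W> userCode) {
    this.userCode = userCode;
  }

  /** Processes one work item; may be called repeatedly until close(). */
  void execute(W work) {
    if (closed) {
      throw new IllegalStateException("Executor is closed and cannot be reused.");
    }
    userCode.accept(work);
    processedCount++;
  }

  int processedCount() {
    return processedCount;
  }

  /** After close(), every further execute() call fails. */
  @Override
  public void close() {
    closed = true;
  }
}
```

Because the contract is single-threaded, the sketch uses plain fields; sharing one instance across threads would need external coordination.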



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614304621


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
+  private final Cache onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache onCommitFinalizedCache, BoundedQueueExecutor workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user-worker-generated ids and callbacks to execute once a commit has been
+   * successfully committed to the backing state store.
+   */
+  void cacheCommitFinalizers(Map commitCallbacks) {
+    onCommitFinalizedCache.putAll(commitCallbacks);
+  }
+
+  /**
+   * Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing
+   * state store and to checkpoint the source.
+   */
+  void finalizeCommits(Windmill.WorkItem work) {

Review Comment:
   done
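`StreamingCommitFinalizer` above keeps commit callbacks in a cache whose entries expire five minutes after write, and holds a `BoundedQueueExecutor`, presumably to run the callbacks once the backend reports the commit as finalized. The following is a stdlib-only sketch of that pattern, not the real class: a `ConcurrentHashMap` with a write-time check replaces the Guava `Cache`, `Long` ids and a `List` of finalized ids replace `Windmill.WorkItem` (assumptions, since the key type is not shown), and `CommitFinalizerSketch` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * Hypothetical, stdlib-only sketch of the StreamingCommitFinalizer pattern: callbacks are
 * cached by finalize id with a write-time expiry and run on an executor once the backend
 * reports those ids as finalized. Long ids and the List-based finalize call are assumptions.
 */
final class CommitFinalizerSketch {
  private static final class Entry {
    final Runnable callback;
    final Instant writtenAt;

    Entry(Runnable callback, Instant writtenAt) {
      this.callback = callback;
      this.writtenAt = writtenAt;
    }
  }

  private final Map<Long, Entry> callbacks = new ConcurrentHashMap<>();
  private final Duration expiry;
  private final Executor executor;
  private final Supplier<Instant> clock;

  CommitFinalizerSketch(Duration expiry, Executor executor, Supplier<Instant> clock) {
    this.expiry = expiry;
    this.executor = executor;
    this.clock = clock;
  }

  /** Caches callbacks to run once the corresponding commits are finalized. */
  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
    Instant now = clock.get();
    commitCallbacks.forEach((id, callback) -> callbacks.put(id, new Entry(callback, now)));
  }

  /** Removes the callbacks for the finalized ids and runs those that have not expired. */
  void finalizeCommits(List<Long> finalizedIds) {
    Instant now = clock.get();
    for (Long id : finalizedIds) {
      Entry entry = callbacks.remove(id);
      if (entry == null) {
        continue; // unknown, already finalized, or never cached
      }
      if (Duration.between(entry.writtenAt, now).compareTo(expiry) >= 0) {
        continue; // expired, roughly like a cache built with expireAfterWrite
      }
      executor.execute(entry.callback);
    }
  }

  /** Number of cached callbacks; expired entries linger until looked up (no background eviction). */
  int size() {
    return callbacks.size();
  }
}
```

Injecting the clock keeps expiry testable without sleeping; a Guava `Cache` additionally evicts expired entries during routine maintenance, which this sketch does not attempt.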






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614304419


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -193,31 +180,33 @@ public void start(
   for (StepContext stepContext : stepContexts) {
 stepContext.start(
 stateReader,
-inputDataWatermark,
+work.watermarks().inputDataWatermark(),

Review Comment:
   was able to update all of these
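The change above reads the input watermark from the work item (`work.watermarks().inputDataWatermark()`) instead of from a separately passed `inputDataWatermark` argument. A minimal sketch of that shape, assuming hypothetical `WatermarksSketch`, `WorkSketch`, `StepContextSketch`, and `ExecutionContextSketch` classes in place of the worker's real types:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical value object for the watermarks attached to one work item. */
final class WatermarksSketch {
  private final Instant inputDataWatermark;

  WatermarksSketch(Instant inputDataWatermark) {
    this.inputDataWatermark = Objects.requireNonNull(inputDataWatermark);
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }
}

/** Hypothetical work item that carries its own watermarks. */
final class WorkSketch {
  private final WatermarksSketch watermarks;

  WorkSketch(WatermarksSketch watermarks) {
    this.watermarks = Objects.requireNonNull(watermarks);
  }

  WatermarksSketch watermarks() {
    return watermarks;
  }
}

/** Hypothetical per-step context that records the watermark it was started with. */
final class StepContextSketch {
  private Instant inputDataWatermark;

  void start(Instant inputDataWatermark) {
    this.inputDataWatermark = inputDataWatermark;
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }
}

/** Hypothetical execution context: starts every step from the work item's watermarks. */
final class ExecutionContextSketch {
  private final List<StepContextSketch> stepContexts = new ArrayList<>();

  StepContextSketch addStep() {
    StepContextSketch stepContext = new StepContextSketch();
    stepContexts.add(stepContext);
    return stepContext;
  }

  void start(WorkSketch work) {
    for (StepContextSketch stepContext : stepContexts) {
      stepContext.start(work.watermarks().inputDataWatermark());
    }
  }
}
```

With this shape, adding another watermark later changes the value class rather than every `start(...)` signature.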






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614297131


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
+import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schedules execution of user code to process a {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} then commits the work item
+ * back to streaming execution backend.
+ */
+@Internal
+@ThreadSafe
+public final class StreamingWorkScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);
+
+  private final DataflowWorkerHarnessOptions options;
+  private final Supplier clock;
+  private final ExecutionStateFactory executionStateFactory;
+  private final SideInputStateFetcher sideInputStateFetcher;
+  private final FailureTracker failureTracker;
+  private final WorkFailureProcessor workFailureProcessor;
+  private final StreamingCommitFinalizer commitFinalizer;
+  private final StreamingCounters streamingCounters;
+  private final HotKeyLogger hotKeyLogger;
+  private final ConcurrentMap stageInfoMap;
+  private final DataflowExecutionStateSampler sampler;
+  private final AtomicInteger maxWorkItemCommitBytes;
+
+  

Re: [PR] Fix an incompatibility with hamcrest 2.2 [beam]

2024-05-24 Thread via GitHub


Abacn merged PR #31395:
URL: https://github.com/apache/beam/pull/31395





[PR] Parse YAML ExpansionService configs directly using SnakeYAML [beam]

2024-05-24 Thread via GitHub


chamikaramj opened a new pull request, #31406:
URL: https://github.com/apache/beam/pull/31406

   This fixes https://github.com/apache/beam/issues/31405.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] enable BigQueryIO read throttling detection for Python SDK [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31404:
URL: https://github.com/apache/beam/pull/31404#issuecomment-2130548849

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @shunping for label python.
   R: @shunping for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Add try-excepts around data sampler encoding [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130522811

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Add try-excepts around data sampler encoding [beam]

2024-05-24 Thread via GitHub


rohdesamuel commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130522353

   R: @KevinGG 





Re: [PR] [WIP] Add in redistribute option for Kafka Read [beam]

2024-05-24 Thread via GitHub


codecov[bot] commented on PR #31347:
URL: https://github.com/apache/beam/pull/31347#issuecomment-2130487239

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage) Report
   Attention: Patch coverage is `0%` with `1 line` in your changes missing coverage. Please review.
   > Project coverage is 71.41%. Comparing base [(`092f769`)](https://app.codecov.io/gh/apache/beam/commit/092f769a4b48740c4e6ab6e1ddc2a060986a645e?dropdown=coverage) to head [(`4ada08d`)](https://app.codecov.io/gh/apache/beam/commit/4ada08d2903e34e641452d8f6cc8f7ac45023bfa?dropdown=coverage).
   > Report is 39 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage) | Patch % | Lines |
   |---|---|---|
   | [sdks/python/apache\_beam/io/kafka.py](https://app.codecov.io/gh/apache/beam/pull/31347?src=pr#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2Fma2EucHk=) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/31347?src=pr) |
   
   Additional details and impacted files
   
   
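   ```diff
   @@            Coverage Diff             @@
   ##           master   #31347      +/-   ##
   ==========================================
   - Coverage   71.44%   71.41%   -0.04%
     Complexity   1474     1474
   ==========================================
     Files         909      910       +1
     Lines      113652   113845     +193
     Branches     1076     1076
   ==========================================
   + Hits        81200    81299      +99
   - Misses      30430    30526      +96
   + Partials     2022     2020       -2
   ```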
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/31347/flags) | Coverage Δ | |
   |---|---|---|
   | [python](https://app.codecov.io/gh/apache/beam/pull/31347/flags) | `81.37% <0.00%> (-0.09%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage).
   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/).
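As a consistency check on the coverage diff in this report, the sketch below recomputes the project coverage from the Hits, Misses, and Partials rows. It assumes Codecov's ratio of hits / (hits + misses + partials) and truncation to two decimal places; truncation is an assumption that matches the reported 71.44% (half-up rounding of 71.446% would give 71.45%). `CoverageCheck` is a hypothetical helper.

```java
/**
 * Hypothetical helper that recomputes Codecov-style project coverage, assuming
 * coverage = hits / (hits + misses + partials), truncated to two decimal places.
 */
final class CoverageCheck {
  /** Coverage in hundredths of a percent, truncated by integer division. */
  static long coverageHundredthsOfPercent(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return hits * 10_000L / lines;
  }

  /** Formats hundredths of a percent, e.g. 7144 -> "71.44%". */
  static String format(long hundredths) {
    return String.format("%d.%02d%%", hundredths / 100, hundredths % 100);
  }
}
```

The row totals also match the Lines row: 81200 + 30430 + 2022 = 113652 on master and 81299 + 30526 + 2020 = 113845 on the PR head.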
   





Re: [PR] enable BigQueryIO read throttling detection for Python SDK [beam]

2024-05-24 Thread via GitHub


Abacn commented on code in PR #31404:
URL: https://github.com/apache/beam/pull/31404#discussion_r1614114675


##
sdks/python/apache_beam/io/gcp/bigquery.py:
##
@@ -1186,11 +1186,19 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
   parent=parent,
   read_session=requested_session,
   max_stream_count=stream_count)
+  if self.use_native_datetime:
+    display_schema = "Arrow Schema:" + str(read_session.arrow_schema)
+  else:
+    display_schema = "Avro Schema:" + str(read_session.avro_schema)
   _LOGGER.info(

Review Comment:
   This is to make this log readable. Before, it printed the raw proto message, which got truncated partway through the stream URLs. Now it prints only the number of streams returned and the key fields.
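A rough Java analogue of this logging change, for illustration only: build a one-line summary with the stream count and the schema for the selected format instead of printing the whole read-session proto. `ReadSessionSummary` and its fields are hypothetical stand-ins, not BigQuery Storage API types, and the message wording is an assumption.

```java
import java.util.List;

/**
 * Hypothetical Java analogue of the Python logging change: summarize a read session as
 * its stream count plus the schema for the selected format, rather than logging the
 * whole proto. ReadSessionSummary is an illustrative stand-in, not a BigQuery API type.
 */
final class ReadSessionLogSketch {
  static final class ReadSessionSummary {
    final String name;
    final List<String> streamNames;
    final String arrowSchema;
    final String avroSchema;

    ReadSessionSummary(
        String name, List<String> streamNames, String arrowSchema, String avroSchema) {
      this.name = name;
      this.streamNames = List.copyOf(streamNames);
      this.arrowSchema = arrowSchema;
      this.avroSchema = avroSchema;
    }
  }

  /** Builds the log line; as in the diff, useNativeDatetime selects the Arrow schema. */
  static String describe(ReadSessionSummary session, boolean useNativeDatetime) {
    String displaySchema =
        useNativeDatetime
            ? "Arrow Schema:" + session.arrowSchema
            : "Avro Schema:" + session.avroSchema;
    return String.format(
        "Read session %s returned %d streams. %s",
        session.name, session.streamNames.size(), displaySchema);
  }
}
```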






Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]

2024-05-24 Thread via GitHub


liferoad commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130482486

   > > @liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!
   > 
   > More details are in the closed PR, which is in the description, :)
   
   Updated the description with more details now.





[PR] enable BigQueryIO read throttling detection for Python SDK [beam]

2024-05-24 Thread via GitHub


Abacn opened a new pull request, #31404:
URL: https://github.com/apache/beam/pull/31404

   Before (using a GCP project with limited quota):
   
   [screenshot: https://github.com/apache/beam/assets/8010435/ba8bf7fa-8fbe-4c4e-bd44-a41f5c0e7764]
   
   The job upscaled aggressively to 270+ workers before it was cancelled.
   
   After (note: the backend change is not yet rolled out to prod):
   
   (up-hill part, max workers = 181)
   [screenshot: https://github.com/apache/beam/assets/8010435/ef11b1b8-954a-416a-be6c-8b129d9584d2]
   
   (down-hill part)
   [screenshot: https://github.com/apache/beam/assets/8010435/be766d98-5e54-4643-b2c3-59677415e6bb]
   
   Note: the autoscaler won't kill an active worker until its current work item finishes, and there is a 6-minute lapse between a worker starting and reactive downscaling taking effect; see https://github.com/apache/beam/pull/31253#issuecomment-2118608486
   



Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]

2024-05-24 Thread via GitHub


liferoad commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130465877

   > @liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!
   
   More details are in the closed PR, which is in the description, :)





Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31398:
URL: https://github.com/apache/beam/pull/31398#issuecomment-2130455775

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]

2024-05-24 Thread via GitHub


tvalentyn commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130455697

   @liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!





Re: [PR] added pytest.mark.flaky for test_big_query_write_temp_table_append_sc… [beam]

2024-05-24 Thread via GitHub


tvalentyn merged PR #31364:
URL: https://github.com/apache/beam/pull/31364





Re: [I] [Task] Publish Prism binary (os + architecture combos) artifacts on Beam Release. [beam]

2024-05-24 Thread via GitHub


lostluck commented on issue #29697:
URL: https://github.com/apache/beam/issues/29697#issuecomment-2130446494

   Per that linked PR, we ended up just building an action for it ourselves.
   
   Ultimately, we have the zipped files uploaded, along with signatures and 
hashes, so it fully matches what we upload to the Apache SVN repository.
   
   The binary zips we're putting up in GitHub release artifacts will be at URLs 
with the following pattern.
   
http://github.com/apache/beam/releases/download/RELEASE/apache_beam-RELEASE-prism-OS-ARCH.zip
   
   Note: Beam versions have a `v` prefix, e.g. `v2.56.0`, which needs to match in the URL.
   
   To actually finish #28187, we need Java (#31402) and Python (#31403) to download the binaries from GitHub. For that, we can just use whatever native library can make the HTTP request and handle the various redirects GitHub puts us through.
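A sketch of the URL pattern and redirect handling described in this comment, using only the JDK (`java.net.http.HttpClient`, Java 11+). It follows the pattern literally, with the `v`-prefixed release in both the path and the file name. `PrismDownloadSketch` is hypothetical, the OS/ARCH values in the example (`linux`, `amd64`) are assumptions about artifact naming, and it uses `https://` where the comment writes `http://`.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical helper for locating and fetching a Prism release zip from GitHub. */
final class PrismDownloadSketch {
  /** Builds .../download/RELEASE/apache_beam-RELEASE-prism-OS-ARCH.zip with a "v"-prefixed RELEASE. */
  static URI prismZipUrl(String release, String os, String arch) {
    // Beam release tags carry a "v" prefix (e.g. v2.56.0) that must appear in the URL.
    String tag = release.startsWith("v") ? release : "v" + release;
    return URI.create(
        String.format(
            "https://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip",
            tag, tag, os, arch));
  }

  /** A client that follows redirects, except HTTPS-to-HTTP downgrades (Redirect.NORMAL). */
  static HttpClient redirectFollowingClient() {
    return HttpClient.newBuilder()
        .followRedirects(HttpClient.Redirect.NORMAL)
        .connectTimeout(Duration.ofSeconds(30))
        .build();
  }

  static HttpRequest downloadRequest(URI url) {
    return HttpRequest.newBuilder(url).GET().build();
  }
}
```

A caller could then download with `client.send(downloadRequest(url), HttpResponse.BodyHandlers.ofFile(target))`; building the client and request performs no network I/O.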
   
   
   
   





Re: [I] [Bug]: Flink 1.17 cannot be selected on the apache-beam package v2.56.0 [beam]

2024-05-24 Thread via GitHub


jaehyeon-kim closed issue #31378: [Bug]: Flink 1.17 cannot be selected on the 
apache-beam package v2.56.0
URL: https://github.com/apache/beam/issues/31378





Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]

2024-05-24 Thread via GitHub


chamikaramj commented on code in PR #31398:
URL: https://github.com/apache/beam/pull/31398#discussion_r1614065093


##
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
 }
 
 if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+  ExternalTransforms.SchemaTransformPayload payload =
+      ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+  String identifier = payload.getIdentifier();
   transformBuilder.putAnnotations(
-      SCHEMATRANSFORM_URN_KEY,
-      ByteString.copyFromUtf8(
-          ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-              .getIdentifier()));
+      BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+      ByteString.copyFromUtf8(identifier));
+  if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+    Schema configSchema =
+        SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    Row configRow =
+        RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+    String underlyingIdentifier =
+        MoreObjects.firstNonNull(
+            configRow.getString("transform_identifier"), "unknown_identifier");
+    transformBuilder.putAnnotations(

Review Comment:
   Is there a valid case where "transform_identifier" would not be set? If not, we should just error out.
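   The diff currently falls back to `"unknown_identifier"` via `MoreObjects.firstNonNull`. If `transform_identifier` is always required, a fail-fast check in the spirit of this suggestion could look like the following. It is an illustration under assumptions, not the PR's code: a `Map<String, String>` stands in for the decoded Beam `Row`, and `ManagedConfig` and `requireTransformIdentifier` are hypothetical names.

```java
import java.util.Map;

/** Hypothetical helper illustrating a fail-fast lookup; not Beam's actual code. */
final class ManagedConfig {
  private ManagedConfig() {}

  // Returns the underlying transform identifier, or throws instead of silently
  // substituting a placeholder such as "unknown_identifier".
  static String requireTransformIdentifier(Map<String, String> config) {
    String identifier = config.get("transform_identifier");
    if (identifier == null) {
      throw new IllegalArgumentException(
          "Managed transform configuration is missing required field 'transform_identifier'");
    }
    return identifier;
  }
}
```

   Failing at translation time surfaces a misconfigured Managed transform immediately, rather than producing an annotation that later tooling would have to special-case.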



##
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##
@@ -111,6 +111,15 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message Annotations {
+  enum Enum {
+    CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row"];

Review Comment:
   Let's add a short description for each of these.



##
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
 }
 
 if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+  ExternalTransforms.SchemaTransformPayload payload =
+      ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+  String identifier = payload.getIdentifier();
   transformBuilder.putAnnotations(
-      SCHEMATRANSFORM_URN_KEY,
-      ByteString.copyFromUtf8(
-          ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-              .getIdentifier()));
+      BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+      ByteString.copyFromUtf8(identifier));
+  if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+    Schema configSchema =
+        SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    Row configRow =
+        RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+    String underlyingIdentifier =
+        MoreObjects.firstNonNull(
+            configRow.getString("transform_identifier"), "unknown_identifier");

Review Comment:
   Let's add unit tests to make sure that the annotations get added correctly.






Re: [PR] Add Iceberg workflows [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31401:
URL: https://github.com/apache/beam/pull/31401#issuecomment-2130393642

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] Add iceberg load test [beam]

2024-05-24 Thread via GitHub


ahmedabu98 closed pull request #31399: Add iceberg load test
URL: https://github.com/apache/beam/pull/31399





[PR] Add Iceberg workflows [beam]

2024-05-24 Thread via GitHub


ahmedabu98 opened a new pull request, #31401:
URL: https://github.com/apache/beam/pull/31401

   Adding workflows for Iceberg integration and load tests.
   
   The integration test already exists; the load test will be added in #31392.





[I] [Bug]: ReadFromKafka does not work without max_num_records parameter [beam]

2024-05-24 Thread via GitHub


gomerudo opened a new issue, #31400:
URL: https://github.com/apache/beam/issues/31400

   ### What happened?
   
   
   When running a streaming job (with DirectRunner locally and with DataflowRunner on GCP) that uses the apache_beam.io.kafka.ReadFromKafka connector without `max_num_records`, the job does not process any data. Instead, it gets trapped in an infinite loop of creating consumers that subscribe and get assigned a partition and offset but never process anything. We are forcing `auto.offset.reset = earliest`.
   
   We verified that when `max_num_records` is set, the job runs and processes the data correctly both locally and on Dataflow. All of this leads us to conclude that this is not a GCP issue but rather a Beam one.
   
   We noticed the infinite loop in the logs, and we also noticed that Lenses never reports any active members of the consumer group:
   
   
![image](https://github.com/apache/beam/assets/5495942/aeb6c311-0d53-410d-8c12-db0b24114a44)
   
![image](https://github.com/apache/beam/assets/5495942/46a0f513-f48c-448c-a264-7e100b463b65)
   
   We have tried the default Kafka configurations as well as custom ones. I'm 
just sharing the latest:
   
   ```python
   import apache_beam.io.kafka
   
   (
       pipeline
       | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
           consumer_config={  # Also tested with a single broker
               "bootstrap.servers": "kafka-1782273228-1-1908664276.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-4.prod.walmart.com:9092,kafka-1782274279-1-1908664354.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-5.prod.walmart.com:9092,kafka-1782274320-1-1908664432.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-6.prod.walmart.com:9092",
               "auto.offset.reset": "earliest",
               "fetch.max.bytes": "52428800",
               "fetch.min.bytes": "1",
               "fetch.max.wait.ms": "1000",
               "max.poll.interval.ms": "2",
               "max.poll.records": "10",
               "request.timeout.ms": "3",
               "session.timeout.ms": "45000",
               "timeout.ms": "1",
               "group.id": "test-group-id",
               "heartbeat.interval.ms": "200",
               "reconnect.backoff.ms": "100",
               "reconnect.backoff.max.ms": "1",
           },
           topics=["some-topic-i-cannot-share"],
           with_metadata=True,
           # max_num_records=1000  # For testing only
       )
   )
   ```
   
   This does not seem to be a problem with our Kafka topic, since custom Python clients (using kafka-python) run successfully with the exact same Kafka configuration.
   
   Beam SDK language: Python
   Beam SDK version: 2.52.0
   
   Any feedback is greatly appreciated.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner





Re: [PR] Add try-excepts around data sampler encoding [beam]

2024-05-24 Thread via GitHub


codecov[bot] commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130339852

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   Attention: Patch coverage is `55.6%` with `4 lines` in your changes missing coverage. Please review.
   > Project coverage is 77.62%. Comparing base 
[(`b34cf54`)](https://app.codecov.io/gh/apache/beam/commit/b34cf54b0a1511e88f76836d040c4ee67e420a71?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 to head 
[(`7677083`)](https://app.codecov.io/gh/apache/beam/commit/76770833c99b7962fab52947cf4d4cbac44048e8?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 280 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Patch % | Lines |
   |---|---|---|
   | [.../python/apache\_beam/runners/worker/data\_sampler.py](https://app.codecov.io/gh/apache/beam/pull/31396?src=pr=tree=sdks%2Fpython%2Fapache_beam%2Frunners%2Fworker%2Fdata_sampler.py_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9zYW1wbGVyLnB5) | 55.55% | [4 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/31396?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   
   Additional details and impacted files
   
   
   ```diff
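   @@             Coverage Diff              @@
   ##             master   #31396      +/-   ##
   ============================================
   + Coverage     71.44%   77.62%   +6.18%
   - Complexity     1474     2980    +1506
   ============================================
     Files           906      760     -146
     Lines        113271    95406   -17865
     Branches       1076     3229    +2153
   ============================================
   - Hits          80931    74063    -6868
   + Misses        30327    19880   -10447
   + Partials       2013     1463     -550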
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [go](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `?` | |
   | [python](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `81.36% <55.55%> (-0.16%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage=pr=continue_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   





Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]

2024-05-24 Thread via GitHub


codecov[bot] commented on PR #31398:
URL: https://github.com/apache/beam/pull/31398#issuecomment-2130339854

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31398?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 68.53%. Comparing base 
[(`1f63196`)](https://app.codecov.io/gh/apache/beam/commit/1f6319624a44ce113c39602cd4b6fff6c6db638a?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 to head 
[(`dd20a70`)](https://app.codecov.io/gh/apache/beam/commit/dd20a70d505eac888379d1d1139101b9a2ae31b7?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 14 commits behind head on master.
   
   
   Additional details and impacted files
   
   
   ```diff
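   @@             Coverage Diff              @@
   ##             master   #31398      +/-   ##
   ============================================
   - Coverage     68.55%   68.53%   -0.02%
     Complexity    14921    14921
   ============================================
     Files          2636     2637       +1
     Lines        222092   222295     +203
     Branches      11826    11826
   ============================================
   + Hits         152250   152358     +108
   - Misses        63647    63742      +95
     Partials       6195     6195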
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/31398/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [python](https://app.codecov.io/gh/apache/beam/pull/31398/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `81.37% <ø> (-0.09%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/beam/pull/31398?dropdown=coverage=pr=continue_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   





[PR] Add iceberg load test [beam]

2024-05-24 Thread via GitHub


ahmedabu98 opened a new pull request, #31399:
URL: https://github.com/apache/beam/pull/31399

   (no comment)





Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614002144


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##
@@ -97,18 +104,19 @@ public final class StreamingEngineClient {
   private StreamingEngineClient(
       JobHeader jobHeader,
       GetWorkBudget totalGetWorkBudget,
-      AtomicReference connections,
       GrpcWindmillStreamFactory streamFactory,
-      WorkItemProcessor workItemProcessor,
+      WorkItemScheduler workItemScheduler,
       ChannelCachingStubFactory channelCachingStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId) {
+      long clientId,
+      Function workCommitterFactory,
+      Consumer> heartbeatResponseProcessor) {
     this.jobHeader = jobHeader;
     this.started = new AtomicBoolean();

Review Comment:
   The `finish` method (like `stop` in StreamingDataflowWorker) is only there for testing, so I added the annotation there as well.






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614001559


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##
@@ -97,18 +104,19 @@ public final class StreamingEngineClient {
   private StreamingEngineClient(
       JobHeader jobHeader,
       GetWorkBudget totalGetWorkBudget,
-      AtomicReference connections,
       GrpcWindmillStreamFactory streamFactory,
-      WorkItemProcessor workItemProcessor,
+      WorkItemScheduler workItemScheduler,
       ChannelCachingStubFactory channelCachingStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId) {
+      long clientId,
+      Function workCommitterFactory,
+      Consumer> heartbeatResponseProcessor) {
     this.jobHeader = jobHeader;
     this.started = new AtomicBoolean();

Review Comment:
   done






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613998893


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Value class that represents the result of executing user DoFns. */
+@AutoValue
+abstract class ExecuteWorkResult {

Review Comment:
   done






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613998310


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
+  private final Cache onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache onCommitFinalizedCache, BoundedQueueExecutor workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user worker generated id's and callbacks to execute once a commit has been
+   * successfully committed to the backing state store.
+   */
+  void cacheCommitFinalizers(Map commitCallbacks) {
+    onCommitFinalizedCache.putAll(commitCallbacks);
+  }
+
+  /**
+   * Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing
+   * state store and to checkpoint the source.
+   */
+  void finalizeCommits(Windmill.WorkItem work) {

Review Comment:
   done
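   The finalizer in this diff keeps commit callbacks in a Guava `Cache` with a five-minute `expireAfterWrite` and runs them on a `BoundedQueueExecutor`. The following stdlib-only sketch shows the same register-then-finalize pattern under stated assumptions: a `ConcurrentHashMap` with explicit expiry timestamps replaces the Guava cache, a plain `Executor` replaces `BoundedQueueExecutor`, and the finalized ids are passed in directly rather than read from a `Windmill.WorkItem`. `CommitFinalizerSketch` and its methods are hypothetical names.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical stdlib-only sketch of a commit finalizer; not Beam's StreamingCommitFinalizer. */
final class CommitFinalizerSketch {
  /** A registered callback and the time after which it is treated as expired. */
  private static final class Entry {
    final Runnable callback;
    final Instant expiresAt;

    Entry(Runnable callback, Instant expiresAt) {
      this.callback = callback;
      this.expiresAt = expiresAt;
    }
  }

  private final Map<Long, Entry> callbacks = new ConcurrentHashMap<>();
  private final Executor executor;
  private final Duration expiry;
  private final Clock clock;

  CommitFinalizerSketch(Executor executor, Duration expiry, Clock clock) {
    this.executor = executor;
    this.expiry = expiry;
    this.clock = clock;
  }

  /** Registers callbacks keyed by finalize id, first evicting entries that have expired. */
  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
    Instant now = clock.instant();
    callbacks.values().removeIf(entry -> !now.isBefore(entry.expiresAt));
    Instant expiresAt = now.plus(expiry);
    commitCallbacks.forEach((id, callback) -> callbacks.put(id, new Entry(callback, expiresAt)));
  }

  /** Runs, at most once each, the unexpired callbacks whose ids were reported as finalized. */
  void finalizeCommits(List<Long> finalizedIds) {
    Instant now = clock.instant();
    for (Long id : finalizedIds) {
      // Removing before running guarantees a callback is never executed twice.
      Entry entry = callbacks.remove(id);
      if (entry != null && now.isBefore(entry.expiresAt)) {
        executor.execute(entry.callback);
      }
    }
  }
}
```

   Injecting a `Clock` keeps expiry deterministic in tests; a production version would more likely rely on the cache's own eviction, as the diff does.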






Re: [PR] Fix iceberg catalog validation [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31349:
URL: https://github.com/apache/beam/pull/31349#issuecomment-2130290123

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Fix iceberg catalog validation [beam]

2024-05-24 Thread via GitHub


ahmedabu98 commented on PR #31349:
URL: https://github.com/apache/beam/pull/31349#issuecomment-2130288066

   R: @VeronicaWasson 





Re: [PR] Default SchemaTransform configs to snake_case [beam]

2024-05-24 Thread via GitHub


ahmedabu98 commented on PR #31374:
URL: https://github.com/apache/beam/pull/31374#issuecomment-2130287268

   Fixes #31353





Re: [PR] Default SchemaTransform configs to snake_case [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31374:
URL: https://github.com/apache/beam/pull/31374#issuecomment-2130283093

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Default SchemaTransform configs to snake_case [beam]

2024-05-24 Thread via GitHub


ahmedabu98 commented on PR #31374:
URL: https://github.com/apache/beam/pull/31374#issuecomment-2130281990

   R: @robertwb 
   R: @chamikaramj 
   
   CC: @lostluck (if this affects Go at all)
   CC: @Polber (if any more changes are needed for YAML)





Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613967488


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -22,10 +22,19 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoValue
-public abstract class ExecutionState {
+@Internal
+public abstract class ExecutionState implements AutoCloseable {

Review Comment:
   `ExecutionState` is also specific to a computation, so I will add a comment and rename it accordingly.






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613964582


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -22,10 +22,19 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoValue
-public abstract class ExecutionState {
+@Internal
+public abstract class ExecutionState implements AutoCloseable {

Review Comment:
   I think it will be worth exploring a cleanup of StreamingModeExecutionContext + ExecutionState.
   
   Looking at `streaming.worker.ExecutionState`: there is another execution state, `StreamingModeExecutionState`, which extends `DataflowExecutionState`, which in turn extends `core.metrics.ExecutionState` and seems to be used more for metric sampling.
   
   There could be a lot of cleanup here.
   
   I added a comment and a TODO to clean this up as well.






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613958837


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -151,37 +154,22 @@ public boolean workIsFailed() {
 
   public void start(
       @Nullable Object key,
-      Windmill.WorkItem work,
-      Instant inputDataWatermark,
-      @Nullable Instant outputDataWatermark,
-      @Nullable Instant synchronizedProcessingTime,
+      Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      @Nullable Supplier workFailed) {
+      Windmill.WorkItemCommitRequest.Builder outputBuilder) {
     this.key = key;
-    this.work = work;
-    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+    this.work = work.getWorkItem();
+    this.workIsFailed = work::isFailed;
     this.computationKey =
-        WindmillComputationKey.create(computationId, work.getKey(), work.getShardingKey());
+        WindmillComputationKey.create(
+            computationId, work.getWorkItem().getKey(), work.getWorkItem().getShardingKey());

Review Comment:
   done
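   The `start` change replaces a nullable `Supplier` (defaulting to `() -> Boolean.FALSE`) with the method reference `work::isFailed`. A minimal sketch, using hypothetical stand-ins `WorkSketch` and `ContextSketch` rather than Beam's `Work` and `StreamingModeExecutionContext`, of why that is safe: the supplier calls `isFailed()` each time it is evaluated, so it observes a failure marked after `start` returns instead of a snapshot taken at start time.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a unit of work whose failure can be marked asynchronously. */
final class WorkSketch {
  private volatile boolean failed;

  boolean isFailed() {
    return failed;
  }

  void markFailed() {
    failed = true;
  }
}

/** Hypothetical stand-in for an execution context; not Beam's StreamingModeExecutionContext. */
final class ContextSketch {
  private Supplier<Boolean> workIsFailed = () -> Boolean.FALSE;

  // Old shape: an optional supplier, replaced by a constant-false default when null.
  void startWithSupplier(Supplier<Boolean> workFailed) {
    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
  }

  // New shape: the supplier is derived from the work itself, so no null handling is needed.
  void start(WorkSketch work) {
    this.workIsFailed = work::isFailed;
  }

  boolean workIsFailed() {
    return workIsFailed.get();
  }
}
```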






Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613957781


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -87,6 +89,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
+@NotThreadSafe

Review Comment:
   done






Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]

2024-05-24 Thread via GitHub


akashk99 commented on issue #31313:
URL: https://github.com/apache/beam/issues/31313#issuecomment-2130258010

   I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible there is some configuration missing somewhere? I haven't been able to find anyone else online experiencing this same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common a use case it is, I'm not 100% sure this is in fact a bug; it is most likely some user error on my part.
   
   I can make do without savepoints by using Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?
   
   It seems like a lot of the subtasks aren't being utilized when stripping IDs with beam_fn_api, despite the number of shards being 20 and the parallelism being 24 (in theory there should only be 4 idle subtasks).
   
   ![image](https://github.com/apache/beam/assets/38279340/fda0120c-c1a2-4d68-929a-d4ab47f1c57c)
   





Re: [PR] [#29697] Add prism artifact building workflow. [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31369:
URL: https://github.com/apache/beam/pull/31369#issuecomment-2130253610

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] [#29697] Add prism artifact building workflow. [beam]

2024-05-24 Thread via GitHub


lostluck commented on PR #31369:
URL: https://github.com/apache/beam/pull/31369#issuecomment-2130252309

   R: @kennknowles 
   cc: @damccorm @jrmccluskey @abacn





Re: [PR] Enable to handle NotFound and AccessDenied errors in the streaming insert of `BigQueryIO` [beam]

2024-05-24 Thread via GitHub


damccorm commented on PR #31310:
URL: https://github.com/apache/beam/pull/31310#issuecomment-2130202164

   > But, Dataflow doesn't recommend using a dead-letter topic.
   
   That is strictly for pubsub reads. There is nothing wrong with using a dead 
letter queue for failed records (this is actually an encouraged pattern, see 
"When performing writes from Dataflow to a connector, consider using an 
[ErrorHandler](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html)
 to handle any failed writes..." 
https://cloud.google.com/dataflow/docs/guides/io-connector-best-practices)
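To make the dead-letter idea concrete: a failed write is captured together with its error instead of failing the pipeline, and routed to a separate destination. The sketch below is plain Java and is not Beam's `ErrorHandler` API; `DeadLetterWriter`, `FailedRecord`, and the `Sink` interface are hypothetical names used only to show the routing.

```java
import java.util.ArrayList;
import java.util.List;

// A record that could not be written, kept together with the reason.
final class FailedRecord<T> {
  final T record;
  final String error;

  FailedRecord(T record, String error) {
    this.record = record;
    this.error = error;
  }
}

// Hypothetical write target that may throw on a bad record.
interface Sink<T> {
  void write(T record) throws Exception;
}

final class DeadLetterWriter<T> {
  private final Sink<T> sink;
  private final List<FailedRecord<T>> deadLetters = new ArrayList<>();

  DeadLetterWriter(Sink<T> sink) {
    this.sink = sink;
  }

  // Writes every record; failures are diverted to the dead-letter list instead of aborting.
  int writeAll(List<T> records) {
    int written = 0;
    for (T r : records) {
      try {
        sink.write(r);
        written++;
      } catch (Exception e) {
        deadLetters.add(new FailedRecord<>(r, String.valueOf(e.getMessage())));
      }
    }
    return written;
  }

  List<FailedRecord<T>> deadLetters() {
    return deadLetters;
  }
}
```

In a real pipeline the dead-letter list would instead be another output written to durable storage for later inspection or replay.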





Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613894213


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -193,31 +180,33 @@ public void start(
   for (StepContext stepContext : stepContexts) {
 stepContext.start(
 stateReader,
-inputDataWatermark,
+work.watermarks().inputDataWatermark(),

Review Comment:
   done
   
   Added a TODO; I will probably update all usages where `Instant inputDataWatermark, Instant outputDataWatermark, Instant synchronizedProcessingTime` are passed together to use `Work.Watermarks` instead. Since that is the case, I will move `Watermarks` out of Work.java.
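The TODO above proposes passing the three related instants as one `Watermarks` value instead of separate `Instant` parameters. Below is a hypothetical stdlib-only sketch of such a holder. It uses `java.time.Instant` in place of the Joda-Time `Instant` that Beam uses, and mirrors the nullability of the original `start()` signature (input watermark required, the other two optional).

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical immutable holder for the watermarks that travel together.
final class Watermarks {
  private final Instant inputDataWatermark;         // required
  private final Instant outputDataWatermark;        // may be null
  private final Instant synchronizedProcessingTime; // may be null

  private Watermarks(Instant input, Instant output, Instant syncTime) {
    this.inputDataWatermark = Objects.requireNonNull(input, "inputDataWatermark");
    this.outputDataWatermark = output;
    this.synchronizedProcessingTime = syncTime;
  }

  static Watermarks of(Instant input, Instant output, Instant syncTime) {
    return new Watermarks(input, output, syncTime);
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }

  // Optional makes the "may be absent" contract explicit at call sites.
  Optional<Instant> outputDataWatermark() {
    return Optional.ofNullable(outputDataWatermark);
  }

  Optional<Instant> synchronizedProcessingTime() {
    return Optional.ofNullable(synchronizedProcessingTime);
  }
}
```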






Re: [PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


Abacn commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2130169518

   Thanks, taking a look
   
   At the same time, have a couple of questions (not directly related to the 
change)
   
   - This sounds similar to #30969; what is the difference here?
   
   - I also observed a similar issue with JmsIO on the Dataflow runner ("watermark does not increase when there is no incoming data for a while"), and the fix in #30337 didn't work. I am wondering whether #31390 is generic at the SDK level and whether a fix could be applied in general?
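As background for these questions: a downstream operator typically combines watermarks by taking the minimum over its inputs, so a source subtask that has no splits and never emits a watermark holds everything behind it at the minimum. The sketch below models that in plain Java under that min-of-inputs assumption. It is a conceptual illustration, not Flink's or Beam's actual code, and `WatermarkCombiner` is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several input channels.
final class WatermarkCombiner {
  static final long MIN = Long.MIN_VALUE; // "no watermark seen yet"
  static final long MAX = Long.MAX_VALUE; // "this input will never produce more data"

  private final long[] inputWatermarks;

  WatermarkCombiner(int numInputs) {
    inputWatermarks = new long[numInputs];
    Arrays.fill(inputWatermarks, MIN);
  }

  // Watermarks are monotonic per input, so a lower value is ignored.
  void onWatermark(int input, long watermark) {
    inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
  }

  // The combined watermark is the minimum across all inputs.
  long currentWatermark() {
    long min = MAX;
    for (long w : inputWatermarks) {
      min = Math.min(min, w);
    }
    return min;
  }
}
```

In this model, an empty subtask that emits the maximum watermark stops holding back its siblings; whether that matches what #31391 does exactly should be checked against the PR diff.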





Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613890068


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -152,37 +154,22 @@ public boolean workIsFailed() {
 
   public void start(
   @Nullable Object key,
-  Windmill.WorkItem work,
-  Instant inputDataWatermark,
-  @Nullable Instant outputDataWatermark,
-  @Nullable Instant synchronizedProcessingTime,
+  Work work,
   WindmillStateReader stateReader,
   SideInputStateFetcher sideInputStateFetcher,
-  Windmill.WorkItemCommitRequest.Builder outputBuilder,
-  @Nullable Supplier workFailed) {
+  Windmill.WorkItemCommitRequest.Builder outputBuilder) {
 this.key = key;
-this.work = work;
-this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+this.work = work.getWorkItem();

Review Comment:
   done






Re: [PR] add support for ConnectionFactory ProviderFn in JmsIO [beam]

2024-05-24 Thread via GitHub


Abacn commented on PR #31264:
URL: https://github.com/apache/beam/pull/31264#issuecomment-2130151490

   waiting on author





Re: [PR] Add try-excepts around data sampler encoding [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130145185

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @riteshghorse for label python.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


damccorm merged PR #31393:
URL: https://github.com/apache/beam/pull/31393





[I] [Bug]: error when trying to serialize headers with LongString [beam]

2024-05-24 Thread via GitHub


alexxfreitag opened a new issue, #31397:
URL: https://github.com/apache/beam/issues/31397

   ### What happened?
   
   Apache Beam version: 2.55.0
   Java version: 17
   
   When attempting to read messages with List type headers, a 
NotSerializableException error is being returned.
   
   Example:
   
![image](https://github.com/apache/beam/assets/32180411/f7acc148-fd20-4e3b-b630-57889a3ce164)
   
   If the headers are String, Number, or Boolean, it works normally. I have 
tried using a CustomCoder with `.setCoder(new CustomCoder())` to try to avoid 
the headers that are not important in this case, but it appears to throw the 
error before reaching this coder.
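For context on the error: the stack trace shows the read step encoding its output with `SerializableCoder(RabbitMqMessage)`, which relies on Java serialization, so every object reachable from the message, including each element inside a header `List`, must implement `java.io.Serializable`. The sketch below reproduces that failure with a stdlib-only stand-in and shows one possible workaround: converting non-serializable header values to `String` before serialization. `NonSerializableValue` is a hypothetical stand-in for RabbitMQ's `LongString`, and `HeaderSanitizer` is a hypothetical helper, not part of `RabbitMqIO`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a header value type that does not implement Serializable.
final class NonSerializableValue {
  private final String text;
  NonSerializableValue(String text) { this.text = text; }
  @Override public String toString() { return text; }
}

final class HeaderSanitizer {
  // Recursively copies lists and maps, replacing non-Serializable leaves with toString().
  // Only lists and maps are traversed; a Serializable object with non-serializable
  // fields would still fail.
  static Object sanitize(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof List) {
      List<Object> copy = new ArrayList<>();
      for (Object item : (List<?>) value) {
        copy.add(sanitize(item));
      }
      return copy;
    }
    if (value instanceof Map) {
      Map<Object, Object> copy = new HashMap<>();
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        copy.put(e.getKey(), sanitize(e.getValue()));
      }
      return copy;
    }
    return (value instanceof Serializable) ? value : value.toString();
  }

  // Plain Java serialization, the mechanism SerializableCoder is built on.
  // A NotSerializableException surfaces as the cause of the UncheckedIOException.
  static byte[] serialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }
}
```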
   
   Stacktrace:
   ```
   Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f9a0d014}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(SerializableCoder(org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage))'.
   org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
   org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
   org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
   org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
   org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
   org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
   org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
   org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
   java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
   java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
   java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
   java.base/java.util.ArrayList.writeObject(ArrayList.java:866)
   java.base/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
   java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   java.base/java.lang.reflect.Method.invoke(Method.java:568)
   java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
   java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
   java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
   java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
   java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
   java.base/java.util.HashMap.internalWriteEntries(HashMap.java:1944)
   java.base/java.util.HashMap.writeObject(HashMap.java:1497)
   java.base/jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
   java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   java.base/java.lang.reflect.Method.invoke(Method.java:568)
   java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
   java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
   java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
   java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
   ```

[PR] Add try-excepts around data sampler encoding [beam]

2024-05-24 Thread via GitHub


rohdesamuel opened a new pull request, #31396:
URL: https://github.com/apache/beam/pull/31396

   Add try-excepts around data sampler encoding
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]

2024-05-24 Thread via GitHub


svetakvsundhar commented on PR #31376:
URL: https://github.com/apache/beam/pull/31376#issuecomment-2130065700

   Sounds good. Will merge after CI completes! Thanks for the quick turnaround! 





Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]

2024-05-24 Thread via GitHub


tonytanger commented on PR #31376:
URL: https://github.com/apache/beam/pull/31376#issuecomment-2130063711

   @svetakvsundhar we're ready to merge. Thanks.





Re: [PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


shunping commented on PR #31393:
URL: https://github.com/apache/beam/pull/31393#issuecomment-2130046766

   Run Python_ML PreCommit 3.12





Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]

2024-05-24 Thread via GitHub


jackdingilian commented on code in PR #31376:
URL: https://github.com/apache/beam/pull/31376#discussion_r1613798994


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##
@@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
   return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
 }
 
+/**

Review Comment:
   We should note that this disables metadata table creation/update, so that will need to be done explicitly when this option is enabled (and the table isn't already created or up to date).
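The concern is that a `withoutValidation()` switch turns off the pre-flight checks and, with them, the setup those checks perform, leaving that setup to the caller. A hypothetical plain-Java sketch of the pattern follows; `ReadConfig`, `validateAndPrepare`, and the set of "created" tables are illustrative names only, not the `BigtableIO` implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical immutable read configuration with a validation switch.
final class ReadConfig {
  final String tableId;
  final String metadataTableId;
  final boolean validate;

  ReadConfig(String tableId, String metadataTableId, boolean validate) {
    this.tableId = tableId;
    this.metadataTableId = metadataTableId;
    this.validate = validate;
  }

  // Returns a copy with validation disabled, mirroring a withoutValidation() builder method.
  ReadConfig withoutValidation() {
    return new ReadConfig(tableId, metadataTableId, false);
  }

  // Runs checks only when validation is enabled. Metadata table creation lives on the
  // same path, so with validation off the caller must create that table beforehand.
  void validateAndPrepare(Predicate<String> tableExists, Set<String> createdTables) {
    if (!validate) {
      return;
    }
    if (!tableExists.test(tableId)) {
      throw new IllegalArgumentException("Change Stream table " + tableId + " does not exist");
    }
    if (!tableExists.test(metadataTableId)) {
      createdTables.add(metadataTableId);
    }
  }
}
```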



##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##
@@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
   return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
 }
 
+/**
+ * Disables validation that the table being read and the metadata table exist, and that the app
+ * profile used is single cluster and single row transaction enabled. Set this option if the
+ * caller does not have additional Bigtable permissions to validate the configurations.
+ */
+public ReadChangeStream withoutValidation() {
+  BigtableConfig config = getBigtableConfig();
+  BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
+  return toBuilder()
+  .setBigtableConfig(config.withValidate(false))
+  .setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false))
+  .setValidateConfig(false)
+  .build();
+}
+
+@Override
+public void validate(PipelineOptions options) {
+  BigtableServiceFactory factory = new BigtableServiceFactory();
+  if (getBigtableConfig().getValidate()) {
+try {
+  checkArgument(
+  factory.checkTableExists(getBigtableConfig(), options, getTableId()),
+  "Change Stream table %s does not exist",
+  getTableId());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+}
+
+// Validate the app profile is single cluster and allows single row transactions.
+private void validateAppProfile(
+MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
+  checkArgument(metadataTableAdminDao != null);
+  checkArgument(
+  metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId),
+  "App profile id '"
+  + appProfileId
+  + "' provided to access metadata table needs to use single-cluster routing policy"
+  + " and allow single-row transactions.");
+}
+
+// Update metadata table schema if allowed and required.
+private void createOrUpdateMetadataTable(
+MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
+  boolean shouldCreateOrUpdateMetadataTable = true;
+  if (getCreateOrUpdateMetadataTable() != null) {
+shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
+  }
+  // Only try to create or update metadata table if option is set to true. Otherwise, just
+  // check if the table exists.
+  if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
+LOG.info("Created metadata table: " + metadataTableId);
+  }
+}
+
 @Override
 public PCollection> expand(PBegin input) {
   checkArgument(
   getBigtableConfig() != null,
   "BigtableIO ReadChangeStream is missing required configurations fields.");
-  checkArgument(
-  getBigtableConfig().getProjectId() != null, "Missing required projectId field.");
-  checkArgument(
-  getBigtableConfig().getInstanceId() != null, "Missing required instanceId field.");
+  getBigtableConfig().validate();
   checkArgument(getTableId() != null, "Missing required tableId field.");
 
   BigtableConfig bigtableConfig = getBigtableConfig();

Review Comment:
   Nit: move this above the two earlier calls to `getBigtableConfig()` in this method.

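The nit amounts to reading the configuration once into a local variable and running every check against that local. A hypothetical plain-Java sketch of the shape follows; `ChangeStreamConfig`, `ChangeStreamRead`, and `getConfig()` stand in for the real `BigtableConfig` accessors and are not Beam APIs.

```java
// Hypothetical stand-in for BigtableConfig.
final class ChangeStreamConfig {
  final String projectId;
  final String instanceId;

  ChangeStreamConfig(String projectId, String instanceId) {
    this.projectId = projectId;
    this.instanceId = instanceId;
  }
}

final class ChangeStreamRead {
  private final ChangeStreamConfig config;
  private final String tableId;

  ChangeStreamRead(ChangeStreamConfig config, String tableId) {
    this.config = config;
    this.tableId = tableId;
  }

  ChangeStreamConfig getConfig() {
    return config;
  }

  // Fetch the config once, then validate and use the same local reference throughout.
  String expand() {
    ChangeStreamConfig cfg = getConfig();
    if (cfg == null) {
      throw new IllegalArgumentException("missing required configuration");
    }
    if (cfg.projectId == null) {
      throw new IllegalArgumentException("Missing required projectId field.");
    }
    if (cfg.instanceId == null) {
      throw new IllegalArgumentException("Missing required instanceId field.");
    }
    if (tableId == null) {
      throw new IllegalArgumentException("Missing required tableId field.");
    }
    return cfg.projectId + "/" + cfg.instanceId + "/" + tableId;
  }
}
```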





Re: [PR] Fix an incompatibility with hamcrest 2.2 [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31395:
URL: https://github.com/apache/beam/pull/31395#issuecomment-2130008700

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


shunping commented on PR #31393:
URL: https://github.com/apache/beam/pull/31393#issuecomment-2129992467

   Run Python_Integration PreCommit 3.12





[PR] Fix an incompatibility with hamcrest 2.2 [beam]

2024-05-24 Thread via GitHub


cushon opened a new pull request, #31395:
URL: https://github.com/apache/beam/pull/31395

   This test asserts on the message of an AssertionError thrown by a
   failing hamcrest assertion. The message changes in the latest versions
   of hamcrest.
   
   ```
   1) testBadCoderIsNotDeterministic(org.apache.beam.sdk.testing.CoderPropertiesTest)
   java.lang.AssertionError:
   Expected: a string containing "<84>, <101>, <115>, <116>, <68>"
        but: was "
   Expected: [<24b>, <84b>, <101b>, <115b>, <116b>, <68b>, <97b>, <116b>, <97b>, <51b>, <51b>, <50b>, <54b>, <49b>, <57b>, <57b>, <52b>, <57b>, <48b>, <57b>, <57b>, <55b>, <48b>, <53b>, <53b>]
        but: was [<24b>, <84b>, <101b>, <115b>, <116b>, <68b>, <97b>, <116b>, <97b>, <51b>, <51b>, <50b>, <54b>, <49b>, <57b>, <57b>, <52b>, <56b>, <50b>, <48b>, <52b>, <49b>, <54b>, <49b>, <53b>]"
   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
   at org.apache.beam.sdk.testing.CoderPropertiesTest.testBadCoderIsNotDeterministic(CoderPropertiesTest.java:123)
   ```
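Instead of matching hamcrest's exact rendering of a `byte[]` (the output above shows `<84b>` where the old expectation had `<84>`), a test can extract the numeric values and compare those. The helper below is a hypothetical plain-Java sketch of that idea, not the change made in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ByteMessageMatcher {
  // Matches "<84>" as well as "<84b>"; the optional "-" allows negative byte values.
  private static final Pattern BYTE_TOKEN = Pattern.compile("<(-?\\d+)b?>");

  // Pulls every rendered byte value out of an assertion message, in order.
  static List<Integer> extractBytes(String message) {
    List<Integer> values = new ArrayList<>();
    Matcher m = BYTE_TOKEN.matcher(message);
    while (m.find()) {
      values.add(Integer.parseInt(m.group(1)));
    }
    return values;
  }

  // True if the expected values appear as a contiguous run, regardless of formatting.
  static boolean containsSequence(String message, int... expected) {
    List<Integer> wanted = new ArrayList<>();
    for (int v : expected) {
      wanted.add(v);
    }
    return Collections.indexOfSubList(extractBytes(message), wanted) >= 0;
  }
}
```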
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] added pytest.mark.flaky for test_big_query_write_temp_table_append_sc… [beam]

2024-05-24 Thread via GitHub


tvalentyn commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2129883889

   PTAL at PreCommit Python Lint / beam_PreCommit_PythonLint (Run PythonLint 
PreCommit) (pull_request_target) Failing after 16m
   





Re: [PR] Don't install TF on Python 3.12, since there is no compatible version atm. [beam]

2024-05-24 Thread via GitHub


tvalentyn merged PR #31386:
URL: https://github.com/apache/beam/pull/31386





Re: [I] [Failing Test]: Python 3.12 ML Precommit suite is failing [beam]

2024-05-24 Thread via GitHub


tvalentyn closed issue #31385: [Failing Test]: Python 3.12 ML Precommit suite 
is failing
URL: https://github.com/apache/beam/issues/31385





Re: [PR] Don't install TF on Python 3.12, since there is no compatible version atm. [beam]

2024-05-24 Thread via GitHub


tvalentyn commented on PR #31386:
URL: https://github.com/apache/beam/pull/31386#issuecomment-2129872124

   Py3.12 suite passed: 
https://github.com/apache/beam/actions/runs/9216485596/job/25356862179?pr=31386,
 other suites are not affected by this PR.





Re: [PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


je-ik commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2129870111

   > The PVR test seems to be stuck at `ViewTest.testTriggeredLatestSingleton`. I can observe this locally on both the `master` and `release-2.56.0` branches. Has this check completed successfully recently?
   
   Hm, it passed on the second run.





Re: [PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2129810043

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


je-ik commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2129807437

   R: @Abacn 
   
   The PVR test seems to be stuck at `ViewTest.testTriggeredLatestSingleton`. I can observe this locally on both `master` and `release-2.56.0` branches. Did this check complete successfully recently?





Re: [PR] 31112 drop flink 1.14 [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31394:
URL: https://github.com/apache/beam/pull/31394#issuecomment-2129699085

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] 31112 drop flink 1.14 [beam]

2024-05-24 Thread via GitHub


je-ik commented on PR #31394:
URL: https://github.com/apache/beam/pull/31394#issuecomment-2129696424

   R: @Abacn 





Re: [PR] Add iceberg load test [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31392:
URL: https://github.com/apache/beam/pull/31392#issuecomment-2129679833

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





[PR] 31112 drop flink 1.14 [beam]

2024-05-24 Thread via GitHub


je-ik opened a new pull request, #31394:
URL: https://github.com/apache/beam/pull/31394

   Closes #31112 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31393:
URL: https://github.com/apache/beam/pull/31393#issuecomment-2129657276

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


shunping commented on PR #31393:
URL: https://github.com/apache/beam/pull/31393#issuecomment-2129654893

   R: @damccorm 





[PR] Update beam-master version to 20240524. [beam]

2024-05-24 Thread via GitHub


shunping opened a new pull request, #31393:
URL: https://github.com/apache/beam/pull/31393

   There was a recent update to the version dependency of `google-cloud-storage`, so we need a new beam-master image.





Re: [PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2129622422

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] Kafka SchemaTransform translation [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31362:
URL: https://github.com/apache/beam/pull/31362#issuecomment-2129593876

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Kafka SchemaTransform translation [beam]

2024-05-24 Thread via GitHub


ahmedabu98 commented on PR #31362:
URL: https://github.com/apache/beam/pull/31362#issuecomment-2129591262

   R: @chamikaramj 
   R: @damccorm 





Re: [I] Support MapState in DataflowRunner [beam]

2024-05-24 Thread via GitHub


scwhittle commented on issue #18200:
URL: https://github.com/apache/beam/issues/18200#issuecomment-2129583396

   We could do so by implementing it on top of MultimapState, similar to the Fn API harness:
   
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L454
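One way to read this suggestion, sketched under assumptions: `SimpleMultimapState` and `MultimapBackedMapState` below are hypothetical, simplified in-memory types, not Beam's `MapState`/`MultimapState` (whose reads return lazily evaluated `ReadableState` views backed by runner state). The idea is that a map is a multimap that never holds more than one value per key, so `put` removes any existing values before adding, and `get` reads the first (only) value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory stand-in for a persistent multimap state: a key may hold many values. */
class SimpleMultimapState<K, V> {
  private final Map<K, List<V>> data = new HashMap<>();

  void put(K key, V value) {
    data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<V> get(K key) {
    return Collections.unmodifiableList(data.getOrDefault(key, Collections.emptyList()));
  }

  void remove(K key) {
    data.remove(key);
  }
}

/** Map semantics layered on the multimap: at most one value is kept per key. */
class MultimapBackedMapState<K, V> {
  private final SimpleMultimapState<K, V> multimap;

  MultimapBackedMapState(SimpleMultimapState<K, V> multimap) {
    this.multimap = multimap;
  }

  /** Overwrite semantics: drop any previous value for the key, then store the new one. */
  void put(K key, V value) {
    multimap.remove(key);
    multimap.put(key, value);
  }

  /** Returns the single value stored for the key, if any. */
  Optional<V> get(K key) {
    List<V> values = multimap.get(key);
    return values.isEmpty() ? Optional.empty() : Optional.of(values.get(0));
  }

  void remove(K key) {
    multimap.remove(key);
  }
}
```

The remove-then-put pair is what keeps the "one value per key" invariant; a real implementation would also need to expose keys, entries, and emptiness checks.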





Re: [I] Support SetState in Dataflow runner [beam]

2024-05-24 Thread via GitHub


scwhittle commented on issue #18140:
URL: https://github.com/apache/beam/issues/18140#issuecomment-2129581936

   We could do so by implementing it on top of the existing MultimapState, similar to how it is done for the Fn API harness:
   
   
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L370
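A comparable sketch for sets, again with hypothetical in-memory types rather than Beam's `SetState`: each element is stored as a multimap key paired with a throwaway marker value, so membership is a `containsKey` check and removal drops the key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a persistent multimap state. */
class InMemoryMultimap<K, V> {
  private final Map<K, List<V>> data = new HashMap<>();

  void put(K key, V value) {
    data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  boolean containsKey(K key) {
    return data.containsKey(key);
  }

  void remove(K key) {
    data.remove(key);
  }

  Iterable<K> keys() {
    return data.keySet();
  }
}

/** Set semantics on the multimap: each element is a key, paired with a marker value. */
class MultimapBackedSetState<T> {
  /** Marker stored as the value; only the key's presence matters. */
  private enum Marker { PRESENT }

  private final InMemoryMultimap<T, Marker> multimap = new InMemoryMultimap<>();

  /** Adds the element and returns true if it was not already present. */
  boolean add(T element) {
    if (multimap.containsKey(element)) {
      return false;
    }
    multimap.put(element, Marker.PRESENT);
    return true;
  }

  boolean contains(T element) {
    return multimap.containsKey(element);
  }

  void remove(T element) {
    multimap.remove(element);
  }

  Iterable<T> elements() {
    return multimap.keys();
  }
}
```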
   





[PR] Add iceberg load test [beam]

2024-05-24 Thread via GitHub


ahmedabu98 opened a new pull request, #31392:
URL: https://github.com/apache/beam/pull/31392

   Adding a load test for IcebergIO
   
   Separating integration test added in #31220 into its own suite





Re: [PR] add support for ConnectionFactory ProviderFn in JmsIO [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31264:
URL: https://github.com/apache/beam/pull/31264#issuecomment-2129388979

   Reminder, please take a look at this pr: @damondouglas @shunping 





Re: [I] [Bug]: Watermarks and Windowing Not Working with FlinkRunner and KinesisIO Read Transform [beam]

2024-05-24 Thread via GitHub


yelianevich commented on issue #31085:
URL: https://github.com/apache/beam/issues/31085#issuecomment-2129353403

   @je-ik sorry for the dummy question: is the only way to get the binaries to build them locally and then run the test? Is there a faster way to get binaries?





Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]

2024-05-24 Thread via GitHub


scwhittle commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613137200


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -87,6 +89,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
+@NotThreadSafe

Review Comment:
   internal annotation
   
   Can you also improve the class comment to help clarify the difference between Work and this class? This one is reused across work instances.
   



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -152,37 +154,22 @@ public boolean workIsFailed() {
 
   public void start(
   @Nullable Object key,
-  Windmill.WorkItem work,
-  Instant inputDataWatermark,
-  @Nullable Instant outputDataWatermark,
-  @Nullable Instant synchronizedProcessingTime,
+  Work work,
   WindmillStateReader stateReader,
   SideInputStateFetcher sideInputStateFetcher,
-  Windmill.WorkItemCommitRequest.Builder outputBuilder,
-  @Nullable Supplier workFailed) {
+  Windmill.WorkItemCommitRequest.Builder outputBuilder) {
 this.key = key;
-this.work = work;
-this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+this.work = work.getWorkItem();

Review Comment:
   Should we just store the Work and get rid of workIsFailed?
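A minimal sketch of this suggestion, assuming hypothetical `Work` and `ExecutionContextSketch` classes (the real `Work` and `StreamingModeExecutionContext` APIs may differ): the reusable context stores the current `Work` and derives the failure flag from it, instead of carrying a separate `Supplier` that defaults to `false`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the worker's Work class; only the failure flag is modeled. */
final class Work {
  // Atomic because, in this sketch, another thread may mark the work as failed.
  private final AtomicBoolean failed = new AtomicBoolean(false);

  void setFailed() {
    failed.set(true);
  }

  boolean isFailed() {
    return failed.get();
  }
}

/** Hypothetical reusable context that keeps the current Work rather than a failure Supplier. */
final class ExecutionContextSketch {
  private Work work; // replaced on every start(); the context object itself is reused

  void start(Work work) {
    this.work = work;
  }

  /** Derived from the stored Work, so there is no second source of truth to keep in sync. */
  boolean workIsFailed() {
    return work != null && work.isFailed();
  }
}
```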



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -35,8 +42,13 @@ public abstract class ExecutionState {
 
   public abstract ExecutionStateTracker executionStateTracker();
 
-  public static ExecutionState.Builder builder() {
-return new AutoValue_ExecutionState.Builder();
+  public final void close() {

Review Comment:
   If it's only selectively closed, it's probably better without AutoCloseable. I think I got a lint warning about creating something AutoCloseable outside a try block.
   
   Keep the comment, though.
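To illustrate why selective closing fits poorly with `AutoCloseable`, here is a sketch with hypothetical `ReusableComputationState` and `ComputationStateCache` classes (not the worker's actual types): the state goes back to a cache for reuse after successful work and is closed only when work fails, so it never lives inside a single try-with-resources block.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-computation state; deliberately NOT AutoCloseable. */
final class ReusableComputationState {
  private boolean closed;

  /** Releases resources; called only on the failure path. */
  void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

/** Hypothetical cache: healthy state is kept for reuse, failed state is closed and dropped. */
final class ComputationStateCache {
  private final List<ReusableComputationState> idle = new ArrayList<>();

  ReusableComputationState acquire() {
    return idle.isEmpty() ? new ReusableComputationState() : idle.remove(idle.size() - 1);
  }

  void release(ReusableComputationState state, boolean workFailed) {
    if (workFailed) {
      state.close(); // selective close: only discarded state is torn down
    } else {
      idle.add(state); // stays open and is handed to the next work item
    }
  }

  int idleCount() {
    return idle.size();
  }
}
```

Because ownership passes from one work item to the next, a plain `close()` method documents the intent better than an interface that suggests scoped, always-close usage.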



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -22,10 +22,19 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoValue
-public abstract class ExecutionState {
+@Internal
+public abstract class ExecutionState implements AutoCloseable {

Review Comment:
   Can you add a comment here? 
   We have a bunch of context-y things for processing floating around.  Would 
be good to note that this is per-computation and that it can be reused across 
bundles/work with resetting of things it contains.
   
   Separately I wonder if we should merge this and 
StreamingModeExecutionStateContext?



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -193,31 +180,33 @@ public void start(
   for (StepContext stepContext : stepContexts) {
 stepContext.start(
 stateReader,
-inputDataWatermark,
+work.watermarks().inputDataWatermark(),

Review Comment:
   Could pass in watermarks instead of separate params (could be a follow-up).
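A minimal sketch of the follow-up, assuming a hypothetical `Watermarks` record and `StepContextSketch` class: the three timestamps travel as one value instead of three parameters. It uses `java.time.Instant` and `Optional` to stay self-contained, whereas the diff above uses `@Nullable` parameters and Beam code generally uses Joda-Time `Instant`.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical value object grouping the watermark-related timestamps of one work item. */
record Watermarks(
    Instant inputDataWatermark,
    Optional<Instant> outputDataWatermark,
    Optional<Instant> synchronizedProcessingTime) {}

/** Hypothetical step context whose start() takes one Watermarks value instead of three. */
final class StepContextSketch {
  private Watermarks watermarks;

  void start(Watermarks watermarks) {
    this.watermarks = watermarks;
  }

  Instant inputDataWatermark() {
    return watermarks.inputDataWatermark();
  }

  Optional<Instant> outputDataWatermark() {
    return watermarks.outputDataWatermark();
  }
}
```

Grouping the values means adding another watermark later changes one type rather than every `start(...)` signature along the call chain.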



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Value class that represents the result of executing user DoFns. */
+@AutoValue
+abstract class ExecuteWorkResult {

Review Comment:
   seems like this 

Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]

2024-05-24 Thread via GitHub


je-ik commented on issue #31313:
URL: https://github.com/apache/beam/issues/31313#issuecomment-2129123482

   I don't know the root cause. It seems that Flink does not send the snapshot state after restoring from a savepoint. I observed this with Impulse (I suspected that it affects only bounded sources running in unbounded mode, but that seems not to be the case). It might be a Beam bug or a Flink bug.
   
   > Hi, yes that did fix the issue. thank you! For my understanding, what does 
this option do exactly? And should I expect any performance degradation?
   
   The flag turns on a different expansion for the Read transform: it uses a splittable DoFn (SDF), which uses Impulse, and Impulse was fixed earlier. Performance should be similar to the classic Read.





[PR] [flink] #31390 emit watermark with empty source [beam]

2024-05-24 Thread via GitHub


je-ik opened a new pull request, #31391:
URL: https://github.com/apache/beam/pull/31391

   Closes #31390 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [I] [yaml]: Normalize BigtableIO [beam]

2024-05-24 Thread via GitHub


ffernandez92 commented on issue #28672:
URL: https://github.com/apache/beam/issues/28672#issuecomment-2128911363

   Hey @Polber, are you actively working on this issue at the moment? I may 
have some time in the next couple of weeks, so I could try to work on it unless 
you have already started.





Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]

2024-05-24 Thread via GitHub


akashk99 commented on issue #31313:
URL: https://github.com/apache/beam/issues/31313#issuecomment-2128866628

   I am actually noticing a lot of backpressure with this approach, despite downstream operators having low CPU usage. Is the fix to the root cause relatively straightforward, in which case I could implement it in a forked version of the repo, or is it more involved?





Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]

2024-05-24 Thread via GitHub


akashk99 commented on issue #31313:
URL: https://github.com/apache/beam/issues/31313#issuecomment-2128818033

   @je-ik Hi, yes that did fix the issue. thank you! For my understanding, what 
does this option do exactly? And should I expect any performance degradation? 





Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]

2024-05-24 Thread via GitHub


je-ik commented on issue #31313:
URL: https://github.com/apache/beam/issues/31313#issuecomment-2128718496

   I suppose this is a similar but different issue, probably caused by the same underlying bug. #30903 fixed Impulse only. Does using `--experiments=beam_fn_api` fix the issue?
   
   





Re: [PR] Bump github.com/aws/aws-sdk-go-v2/service/s3 from 1.42.2 to 1.54.3 in /sdks [beam]

2024-05-24 Thread via GitHub


github-actions[bot] commented on PR #31389:
URL: https://github.com/apache/beam/pull/31389#issuecomment-2128615640

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`

