Re: Where to specify trust.jks

2023-05-18 Thread Pablo Estrada via user
Hi Utkarsh,
you can pass a path on GCS (or another supported filesystem), and the workers
should be able to download it themselves. You'd pass
`gs://my-bucket-name/path/to/trust.jks`. Can you try that?
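
For reference, a rough (untested) sketch of what that might look like on the
Python side, with placeholder broker, topic and password values:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Untested sketch: the truststore is referenced by its GCS path in the
# consumer config, per the suggestion above; broker, topic and password
# are placeholders.
with beam.Pipeline() as pipeline:
    records = pipeline | ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'my-broker:9093',
            'security.protocol': 'SSL',
            'ssl.truststore.location': 'gs://my-bucket-name/path/to/trust.jks',
            'ssl.truststore.password': 'changeit',
        },
        topics=['my-topic'],
    )
    # ... rest of the pipeline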
Best
-P.

On Wed, May 10, 2023 at 1:58 PM Utkarsh Parekh 
wrote:

> Hi,
>
> I'm testing a streaming app using Kafka, Dataflow, and Apache Beam
> [Python].
>
>  "Error message from worker: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed
> to construct kafka consumer
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:888)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:825)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
> java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed
> to construct kafka consumer
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2319)
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> Caused by: 

Re: Exactly once guarantee with beam, flink and kafka

2023-05-18 Thread Cristian Constantinescu
Hi Ifat,

I don't work with Beam and Flink anymore so the below is from memory.

When I went through what you are going through, I spent quite some time
getting EOS working. While it kind of worked, I ran into several issues,
which I'll describe below.

The issue with EOS is that it uses Kafka transactions. If you're using the
Flink state, that transaction id is saved there, and all kinds of issues pop
up because of it when your pipeline restarts. Furthermore, I never managed to
get more than one record per transaction, so your offsets will not be
sequential. Finally, performance was very poor.

The somewhat working solution is to forget EOS and rely on the Flink state
alone. What that means is that Beam KafkaIO will use the Flink state to know
where to start reading when the pipeline restarts (again, my memory is
fuzzy). In-flight records, those that have already been read when the
pipeline is stopped, will depend on how your transforms manage their state.
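
As a rough Python illustration of that setup (again from memory and untested;
runner options, brokers and topics below are placeholders, and your pipelines
may well be Java): checkpointing enabled on the Flink runner, plus a plain
Kafka read and write with no EOS sink.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

# Untested sketch: restart positions come from Flink checkpoints rather than
# Kafka transactions; downstream consumers should tolerate duplicates.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--checkpointing_interval=60000',
])

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | ReadFromKafka(
           consumer_config={'bootstrap.servers': 'my-broker:9092',
                            'group.id': 'my-group'},
           topics=['topic_a'])
     | WriteToKafka(
           producer_config={'bootstrap.servers': 'my-broker:9092'},
           topic='topic_b'))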

There's another pitfall though. Beam and Flink have outdated integration
when it comes to Flink's state. You can have a look at the mailing list; I
sent a few emails about the topic [1]. Things like schema evolution are
tricky, and sometimes even filesystem paths are saved in the state, which
reduces portability.

Anyways, I found that Beam running on Flink is a path filled with pitfalls.
Use caution.

Hopefully my 2 cents are of help.

Cheers,
Cristian

[1] https://lists.apache.org/list?user@beam.apache.org:lte=4y:Flink%20state

On Wed, May 10, 2023 at 11:03 PM Ifat Afek (Nokia) 
wrote:

> Hi,
>
>
>
> I’m trying to configure exactly-once recovery for pipelines that are
> running on top of flink with kafka sources and sinks.
>
>
>
> I wrote two simple pipelines that pass messages from one topic to another
> (for testing purposes):
>
> pipeline1 reads messages from topic_a and writes them to topic_b
>
> pipeline2 reads messages from topic_b and writes them to topic_c
>
>
>
> I tried to configure exactly-once behavior in both pipelines by:
>
>- execution.checkpointing.mode: EXACTLY_ONCE
>- KafkaIO.read().withReadCommitted()
>- KafkaIO.writeRecords().withEOS()
>
>
>
> I started running both pipelines and then killed the task manager of
> pipeline1. After it recovered, it started reading messages that were
> already processed from topic_a. I assume the offset was determined by the
> last checkpoint. Then, pipeline2 started receiving messages that were
> already processed from topic_b. So in practice I got at-least-once behavior
> instead of exactly-once.
>
>
>
> I noticed that on pipeline2, there were two consumers (even though there
> is only one KafkaIO.read()): one named after my pipeline, and the other was
> created in KafkaIOUtils.getOffsetConsumerConfig() with hard-coded
> isolation.level=read_uncommitted.
>
>
>
> I wrote similar pipelines in Flink and managed to achieve exactly-once
> behavior using:
>
>- execution.checkpointing.mode: EXACTLY_ONCE
>- isolation.level: read_committed
>-
>KafkaSink.builder().setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>- KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")
>
>
>
> I’m trying to figure out several things.
>
>1. Why are there two consumers in the pipeline? Why is one of them
>hard-coded with read_uncommitted? Is it the reason that pipeline2 gets
>duplicate events after pipeline1 is recovered?
>2. Is KafkaIO exactly-once implementation equivalent to flink
>KafkaSink implementation?
>3. Is there anything else that I missed? How to make it work in Beam?
>
>
>
> Thanks,
>
> Ifat
>
>
>


Beam SQL found limitations

2023-05-18 Thread Wiśniowski Piotr

Hi,

After experimenting with Beam SQL I found some limitations, testing on a
near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with
Calcite, the direct runner and openjdk version "11.0.19". Please let me know
if any of these are known, being worked on, have tickets, or have an
estimated fix time. I believe most of them are low-hanging fruit, or my
thinking about the problem is simply off; if that is the case, please point
me to a working solution.


 From my perspective it is ok to have a fix just on master - no need to
wait for a release. Priority order:
- 7. Windowing function on a stream - in detail: how to get the previous
message for a key? Setting the expiration arbitrarily big is ok, but access
to the previous record must happen fairly quickly, not wait for the big
window to finish and emit the expired keys. Ideally I would like to do it in
a pure Beam pipeline, since saving to some external key/value store and then
reading it back could result in race conditions, which I would like to
avoid - but if it's the only option, let it be. (A rough non-SQL sketch of
what I mean is below, after this list.)

- 5. single UNION ALL possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is Row type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported - even 
if one is a static number table

- 2. ROW construction not supported. It is not possible to nest data
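
For 7., this is roughly what I am after, sketched outside of SQL as a Python
stateful DoFn (placeholder names and coder; I would still prefer to express
it in Beam SQL):

import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class PairWithPrevious(beam.DoFn):
    # Per-key state cell holding the last value seen for that key.
    PREV = ReadModifyWriteStateSpec('prev', beam.coders.PickleCoder())

    def process(self, element, prev=beam.DoFn.StateParam(PREV)):
        key, value = element
        previous_value = prev.read()  # None for the first record of a key
        prev.write(value)
        yield key, (previous_value, value)

# usage on a keyed PCollection: keyed | beam.ParDo(PairWithPrevious())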

Below are the queries that I use to test these scenarios.

Thank you for looking into these topics!

Best

Wiśniowski Piotr

---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.

-- It is not possible to number rows
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or would need to be
-- also based on string operations

SELECT ('{"field1":"'||1||'","field2":"'||'a'||'"}') AS `json_object`;
---
-- 3. Using * when there is Row type present in the schema
---
CREATE EXTERNAL TABLE test_tmp_1(
    `ref` VARCHAR,
    `author` ROW<
        `name` VARCHAR,
        `email` VARCHAR
    >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';

SELECT * FROM test_tmp_1;
-- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to columns explicitly with alias
SELECT
    `ref` AS ref_value,
    test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;
---
-- 4. UNNEST ARRAY with nested ROW
---
CREATE EXTERNAL TABLE test_tmp(
    `ref` VARCHAR,
    `commits` ARRAY<ROW<
        `id` VARCHAR,
        `author` ROW<
            `name` VARCHAR,
            `email` VARCHAR
        >
    >>
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
SELECT
    test_tmp.`ref` AS branch_name,
    commit_item.`id` AS commit_hash,
    commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}},
-- Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
-- Field{name=id, description=, type=STRING, options={{}}},
-- Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}).
-- initialized with 5 fields.
-- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
-- note: the workaround that uses a number table will not work, as CROSS JOIN is not supported

WITH data_parsed AS (
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[1].`id` AS commit_hash,
        test_tmp.commits[1].`author`.`name` AS author_name
    FROM test_tmp
    UNION ALL -- this unfortunately works only for two indexes
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[2].`id` AS commit_hash,
        test_tmp.commits[2].`author`.`name` AS author_name
    FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;
-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
    SELECT
        test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
        ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
        CARDINALITY(test_tmp.commits) AS commits_size

Re: state broadcasting in flink

2023-05-18 Thread Sigalit Eliazov
Hi,

I suggest reviewing
https://beam.apache.org/blog/timely-processing/
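
As far as I know there is no direct equivalent of Flink's broadcast state API
in Beam, but many of the same use cases can be covered with a side input. A
rough Python sketch with placeholder data (only an illustration, not what the
blog above describes):

import apache_beam as beam

# Rough sketch (placeholder data): a small "control" collection is made
# available to every element via a side input.
with beam.Pipeline() as pipeline:
    control = pipeline | 'Control' >> beam.Create([('threshold', 10)])
    events = pipeline | 'Events' >> beam.Create([3, 7, 15, 42])

    big_events = events | beam.Filter(
        lambda x, cfg: x > cfg['threshold'],
        cfg=beam.pvalue.AsDict(control))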

Hope this helps,
Sigalit


On Thu, May 18, 2023 at 9:27 AM Zheng Ni  wrote:

> Hi There,
>
> Does Beam support Flink's state broadcasting feature mentioned in the link
> below? If yes, is there any Beam doc/example available?
>
>
> https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/
>
> Thanks,
> Zheng
>
>
>
>


state broadcasting in flink

2023-05-18 Thread Zheng Ni
Hi There,

Does Beam support Flink's state broadcasting feature mentioned in the link
below? If yes, is there any Beam doc/example available?


https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/

Thanks,
Zheng