Re: Where to specify trust.jks
Hi Utkarsh,

You can pass a path in GCS (or on a filesystem), and the workers should be able to download the file themselves. You'd pass `gs://my-bucket-name/path/to/trust.jks`. Can you try that?

Best
-P.

On Wed, May 10, 2023 at 1:58 PM Utkarsh Parekh wrote:
> Hi,
>
> I'm testing a streaming app using Kafka, Dataflow, and Apache Beam [Python].
>
> "Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:888)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:825)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2319)
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> Caused by:
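
For readers following this thread, the suggestion in the reply can be sketched roughly as below. This is only an illustration under assumptions: the helper name, the broker address, the topic and the password are hypothetical, and whether the worker actually resolves a `gs://` truststore path is exactly what the reply asks Utkarsh to try, so it is not verified here. The dictionary keys are standard Kafka client SSL settings.

```python
def kafka_ssl_consumer_config(bootstrap_servers, truststore_path, truststore_password):
    """Build a Kafka consumer config that points at a JKS truststore.

    Hypothetical helper for illustration only; the keys are standard
    Kafka client settings, the values are supplied by the caller.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SSL",
        # Path suggested in the reply; assumed (not verified) to be
        # readable by the workers.
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }


# Placeholder values; replace with your own broker and secret.
config = kafka_ssl_consumer_config(
    "broker-1:9093",
    "gs://my-bucket-name/path/to/trust.jks",
    "changeit",
)

# With Apache Beam installed, the dict would be handed to the Kafka source:
#   from apache_beam.io.kafka import ReadFromKafka
#   ReadFromKafka(consumer_config=config, topics=["my-topic"])
```

Keeping the config in one helper makes it easy to swap the truststore location between a GCS path and a local path while testing.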
Re: Exactly once guarantee with beam, flink and kafka
Hi Ifat,

I don't work with Beam and Flink anymore, so the below is from memory.

When I went through what you are going through, I spent quite some time getting EOS to work. While it kind of worked, I ran into several issues, which I'll describe below.

The issue with EOS is that it uses Kafka transactions. If you're using the Flink state, the transaction id is saved there. When your pipeline restarts, all kinds of issues pop up because of that. Furthermore, I never managed to get more than one record per transaction, and your offsets will not be sequential. Performance was also very poor.

The somewhat working solution is to forget EOS and rely on the Flink state alone. What that means is that Beam KafkaIO will use the Flink state to know where to start reading when the pipeline restarts (again, my memory is fuzzy). What happens to in-flight records, those that have already been read when the pipeline is stopped, will depend on how your transform manages its state.

There's another pitfall though. The integration between Beam and Flink is outdated when it comes to Flink's state. You can have a look at the mailing list; I sent a few emails about the topic [1]. Things like schema evolution are tricky, and sometimes even filesystem paths are saved in the state, which reduces portability.

Anyway, I found that Beam running on Flink is a path filled with pitfalls. Use caution.

Hopefully my 2 cents are of help.

Cheers,
Cristian

[1] https://lists.apache.org/list?user@beam.apache.org:lte=4y:Flink%20state

On Wed, May 10, 2023 at 11:03 PM Ifat Afek (Nokia) wrote:
> Hi,
>
> I’m trying to configure exactly-once recovery for pipelines that are running on top of Flink with Kafka sources and sinks.
>
> I wrote two simple pipelines that pass messages from one topic to another (for testing purposes):
>
> pipeline1 reads messages from topic_a and writes them to topic_b
> pipeline2 reads messages from topic_b and writes them to topic_c
>
> I tried to configure exactly-once behavior in both pipelines by:
>
> - execution.checkpointing.mode: EXACTLY_ONCE
> - KafkaIO.read().withReadCommitted()
> - KafkaIO.writeRecords().withEOS()
>
> I started running both pipelines and then killed the task manager of pipeline1. After it recovered, it started reading messages that were already processed from topic_a. I assume the offset was determined by the last checkpoint. Then, pipeline2 started receiving messages that were already processed from topic_b. So in practice I got at-least-once behavior instead of exactly-once.
>
> I noticed that on pipeline2, there were two consumers (even though there is only one KafkaIO.read()): one named after my pipeline, and the other was created in KafkaIOUtils.getOffsetConsumerConfig() with hard-coded isolation.level=read_uncommitted.
>
> I wrote similar pipelines in Flink and managed to achieve exactly-once behavior using:
>
> - execution.checkpointing.mode: EXACTLY_ONCE
> - isolation.level: read_committed
> - KafkaSink.builder().setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> - KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")
>
> I’m trying to figure out several things.
>
> 1. Why are there two consumers in the pipeline? Why is one of them hard-coded with read_uncommitted? Is it the reason that pipeline2 gets duplicate events after pipeline1 is recovered?
> 2. Is the KafkaIO exactly-once implementation equivalent to the Flink KafkaSink implementation?
> 3. Is there anything else that I missed? How to make it work in Beam?
>
> Thanks,
> Ifat
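
The recovery behaviour discussed in this thread can be illustrated with a toy model. The sketch below is plain Python, not Beam, Flink or Kafka code, and every name in it is hypothetical. It assumes a source that restarts from the offset stored at the last checkpoint, and a sink whose writes either survive a crash (non-transactional) or are discarded unless the checkpoint completed (transactional).

```python
def run_with_crash(records, checkpoint_every, crash_at, transactional=False):
    """Replay `records` through a pipeline that crashes once at offset `crash_at`.

    Returns what a downstream consumer ends up seeing. Toy model only:
    a checkpoint is taken every `checkpoint_every` records, and on restart
    the source resumes from the last checkpointed offset.
    """
    visible = []     # output already covered by a completed checkpoint
    pending = []     # output written since the last checkpoint
    checkpoint = 0   # source offset stored by the last checkpoint
    offset = 0
    crashed = False
    while offset < len(records):
        if not crashed and offset == crash_at:
            crashed = True
            if transactional:
                # The open transaction is aborted, so its writes vanish.
                pending.clear()
            # Without transactions the earlier writes stay in the topic.
            offset = checkpoint
            continue
        pending.append(records[offset])
        offset += 1
        if offset % checkpoint_every == 0 or offset == len(records):
            visible.extend(pending)
            pending.clear()
            checkpoint = offset
    return visible


at_least_once = run_with_crash(list(range(10)), checkpoint_every=4, crash_at=6)
exactly_once = run_with_crash(list(range(10)), checkpoint_every=4, crash_at=6,
                              transactional=True)
```

In this model the non-transactional run delivers records 4 and 5 twice, which matches the duplicates pipeline2 observed, while the transactional run delivers each record once. It says nothing about why the Beam setup in the question did not behave transactionally.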
Beam SQL found limitations
Hi,

After experimenting with Beam SQL I found some limitations. I tested on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner and openjdk version "11.0.19". Please let me know if some of them are known, being worked on, have tickets or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking is just not right for the problem. If that is the case, please guide me to a working solution.

From my perspective it is OK to have a fix just on master; there is no need to wait for a release.

Priority order:
- 7. Windowing function on a stream. In detail: how to get the previous message for a key? Setting an arbitrarily big expiration is OK, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store and then reading it back could potentially result in race conditions, which I would like to avoid, but if it's the only option, let it be.
- 5. Single UNION ALL possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is Row type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported, even if one is a static number table
- 2. ROW construction not supported. It is not possible to nest data

Below are the queries that I use to test these scenarios.

Thank you for looking at these topics!

Best
Wiśniowski Piotr

---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
-- It is not possible to number rows
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!

---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or would need to be also based on string operations
SELECT ('{"field1":"'||1||'","field2":"'||'a'||'"}') AS `json_object`;

---
-- 3. Using * when there is Row type present in the schema
---
CREATE EXTERNAL TABLE test_tmp_1(
    `ref` VARCHAR,
    `author` ROW<
        `name` VARCHAR,
        `email` VARCHAR
    >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';

SELECT * FROM test_tmp_1; -- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to columns explicitly with alias
SELECT
    `ref` AS ref_value,
    test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;

---
-- 4. UNNEST ARRAY with nested ROW
---
CREATE EXTERNAL TABLE test_tmp(
    `ref` VARCHAR,
    `commits` ARRAY
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';

SELECT
    test_tmp.`ref` AS branch_name,
    commit_item.`id` AS commit_hash,
    commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAYSTRING, author ROW> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW, options={{}}}). initialized with 5 fields.

-- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
-- note workaround that uses number table will not work as CROSS JOIN is not supported
WITH data_parsed AS (
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[1].`id` AS commit_hash,
        test_tmp.commits[1].`author`.`name` AS author_name
    FROM test_tmp
    UNION ALL -- this unfortunately works only for two indexes
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[2].`id` AS commit_hash,
        test_tmp.commits[2].`author`.`name` AS author_name
    FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;

-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
    SELECT
        test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
        ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
        CARDINALITY(test_tmp.commits) AS commits_size
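
As a side note on item 7 in the priority list of this message (getting the previous message for a key without waiting for a window to close), the intended semantics can be sketched in plain Python. This is only a model of the desired behaviour, not Beam SQL and not a Beam API; the function name is hypothetical. In a Beam pipeline the `last_seen` dictionary would presumably correspond to per-key state kept by the runner.

```python
from typing import Dict, Hashable, Iterable, Iterator, Optional, Tuple


def with_previous(
    events: Iterable[Tuple[Hashable, object]],
) -> Iterator[Tuple[Hashable, Optional[object], object]]:
    """Pair every (key, value) event with the previous value seen for that key.

    The result for an event is emitted as soon as the event arrives;
    nothing waits for a window or an expiration timer.
    """
    last_seen: Dict[Hashable, object] = {}
    for key, value in events:
        # None marks the first event for a key.
        yield key, last_seen.get(key), value
        last_seen[key] = value


paired = list(with_previous([("a", 1), ("b", 10), ("a", 2)]))
```

Each output row carries the key, the previous value (or `None`) and the current value, which is the shape the message asks for.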
Re: state broadcasting in flink
Hi,

I suggest reviewing https://beam.apache.org/blog/timely-processing/

Hope this helps,
Sigalit

On Thu, May 18, 2023 at 9:27 AM Zheng Ni wrote:
> Hi There,
>
> Does Beam support Flink's state broadcasting feature mentioned in the link below? If yes, is there any Beam doc/example available?
>
> https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/
>
> Thanks,
> Zheng
state broadcasting in flink
Hi There,

Does Beam support Flink's state broadcasting feature mentioned in the link below? If yes, is there any Beam doc/example available?

https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/

Thanks,
Zheng